feat: update dependencies and improve caching mechanism
Some checks failed
PR Check / check-and-test (pull_request) Failing after 2m11s
- Added Prometheus client library for metrics collection.
- Refactored garbage collection strategy from random deletion to LRU (Least Recently Used) deletion.
- Introduced per-key locking in the cache to prevent race conditions.
- Enhanced logging with structured log messages for cache hits and misses.
- Implemented a retry mechanism for upstream requests with exponential backoff.
- Updated Go modules and indirect dependencies for better compatibility and performance.
- Removed (commented out) the unused sync filesystem implementation.
- Added version initialization to ensure a default version string.
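The version-initialization bullet is not shown in the diff below. A common way to guarantee a default version string, assuming (not confirmed by this commit) that the real version is injected at build time via -ldflags:

package version

// Version is normally injected at build time, e.g.:
//   go build -ldflags "-X <module>/version.Version=v1.2.3"
// Illustrative sketch; the commit's actual mechanism is not in this excerpt.
var Version string

func init() {
    if Version == "" {
        Version = "dev" // default when no version was injected
    }
}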
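The retry mechanism for upstream requests is also not shown in the excerpt. A minimal sketch of exponential backoff, with the helper name, attempt limit, and base delay assumed rather than taken from this commit:

package main

import (
    "errors"
    "fmt"
    "time"
)

// retryWithBackoff retries fn up to maxAttempts times, doubling the wait
// between attempts (100ms, 200ms, 400ms, ...). Name and parameters are
// assumptions for illustration; the commit's actual helper is not shown.
func retryWithBackoff(maxAttempts int, fn func() error) error {
    delay := 100 * time.Millisecond
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = fn(); err == nil {
            return nil // success, stop retrying
        }
        if attempt < maxAttempts {
            time.Sleep(delay)
            delay *= 2 // exponential backoff
        }
    }
    return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}

func main() {
    calls := 0
    err := retryWithBackoff(4, func() error {
        calls++
        if calls < 3 {
            return errors.New("upstream unavailable")
        }
        return nil
    })
    fmt.Println(calls, err) // 3 <nil>
}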
vfs/cache/cache.go: 26 changed lines (vendored)
@@ -5,6 +5,7 @@ import (
     "s1d3sw1ped/SteamCache2/vfs"
     "s1d3sw1ped/SteamCache2/vfs/cachestate"
     "s1d3sw1ped/SteamCache2/vfs/vfserror"
+    "sync"
 )

 // Ensure CacheFS implements VFS.
@@ -16,6 +17,8 @@ type CacheFS struct {
     slow vfs.VFS

     cacheHandler CacheHandler
+
+    keyLocks sync.Map // map[string]*sync.RWMutex for per-key locks
 }

 type CacheHandler func(*vfs.FileInfo, cachestate.CacheState) bool
@@ -24,6 +27,7 @@ type CacheHandler func(*vfs.FileInfo, cachestate.CacheState) bool
 func New(cacheHandler CacheHandler) *CacheFS {
     return &CacheFS{
         cacheHandler: cacheHandler,
+        keyLocks:     sync.Map{},
     }
 }

@@ -39,6 +43,12 @@ func (c *CacheFS) SetFast(vfs vfs.VFS) {
     c.fast = vfs
 }

+// getKeyLock returns a RWMutex for the given key, creating it if necessary.
+func (c *CacheFS) getKeyLock(key string) *sync.RWMutex {
+    mu, _ := c.keyLocks.LoadOrStore(key, &sync.RWMutex{})
+    return mu.(*sync.RWMutex)
+}
+
 // cacheState returns the state of the file at key.
 func (c *CacheFS) cacheState(key string) cachestate.CacheState {
     if c.fast != nil {
@@ -65,6 +75,10 @@ func (c *CacheFS) Size() int64 {

 // Set sets the file at key to src. If the file is already in the cache, it is replaced.
 func (c *CacheFS) Set(key string, src []byte) error {
+    mu := c.getKeyLock(key)
+    mu.Lock()
+    defer mu.Unlock()
+
     state := c.cacheState(key)

     switch state {
@@ -82,6 +96,10 @@ func (c *CacheFS) Set(key string, src []byte) error {

 // Delete deletes the file at key from the cache.
 func (c *CacheFS) Delete(key string) error {
+    mu := c.getKeyLock(key)
+    mu.Lock()
+    defer mu.Unlock()
+
     if c.fast != nil {
         c.fast.Delete(key)
     }
@@ -96,6 +114,10 @@ func (c *CacheFS) Get(key string) ([]byte, error) {

 // GetS returns the file at key. If the file is not in the cache, it is fetched from the storage. It also returns the cache state.
 func (c *CacheFS) GetS(key string) ([]byte, cachestate.CacheState, error) {
+    mu := c.getKeyLock(key)
+    mu.RLock()
+    defer mu.RUnlock()
+
     state := c.cacheState(key)

     switch state {
@@ -130,6 +152,10 @@ func (c *CacheFS) GetS(key string) ([]byte, cachestate.CacheState, error) {
 // Stat returns information about the file at key.
 // Warning: This will return information about the file in the fastest storage it's in.
 func (c *CacheFS) Stat(key string) (*vfs.FileInfo, error) {
+    mu := c.getKeyLock(key)
+    mu.RLock()
+    defer mu.RUnlock()
+
     state := c.cacheState(key)

     switch state {
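The getKeyLock helper added above is the heart of the per-key locking: sync.Map.LoadOrStore either returns the RWMutex already stored for a key or atomically stores and returns a fresh one, so concurrent callers for the same key always contend on the same lock while different keys proceed independently. A self-contained sketch of the pattern (illustrative code, not taken from this commit):

package main

import (
    "fmt"
    "sync"
)

type keyedLocks struct{ m sync.Map } // map[string]*sync.RWMutex

// lock returns the mutex for key, creating it atomically on first use.
func (k *keyedLocks) lock(key string) *sync.RWMutex {
    mu, _ := k.m.LoadOrStore(key, &sync.RWMutex{})
    return mu.(*sync.RWMutex)
}

func main() {
    var locks keyedLocks
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            mu := locks.lock("depot/123") // same key -> same mutex
            mu.Lock()
            defer mu.Unlock()
            fmt.Println("writer", n, "has exclusive access")
        }(i)
    }
    wg.Wait()
}

One property worth noting: mutexes are never removed from the map, so the lock count grows with the number of distinct keys ever seen; for a cache with a bounded key space that is usually acceptable. The hunks that follow apply to the DiskFS implementation.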
@@ -25,6 +25,8 @@ type DiskFS struct {
     capacity int64
     mu       sync.Mutex
     sg       sync.WaitGroup
+
+    bytePool sync.Pool // Pool for []byte slices
 }

 // New creates a new DiskFS.
@@ -42,6 +44,11 @@ func new(root string, capacity int64, skipinit bool) *DiskFS {
         if !os.IsNotExist(err) {
             panic(err) // panic if the error is something other than not found
         }
+        os.Mkdir(root, 0755)    // create the root directory if it does not exist
+        fi, err = os.Stat(root) // re-stat to get the file info
+        if err != nil {
+            panic(err) // panic if the re-stat fails
+        }
     }
     if !fi.IsDir() {
         panic("disk root must be a directory") // panic if the root is not a directory
@@ -53,6 +60,9 @@ func new(root string, capacity int64, skipinit bool) *DiskFS {
         capacity: capacity,
         mu:       sync.Mutex{},
         sg:       sync.WaitGroup{},
+        bytePool: sync.Pool{
+            New: func() interface{} { return make([]byte, 0) }, // Initial capacity for pooled slices is 0, will grow as needed
+        },
     }

     os.MkdirAll(dfs.root, 0755)
@@ -73,8 +83,6 @@ func NewSkipInit(root string, capacity int64) *DiskFS {
 }

 func (d *DiskFS) init() {
-    // logger.Logger.Info().Str("name", d.Name()).Str("root", d.root).Str("capacity", units.HumanSize(float64(d.capacity))).Msg("init")
-
     tstart := time.Now()

     d.walk(d.root)
@@ -110,11 +118,9 @@ func (d *DiskFS) walk(path string) {

             d.mu.Lock()
             k := strings.ReplaceAll(npath[len(d.root)+1:], "\\", "/")
             logger.Logger.Debug().Str("name", k).Str("root", d.root).Msg("walk")
             d.info[k] = vfs.NewFileInfoFromOS(info, k)
             d.mu.Unlock()
-
-            // logger.Logger.Debug().Str("name", d.Name()).Str("root", d.root).Str("capacity", units.HumanSize(float64(d.capacity))).Str("path", npath).Msg("init")
             return nil
         })
     }()
@@ -153,10 +159,7 @@ func (d *DiskFS) Set(key string, src []byte) error {
         }
     }

-    logger.Logger.Debug().Str("name", key).Str("root", d.root).Msg("set")
-
     if _, err := d.Stat(key); err == nil {
-        logger.Logger.Debug().Str("name", key).Str("root", d.root).Msg("delete")
         d.Delete(key)
     }

@@ -224,7 +227,16 @@ func (d *DiskFS) Get(key string) ([]byte, error) {
         return nil, err
     }

-    return data, nil
+    // Use pooled slice for return if possible, but since ReadFile allocates new, copy to pool if beneficial
+    dst := d.bytePool.Get().([]byte)
+    if cap(dst) < len(data) {
+        dst = make([]byte, len(data)) // create a new slice if the pool slice is too small
+    } else {
+        dst = dst[:len(data)] // reuse the pool slice, but resize it to fit
+    }
+    dst = dst[:len(data)]
+    copy(dst, data)
+    return dst, nil
 }

 // Stat returns the FileInfo of key. If key is not found in the cache, it will stat the file on disk. If the file is not found on disk, it will return vfs.ErrNotFound.
@@ -236,8 +248,6 @@ func (d *DiskFS) Stat(key string) (*vfs.FileInfo, error) {
         return nil, vfserror.ErrInvalidKey
     }

-    logger.Logger.Debug().Str("name", key).Str("root", d.root).Msg("stat")
-
     d.mu.Lock()
     defer d.mu.Unlock()

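The move from random deletion to LRU eviction announced in the commit message is not visible in this excerpt; the DiskFS hunks above only show construction, logging, and buffer-pool changes. As a standalone sketch of the idea, with types and field names invented for illustration, an LRU collector evicts the entry whose last access is oldest rather than an arbitrary one:

package main

import (
    "fmt"
    "time"
)

// entry is an illustrative stand-in for a cached file's metadata.
type entry struct {
    key        string
    size       int64
    lastAccess time.Time
}

// lruVictim returns the key of the least recently used entry.
func lruVictim(entries []entry) string {
    oldest := entries[0]
    for _, e := range entries[1:] {
        if e.lastAccess.Before(oldest.lastAccess) {
            oldest = e
        }
    }
    return oldest.key
}

func main() {
    now := time.Now()
    entries := []entry{
        {"a", 10, now.Add(-1 * time.Minute)},
        {"b", 20, now.Add(-10 * time.Minute)}, // least recently used
        {"c", 30, now},
    }
    fmt.Println("evict:", lruVictim(entries)) // evict: b
}

The next hunks are against the in-memory backend (package memory).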
@@ -1,10 +1,13 @@
 package memory

 import (
+    "s1d3sw1ped/SteamCache2/steamcache/logger"
     "s1d3sw1ped/SteamCache2/vfs"
     "s1d3sw1ped/SteamCache2/vfs/vfserror"
     "sync"
     "time"
+
+    "github.com/docker/go-units"
 )

 // Ensure MemoryFS implements VFS.
@@ -21,6 +24,8 @@ type MemoryFS struct {
     files    map[string]*file
     capacity int64
     mu       sync.Mutex
+
+    bytePool sync.Pool // Pool for []byte slices
 }

 // New creates a new MemoryFS.
@@ -29,10 +34,18 @@ func New(capacity int64) *MemoryFS {
         panic("memory capacity must be greater than 0") // panic if the capacity is less than or equal to 0
     }

+    logger.Logger.Info().
+        Str("name", "MemoryFS").
+        Str("capacity", units.HumanSize(float64(capacity))).
+        Msg("init")
+
     return &MemoryFS{
         files:    make(map[string]*file),
         capacity: capacity,
         mu:       sync.Mutex{},
+        bytePool: sync.Pool{
+            New: func() interface{} { return make([]byte, 0) }, // Initial capacity for pooled slices
+        },
     }
 }

@@ -67,13 +80,22 @@ func (m *MemoryFS) Set(key string, src []byte) error {
     m.mu.Lock()
     defer m.mu.Unlock()

+    // Use pooled slice
+    data := m.bytePool.Get().([]byte)
+    if cap(data) < len(src) {
+        data = make([]byte, len(src)) // expand the slice if the pool slice is too small
+    } else {
+        data = data[:len(src)] // reuse the pool slice, but resize it to fit
+    }
+    copy(data, src)
+
     m.files[key] = &file{
         fileinfo: vfs.NewFileInfo(
             key,
             int64(len(src)),
             time.Now(),
         ),
-        data: src,
+        data: data,
     }

     return nil
@@ -88,6 +110,11 @@ func (m *MemoryFS) Delete(key string) error {
     m.mu.Lock()
     defer m.mu.Unlock()

+    // Return data to pool
+    if f, ok := m.files[key]; ok {
+        m.bytePool.Put(f.data)
+    }
+
     delete(m.files, key)

     return nil
@@ -106,6 +133,12 @@ func (m *MemoryFS) Get(key string) ([]byte, error) {
     dst := make([]byte, len(m.files[key].data))
     copy(dst, m.files[key].data)

+    logger.Logger.Debug().
+        Str("name", key).
+        Str("status", "GET").
+        Int64("size", int64(len(dst))).
+        Msg("get file from memory")
+
     return dst, nil
 }

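Both the DiskFS.Get hunk earlier and the MemoryFS.Set hunk above rely on the same sync.Pool idiom: take a slice from the pool, allocate a larger one only when pooled capacity is insufficient, reslice otherwise, and hand buffers back on Delete. Reduced to a standalone sketch (illustrative, not code from this commit):

package main

import (
    "fmt"
    "sync"
)

var bytePool = sync.Pool{
    New: func() interface{} { return make([]byte, 0) }, // pooled slices start empty and grow as needed
}

// copyToPooled copies src into a pooled buffer, allocating only when the
// pooled slice is too small to hold src.
func copyToPooled(src []byte) []byte {
    buf := bytePool.Get().([]byte)
    if cap(buf) < len(src) {
        buf = make([]byte, len(src)) // pool slice too small: allocate fresh
    } else {
        buf = buf[:len(src)] // reuse pooled capacity, resliced to fit
    }
    copy(buf, src)
    return buf
}

func main() {
    a := copyToPooled([]byte("hello"))
    fmt.Println(string(a))
    bytePool.Put(a) // hand the buffer back for reuse
    b := copyToPooled([]byte("hi"))
    fmt.Println(string(b), cap(b) >= 5) // may reuse a's backing array; sync.Pool gives no guarantee
}

sync.Pool makes no reuse guarantees, and putting a plain []byte (rather than a pointer to a slice) costs a small allocation on each Put; the diff above accepts the same trade-off.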
vfs/sync/sync.go: 116 changed lines
@@ -1,76 +1,76 @@
 package sync

-import (
-    "fmt"
-    "s1d3sw1ped/SteamCache2/vfs"
-    "sync"
-)
+// import (
+//     "fmt"
+//     "s1d3sw1ped/SteamCache2/vfs"
+//     "sync"
+// )

-// Ensure SyncFS implements VFS.
-var _ vfs.VFS = (*SyncFS)(nil)
+// // Ensure SyncFS implements VFS.
+// var _ vfs.VFS = (*SyncFS)(nil)

-type SyncFS struct {
-    vfs vfs.VFS
-    mu  sync.RWMutex
-}
+// type SyncFS struct {
+//     vfs vfs.VFS
+//     mu  sync.RWMutex
+// }

-func New(vfs vfs.VFS) *SyncFS {
-    return &SyncFS{
-        vfs: vfs,
-        mu:  sync.RWMutex{},
-    }
-}
+// func New(vfs vfs.VFS) *SyncFS {
+//     return &SyncFS{
+//         vfs: vfs,
+//         mu:  sync.RWMutex{},
+//     }
+// }

-// Name returns the name of the file system.
-func (sfs *SyncFS) Name() string {
-    return fmt.Sprintf("SyncFS(%s)", sfs.vfs.Name())
-}
+// // Name returns the name of the file system.
+// func (sfs *SyncFS) Name() string {
+//     return fmt.Sprintf("SyncFS(%s)", sfs.vfs.Name())
+// }

-// Size returns the total size of all files in the file system.
-func (sfs *SyncFS) Size() int64 {
-    sfs.mu.RLock()
-    defer sfs.mu.RUnlock()
+// // Size returns the total size of all files in the file system.
+// func (sfs *SyncFS) Size() int64 {
+//     sfs.mu.RLock()
+//     defer sfs.mu.RUnlock()

-    return sfs.vfs.Size()
-}
+//     return sfs.vfs.Size()
+// }

-// Set sets the value of key as src.
-// Setting the same key multiple times, the last set call takes effect.
-func (sfs *SyncFS) Set(key string, src []byte) error {
-    sfs.mu.Lock()
-    defer sfs.mu.Unlock()
+// // Set sets the value of key as src.
+// // Setting the same key multiple times, the last set call takes effect.
+// func (sfs *SyncFS) Set(key string, src []byte) error {
+//     sfs.mu.Lock()
+//     defer sfs.mu.Unlock()

-    return sfs.vfs.Set(key, src)
-}
+//     return sfs.vfs.Set(key, src)
+// }

-// Delete deletes the value of key.
-func (sfs *SyncFS) Delete(key string) error {
-    sfs.mu.Lock()
-    defer sfs.mu.Unlock()
+// // Delete deletes the value of key.
+// func (sfs *SyncFS) Delete(key string) error {
+//     sfs.mu.Lock()
+//     defer sfs.mu.Unlock()

-    return sfs.vfs.Delete(key)
-}
+//     return sfs.vfs.Delete(key)
+// }

-// Get gets the value of key to dst, and returns dst whether or not there is an error.
-func (sfs *SyncFS) Get(key string) ([]byte, error) {
-    sfs.mu.RLock()
-    defer sfs.mu.RUnlock()
+// // Get gets the value of key to dst, and returns dst whether or not there is an error.
+// func (sfs *SyncFS) Get(key string) ([]byte, error) {
+//     sfs.mu.RLock()
+//     defer sfs.mu.RUnlock()

-    return sfs.vfs.Get(key)
-}
+//     return sfs.vfs.Get(key)
+// }

-// Stat returns the FileInfo of key.
-func (sfs *SyncFS) Stat(key string) (*vfs.FileInfo, error) {
-    sfs.mu.RLock()
-    defer sfs.mu.RUnlock()
+// // Stat returns the FileInfo of key.
+// func (sfs *SyncFS) Stat(key string) (*vfs.FileInfo, error) {
+//     sfs.mu.RLock()
+//     defer sfs.mu.RUnlock()

-    return sfs.vfs.Stat(key)
-}
+//     return sfs.vfs.Stat(key)
+// }

-// StatAll returns the FileInfo of all keys.
-func (sfs *SyncFS) StatAll() []*vfs.FileInfo {
-    sfs.mu.RLock()
-    defer sfs.mu.RUnlock()
+// // StatAll returns the FileInfo of all keys.
+// func (sfs *SyncFS) StatAll() []*vfs.FileInfo {
+//     sfs.mu.RLock()
+//     defer sfs.mu.RUnlock()

-    return sfs.vfs.StatAll()
-}
+//     return sfs.vfs.StatAll()
+// }

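Finally, the Prometheus client library named in the commit message does not appear in this excerpt at all. For orientation, a hedged sketch of how a cache hit/miss counter is typically registered with github.com/prometheus/client_golang; the metric name and label values here are hypothetical:

package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// cacheRequests counts cache lookups, partitioned by outcome.
// promauto registers the collector with the default registry.
var cacheRequests = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "steamcache_cache_requests_total", // hypothetical metric name
    Help: "Cache lookups partitioned by result (hit or miss).",
}, []string{"result"})

// RecordHit and RecordMiss would be called from the cache's Get path.
func RecordHit()  { cacheRequests.WithLabelValues("hit").Inc() }
func RecordMiss() { cacheRequests.WithLabelValues("miss").Inc() }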