Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bfe29dea75 | |||
| 9b2affe95a |
@@ -21,7 +21,6 @@ import (
|
||||
"s1d3sw1ped/steamcache2/vfs/gc"
|
||||
"s1d3sw1ped/steamcache2/vfs/memory"
|
||||
"s1d3sw1ped/steamcache2/vfs/predictive"
|
||||
"s1d3sw1ped/steamcache2/vfs/warming"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -781,14 +780,14 @@ type SteamCache struct {
|
||||
// Adaptive and predictive caching
|
||||
adaptiveManager *adaptive.AdaptiveCacheManager
|
||||
predictiveManager *predictive.PredictiveCacheManager
|
||||
cacheWarmer *warming.CacheWarmer
|
||||
cacheWarmer *predictive.CacheWarmer
|
||||
lastAccessKey string // Track last accessed key for sequence analysis
|
||||
lastAccessKeyMu sync.RWMutex
|
||||
adaptiveEnabled bool // Flag to enable/disable adaptive features
|
||||
|
||||
// Dynamic memory management
|
||||
memoryMonitor *memory.MemoryMonitor
|
||||
dynamicCacheMgr *memory.DynamicCacheManager
|
||||
dynamicCacheMgr *memory.MemoryMonitor
|
||||
}
|
||||
|
||||
func New(address string, memorySize string, diskSize string, diskPath, upstream, memoryGC, diskGC string, maxConcurrentRequests int64, maxRequestsPerClient int64) *SteamCache {
|
||||
@@ -925,8 +924,8 @@ func New(address string, memorySize string, diskSize string, diskPath, upstream,
|
||||
// Initialize adaptive and predictive caching (lightweight)
|
||||
adaptiveManager: adaptive.NewAdaptiveCacheManager(5 * time.Minute), // Much longer interval
|
||||
predictiveManager: predictive.NewPredictiveCacheManager(),
|
||||
cacheWarmer: warming.NewCacheWarmer(c, 2), // Reduced to 2 concurrent warmers
|
||||
adaptiveEnabled: true, // Enable by default but can be disabled
|
||||
cacheWarmer: predictive.NewCacheWarmer(), // Use predictive cache warmer
|
||||
adaptiveEnabled: true, // Enable by default but can be disabled
|
||||
|
||||
// Initialize dynamic memory management
|
||||
memoryMonitor: memory.NewMemoryMonitor(uint64(memorysize), 10*time.Second, 0.1), // 10% threshold
|
||||
@@ -935,7 +934,7 @@ func New(address string, memorySize string, diskSize string, diskPath, upstream,
|
||||
|
||||
// Initialize dynamic cache manager if we have memory cache
|
||||
if m != nil && sc.memoryMonitor != nil {
|
||||
sc.dynamicCacheMgr = memory.NewDynamicCacheManager(mgc, uint64(memorysize), sc.memoryMonitor)
|
||||
sc.dynamicCacheMgr = memory.NewMemoryMonitorWithCache(uint64(memorysize), 10*time.Second, 0.1, mgc, uint64(memorysize))
|
||||
sc.dynamicCacheMgr.Start()
|
||||
sc.memoryMonitor.Start()
|
||||
}
|
||||
@@ -1535,6 +1534,6 @@ func (sc *SteamCache) recordCacheMiss(key string, size int64) {
|
||||
|
||||
// Only trigger warming for very large files to reduce overhead
|
||||
if size > 10*1024*1024 { // Only warm files > 10MB
|
||||
sc.cacheWarmer.RequestWarming(key, 3, "cache_miss", size, "cache_miss_analyzer")
|
||||
sc.cacheWarmer.RequestWarming(key, 3, "cache_miss", size)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,20 +3,25 @@ package steamcache
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCaching(t *testing.T) {
|
||||
td := t.TempDir()
|
||||
|
||||
os.WriteFile(filepath.Join(td, "key2"), []byte("value2"), 0644)
|
||||
|
||||
sc := New("localhost:8080", "1G", "1G", td, "", "lru", "lru", 200, 5)
|
||||
|
||||
w, err := sc.vfs.Create("key", 5)
|
||||
// Create key2 through the VFS system instead of directly
|
||||
w, err := sc.vfs.Create("key2", 6)
|
||||
if err != nil {
|
||||
t.Errorf("Create key2 failed: %v", err)
|
||||
}
|
||||
w.Write([]byte("value2"))
|
||||
w.Close()
|
||||
|
||||
w, err = sc.vfs.Create("key", 5)
|
||||
if err != nil {
|
||||
t.Errorf("Create failed: %v", err)
|
||||
}
|
||||
@@ -82,9 +87,18 @@ func TestCaching(t *testing.T) {
|
||||
t.Errorf("Total size too large: got %d, want at most 34", sc.vfs.Size())
|
||||
}
|
||||
|
||||
// First ensure the file is indexed by opening it
|
||||
rc, err = sc.vfs.Open("key2")
|
||||
if err != nil {
|
||||
t.Errorf("Open key2 failed: %v", err)
|
||||
}
|
||||
rc.Close()
|
||||
|
||||
// Give promotion goroutine time to complete before deleting
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
sc.memory.Delete("key2")
|
||||
sc.disk.Delete("key2") // Also delete from disk cache
|
||||
os.Remove(filepath.Join(td, "key2"))
|
||||
|
||||
if _, err := sc.vfs.Open("key2"); err == nil {
|
||||
t.Errorf("Open failed: got nil, want error")
|
||||
|
||||
376
vfs/cache/cache.go
vendored
376
vfs/cache/cache.go
vendored
@@ -5,56 +5,47 @@ import (
|
||||
"io"
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"s1d3sw1ped/steamcache2/vfs/vfserror"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// TieredCache implements a two-tier cache with fast (memory) and slow (disk) storage
|
||||
// TieredCache implements a lock-free two-tier cache for better concurrency
|
||||
type TieredCache struct {
|
||||
fast vfs.VFS // Memory cache (fast)
|
||||
slow vfs.VFS // Disk cache (slow)
|
||||
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// LockFreeTieredCache implements a lock-free two-tier cache for better concurrency
|
||||
type LockFreeTieredCache struct {
|
||||
fast *atomic.Value // Memory cache (fast) - atomic.Value for lock-free access
|
||||
slow *atomic.Value // Disk cache (slow) - atomic.Value for lock-free access
|
||||
}
|
||||
|
||||
// New creates a new tiered cache
|
||||
func New() *TieredCache {
|
||||
return &TieredCache{}
|
||||
return &TieredCache{
|
||||
fast: &atomic.Value{},
|
||||
slow: &atomic.Value{},
|
||||
}
|
||||
}
|
||||
|
||||
// SetFast sets the fast (memory) tier
|
||||
// SetFast sets the fast (memory) tier atomically
|
||||
func (tc *TieredCache) SetFast(vfs vfs.VFS) {
|
||||
tc.mu.Lock()
|
||||
defer tc.mu.Unlock()
|
||||
tc.fast = vfs
|
||||
tc.fast.Store(vfs)
|
||||
}
|
||||
|
||||
// SetSlow sets the slow (disk) tier
|
||||
// SetSlow sets the slow (disk) tier atomically
|
||||
func (tc *TieredCache) SetSlow(vfs vfs.VFS) {
|
||||
tc.mu.Lock()
|
||||
defer tc.mu.Unlock()
|
||||
tc.slow = vfs
|
||||
tc.slow.Store(vfs)
|
||||
}
|
||||
|
||||
// Create creates a new file, preferring the slow tier for persistence testing
|
||||
// Create creates a new file, preferring the slow tier for persistence
|
||||
func (tc *TieredCache) Create(key string, size int64) (io.WriteCloser, error) {
|
||||
tc.mu.RLock()
|
||||
defer tc.mu.RUnlock()
|
||||
|
||||
// Try slow tier first (disk) for better testability
|
||||
if tc.slow != nil {
|
||||
return tc.slow.Create(key, size)
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
return vfs.Create(key, size)
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to fast tier (memory)
|
||||
if tc.fast != nil {
|
||||
return tc.fast.Create(key, size)
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
return vfs.Create(key, size)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
@@ -62,40 +53,34 @@ func (tc *TieredCache) Create(key string, size int64) (io.WriteCloser, error) {
|
||||
|
||||
// Open opens a file, checking fast tier first, then slow tier with promotion
|
||||
func (tc *TieredCache) Open(key string) (io.ReadCloser, error) {
|
||||
tc.mu.RLock()
|
||||
defer tc.mu.RUnlock()
|
||||
|
||||
// Try fast tier first (memory)
|
||||
if tc.fast != nil {
|
||||
if reader, err := tc.fast.Open(key); err == nil {
|
||||
return reader, nil
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
if reader, err := vfs.Open(key); err == nil {
|
||||
return reader, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to slow tier (disk) and promote to fast tier
|
||||
if tc.slow != nil {
|
||||
reader, err := tc.slow.Open(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
reader, err := vfs.Open(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we have both tiers, check if we should promote the file to fast tier
|
||||
if tc.fast != nil {
|
||||
// Check file size before promoting - don't promote if larger than available memory cache space
|
||||
if info, err := tc.slow.Stat(key); err == nil {
|
||||
availableSpace := tc.fast.Capacity() - tc.fast.Size()
|
||||
// Only promote if file fits in available space (with 10% buffer for safety)
|
||||
if info.Size <= int64(float64(availableSpace)*0.9) {
|
||||
// Create a new reader for promotion to avoid interfering with the returned reader
|
||||
promotionReader, err := tc.slow.Open(key)
|
||||
if err == nil {
|
||||
go tc.promoteToFast(key, promotionReader)
|
||||
}
|
||||
// If we have both tiers, promote the file to fast tier
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
// Create a new reader for promotion to avoid interfering with the returned reader
|
||||
promotionReader, err := vfs.Open(key)
|
||||
if err == nil {
|
||||
go tc.promoteToFast(key, promotionReader)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return reader, nil
|
||||
return reader, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
@@ -103,22 +88,23 @@ func (tc *TieredCache) Open(key string) (io.ReadCloser, error) {
|
||||
|
||||
// Delete removes a file from all tiers
|
||||
func (tc *TieredCache) Delete(key string) error {
|
||||
tc.mu.RLock()
|
||||
defer tc.mu.RUnlock()
|
||||
|
||||
var lastErr error
|
||||
|
||||
// Delete from fast tier
|
||||
if tc.fast != nil {
|
||||
if err := tc.fast.Delete(key); err != nil {
|
||||
lastErr = err
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
if err := vfs.Delete(key); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete from slow tier
|
||||
if tc.slow != nil {
|
||||
if err := tc.slow.Delete(key); err != nil {
|
||||
lastErr = err
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
if err := vfs.Delete(key); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,19 +113,20 @@ func (tc *TieredCache) Delete(key string) error {
|
||||
|
||||
// Stat returns file information, checking fast tier first
|
||||
func (tc *TieredCache) Stat(key string) (*vfs.FileInfo, error) {
|
||||
tc.mu.RLock()
|
||||
defer tc.mu.RUnlock()
|
||||
|
||||
// Try fast tier first (memory)
|
||||
if tc.fast != nil {
|
||||
if info, err := tc.fast.Stat(key); err == nil {
|
||||
return info, nil
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
if info, err := vfs.Stat(key); err == nil {
|
||||
return info, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to slow tier (disk)
|
||||
if tc.slow != nil {
|
||||
return tc.slow.Stat(key)
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
return vfs.Stat(key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
@@ -152,31 +139,39 @@ func (tc *TieredCache) Name() string {
|
||||
|
||||
// Size returns the total size across all tiers
|
||||
func (tc *TieredCache) Size() int64 {
|
||||
tc.mu.RLock()
|
||||
defer tc.mu.RUnlock()
|
||||
|
||||
var total int64
|
||||
if tc.fast != nil {
|
||||
total += tc.fast.Size()
|
||||
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
total += vfs.Size()
|
||||
}
|
||||
}
|
||||
if tc.slow != nil {
|
||||
total += tc.slow.Size()
|
||||
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
total += vfs.Size()
|
||||
}
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
// Capacity returns the total capacity across all tiers
|
||||
func (tc *TieredCache) Capacity() int64 {
|
||||
tc.mu.RLock()
|
||||
defer tc.mu.RUnlock()
|
||||
|
||||
var total int64
|
||||
if tc.fast != nil {
|
||||
total += tc.fast.Capacity()
|
||||
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
total += vfs.Capacity()
|
||||
}
|
||||
}
|
||||
if tc.slow != nil {
|
||||
total += tc.slow.Capacity()
|
||||
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
total += vfs.Capacity()
|
||||
}
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
@@ -185,217 +180,8 @@ func (tc *TieredCache) promoteToFast(key string, reader io.ReadCloser) {
|
||||
defer reader.Close()
|
||||
|
||||
// Get file info from slow tier to determine size
|
||||
tc.mu.RLock()
|
||||
var size int64
|
||||
if tc.slow != nil {
|
||||
if info, err := tc.slow.Stat(key); err == nil {
|
||||
size = info.Size
|
||||
} else {
|
||||
tc.mu.RUnlock()
|
||||
return // Skip promotion if we can't get file info
|
||||
}
|
||||
}
|
||||
tc.mu.RUnlock()
|
||||
|
||||
// Check if file fits in available memory cache space
|
||||
tc.mu.RLock()
|
||||
if tc.fast != nil {
|
||||
availableSpace := tc.fast.Capacity() - tc.fast.Size()
|
||||
// Only promote if file fits in available space (with 10% buffer for safety)
|
||||
if size > int64(float64(availableSpace)*0.9) {
|
||||
tc.mu.RUnlock()
|
||||
return // Skip promotion if file is too large
|
||||
}
|
||||
}
|
||||
tc.mu.RUnlock()
|
||||
|
||||
// Read the entire file content
|
||||
content, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return // Skip promotion if read fails
|
||||
}
|
||||
|
||||
// Create the file in fast tier
|
||||
tc.mu.RLock()
|
||||
if tc.fast != nil {
|
||||
writer, err := tc.fast.Create(key, size)
|
||||
if err == nil {
|
||||
// Write content to fast tier
|
||||
writer.Write(content)
|
||||
writer.Close()
|
||||
}
|
||||
}
|
||||
tc.mu.RUnlock()
|
||||
}
|
||||
|
||||
// NewLockFree creates a new lock-free tiered cache
|
||||
func NewLockFree() *LockFreeTieredCache {
|
||||
return &LockFreeTieredCache{
|
||||
fast: &atomic.Value{},
|
||||
slow: &atomic.Value{},
|
||||
}
|
||||
}
|
||||
|
||||
// SetFast sets the fast (memory) tier atomically
|
||||
func (lftc *LockFreeTieredCache) SetFast(vfs vfs.VFS) {
|
||||
lftc.fast.Store(vfs)
|
||||
}
|
||||
|
||||
// SetSlow sets the slow (disk) tier atomically
|
||||
func (lftc *LockFreeTieredCache) SetSlow(vfs vfs.VFS) {
|
||||
lftc.slow.Store(vfs)
|
||||
}
|
||||
|
||||
// Create creates a new file, preferring the slow tier for persistence
|
||||
func (lftc *LockFreeTieredCache) Create(key string, size int64) (io.WriteCloser, error) {
|
||||
// Try slow tier first (disk) for better testability
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
return vfs.Create(key, size)
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to fast tier (memory)
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
return vfs.Create(key, size)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
}
|
||||
|
||||
// Open opens a file, checking fast tier first, then slow tier with promotion
|
||||
func (lftc *LockFreeTieredCache) Open(key string) (io.ReadCloser, error) {
|
||||
// Try fast tier first (memory)
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
if reader, err := vfs.Open(key); err == nil {
|
||||
return reader, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to slow tier (disk) and promote to fast tier
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
reader, err := vfs.Open(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we have both tiers, promote the file to fast tier
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
// Create a new reader for promotion to avoid interfering with the returned reader
|
||||
promotionReader, err := vfs.Open(key)
|
||||
if err == nil {
|
||||
go lftc.promoteToFast(key, promotionReader)
|
||||
}
|
||||
}
|
||||
|
||||
return reader, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
}
|
||||
|
||||
// Delete removes a file from all tiers
|
||||
func (lftc *LockFreeTieredCache) Delete(key string) error {
|
||||
var lastErr error
|
||||
|
||||
// Delete from fast tier
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
if err := vfs.Delete(key); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete from slow tier
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
if err := vfs.Delete(key); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// Stat returns file information, checking fast tier first
|
||||
func (lftc *LockFreeTieredCache) Stat(key string) (*vfs.FileInfo, error) {
|
||||
// Try fast tier first (memory)
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
if info, err := vfs.Stat(key); err == nil {
|
||||
return info, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to slow tier (disk)
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
return vfs.Stat(key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
}
|
||||
|
||||
// Name returns the cache name
|
||||
func (lftc *LockFreeTieredCache) Name() string {
|
||||
return "LockFreeTieredCache"
|
||||
}
|
||||
|
||||
// Size returns the total size across all tiers
|
||||
func (lftc *LockFreeTieredCache) Size() int64 {
|
||||
var total int64
|
||||
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
total += vfs.Size()
|
||||
}
|
||||
}
|
||||
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
total += vfs.Size()
|
||||
}
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
// Capacity returns the total capacity across all tiers
|
||||
func (lftc *LockFreeTieredCache) Capacity() int64 {
|
||||
var total int64
|
||||
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
total += vfs.Capacity()
|
||||
}
|
||||
}
|
||||
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
total += vfs.Capacity()
|
||||
}
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
// promoteToFast promotes a file from slow tier to fast tier (lock-free version)
|
||||
func (lftc *LockFreeTieredCache) promoteToFast(key string, reader io.ReadCloser) {
|
||||
defer reader.Close()
|
||||
|
||||
// Get file info from slow tier to determine size
|
||||
var size int64
|
||||
if slow := lftc.slow.Load(); slow != nil {
|
||||
if slow := tc.slow.Load(); slow != nil {
|
||||
if vfs, ok := slow.(vfs.VFS); ok {
|
||||
if info, err := vfs.Stat(key); err == nil {
|
||||
size = info.Size
|
||||
@@ -406,7 +192,7 @@ func (lftc *LockFreeTieredCache) promoteToFast(key string, reader io.ReadCloser)
|
||||
}
|
||||
|
||||
// Check if file fits in available memory cache space
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
availableSpace := vfs.Capacity() - vfs.Size()
|
||||
// Only promote if file fits in available space (with 10% buffer for safety)
|
||||
@@ -423,7 +209,7 @@ func (lftc *LockFreeTieredCache) promoteToFast(key string, reader io.ReadCloser)
|
||||
}
|
||||
|
||||
// Create the file in fast tier
|
||||
if fast := lftc.fast.Load(); fast != nil {
|
||||
if fast := tc.fast.Load(); fast != nil {
|
||||
if vfs, ok := fast.(vfs.VFS); ok {
|
||||
writer, err := vfs.Create(key, size)
|
||||
if err == nil {
|
||||
|
||||
425
vfs/disk/disk.go
425
vfs/disk/disk.go
@@ -2,17 +2,19 @@
|
||||
package disk
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"s1d3sw1ped/steamcache2/steamcache/logger"
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"s1d3sw1ped/steamcache2/vfs/locks"
|
||||
"s1d3sw1ped/steamcache2/vfs/lru"
|
||||
"s1d3sw1ped/steamcache2/vfs/vfserror"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/docker/go-units"
|
||||
@@ -31,55 +33,10 @@ type DiskFS struct {
|
||||
size int64
|
||||
mu sync.RWMutex
|
||||
keyLocks []sync.Map // Sharded lock pools for better concurrency
|
||||
LRU *lruList
|
||||
LRU *lru.LRUList[*vfs.FileInfo]
|
||||
timeUpdater *vfs.BatchedTimeUpdate // Batched time updates for better performance
|
||||
}
|
||||
|
||||
// Number of lock shards for reducing contention
|
||||
const numLockShards = 32
|
||||
|
||||
// lruList for time-decayed LRU eviction
|
||||
type lruList struct {
|
||||
list *list.List
|
||||
elem map[string]*list.Element
|
||||
}
|
||||
|
||||
func newLruList() *lruList {
|
||||
return &lruList{
|
||||
list: list.New(),
|
||||
elem: make(map[string]*list.Element),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *lruList) Add(key string, fi *vfs.FileInfo) {
|
||||
elem := l.list.PushFront(fi)
|
||||
l.elem[key] = elem
|
||||
}
|
||||
|
||||
func (l *lruList) MoveToFront(key string, timeUpdater *vfs.BatchedTimeUpdate) {
|
||||
if elem, exists := l.elem[key]; exists {
|
||||
l.list.MoveToFront(elem)
|
||||
// Update the FileInfo in the element with new access time
|
||||
if fi := elem.Value.(*vfs.FileInfo); fi != nil {
|
||||
fi.UpdateAccessBatched(timeUpdater)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *lruList) Remove(key string) *vfs.FileInfo {
|
||||
if elem, exists := l.elem[key]; exists {
|
||||
delete(l.elem, key)
|
||||
if fi := l.list.Remove(elem).(*vfs.FileInfo); fi != nil {
|
||||
return fi
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *lruList) Len() int {
|
||||
return l.list.Len()
|
||||
}
|
||||
|
||||
// shardPath converts a Steam cache key to a sharded directory path to reduce inode pressure
|
||||
func (d *DiskFS) shardPath(key string) string {
|
||||
if !strings.HasPrefix(key, "steam/") {
|
||||
@@ -104,43 +61,6 @@ func (d *DiskFS) shardPath(key string) string {
|
||||
return filepath.Join("steam", shard1, shard2, hashPart)
|
||||
}
|
||||
|
||||
// extractKeyFromPath reverses the sharding logic to get the original key from a sharded path
|
||||
func (d *DiskFS) extractKeyFromPath(path string) string {
|
||||
// Fast path: if no slashes, it's not a sharded path
|
||||
if !strings.Contains(path, "/") {
|
||||
return path
|
||||
}
|
||||
|
||||
parts := strings.SplitN(path, "/", 5)
|
||||
numParts := len(parts)
|
||||
|
||||
if numParts >= 4 && parts[0] == "steam" {
|
||||
lastThree := parts[numParts-3:]
|
||||
shard1 := lastThree[0]
|
||||
shard2 := lastThree[1]
|
||||
filename := lastThree[2]
|
||||
|
||||
// Verify sharding is correct
|
||||
if len(filename) >= 4 && filename[:2] == shard1 && filename[2:4] == shard2 {
|
||||
return "steam/" + filename
|
||||
}
|
||||
}
|
||||
|
||||
// Handle single-level sharding for short hashes: steam/shard1/filename
|
||||
if numParts >= 3 && parts[0] == "steam" {
|
||||
lastTwo := parts[numParts-2:]
|
||||
shard1 := lastTwo[0]
|
||||
filename := lastTwo[1]
|
||||
|
||||
if len(filename) >= 2 && filename[:2] == shard1 {
|
||||
return "steam/" + filename
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: return as-is for any unrecognized format
|
||||
return path
|
||||
}
|
||||
|
||||
// New creates a new DiskFS.
|
||||
func New(root string, capacity int64) *DiskFS {
|
||||
if capacity <= 0 {
|
||||
@@ -151,7 +71,7 @@ func New(root string, capacity int64) *DiskFS {
|
||||
os.MkdirAll(root, 0755)
|
||||
|
||||
// Initialize sharded locks
|
||||
keyLocks := make([]sync.Map, numLockShards)
|
||||
keyLocks := make([]sync.Map, locks.NumLockShards)
|
||||
|
||||
d := &DiskFS{
|
||||
root: root,
|
||||
@@ -159,7 +79,7 @@ func New(root string, capacity int64) *DiskFS {
|
||||
capacity: capacity,
|
||||
size: 0,
|
||||
keyLocks: keyLocks,
|
||||
LRU: newLruList(),
|
||||
LRU: lru.NewLRUList[*vfs.FileInfo](),
|
||||
timeUpdater: vfs.NewBatchedTimeUpdate(100 * time.Millisecond), // Update time every 100ms
|
||||
}
|
||||
|
||||
@@ -167,56 +87,15 @@ func New(root string, capacity int64) *DiskFS {
|
||||
return d
|
||||
}
|
||||
|
||||
// init loads existing files from disk and migrates legacy depot files to sharded structure
|
||||
// init loads existing files from disk with ultra-fast lazy initialization
|
||||
func (d *DiskFS) init() {
|
||||
tstart := time.Now()
|
||||
|
||||
var depotFiles []string // Track depot files that need migration
|
||||
// Ultra-fast initialization: only scan directory structure, defer file stats
|
||||
d.scanDirectoriesOnly()
|
||||
|
||||
err := filepath.Walk(d.root, func(npath string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
// Extract key from sharded path: remove root and convert sharding back
|
||||
// Handle both "./disk" and "disk" root paths
|
||||
rootPath := d.root
|
||||
rootPath = strings.TrimPrefix(rootPath, "./")
|
||||
relPath := strings.ReplaceAll(npath[len(rootPath)+1:], "\\", "/")
|
||||
|
||||
// Extract the original key from the sharded path
|
||||
k := d.extractKeyFromPath(relPath)
|
||||
|
||||
fi := vfs.NewFileInfoFromOS(info, k)
|
||||
d.info[k] = fi
|
||||
d.LRU.Add(k, fi)
|
||||
// Initialize access time with file modification time
|
||||
fi.UpdateAccessBatched(d.timeUpdater)
|
||||
d.size += info.Size()
|
||||
|
||||
// Track depot files for potential migration
|
||||
if strings.HasPrefix(relPath, "depot/") {
|
||||
depotFiles = append(depotFiles, relPath)
|
||||
}
|
||||
|
||||
d.mu.Unlock()
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logger.Logger.Error().Err(err).Msg("Walk failed")
|
||||
}
|
||||
|
||||
// Migrate depot files to sharded structure if any exist
|
||||
if len(depotFiles) > 0 {
|
||||
logger.Logger.Info().Int("count", len(depotFiles)).Msg("Found legacy depot files, starting migration")
|
||||
d.migrateDepotFiles(depotFiles)
|
||||
}
|
||||
// Start background size calculation in a separate goroutine
|
||||
go d.calculateSizeInBackground()
|
||||
|
||||
logger.Logger.Info().
|
||||
Str("name", d.Name()).
|
||||
@@ -228,69 +107,144 @@ func (d *DiskFS) init() {
|
||||
Msg("init")
|
||||
}
|
||||
|
||||
// migrateDepotFiles moves legacy depot files to the sharded steam structure
|
||||
func (d *DiskFS) migrateDepotFiles(depotFiles []string) {
|
||||
migratedCount := 0
|
||||
errorCount := 0
|
||||
|
||||
for _, relPath := range depotFiles {
|
||||
// Extract the steam key from the depot path
|
||||
steamKey := d.extractKeyFromPath(relPath)
|
||||
if !strings.HasPrefix(steamKey, "steam/") {
|
||||
// Skip if we can't extract a proper steam key
|
||||
errorCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// Get the source and destination paths
|
||||
sourcePath := filepath.Join(d.root, relPath)
|
||||
shardedPath := d.shardPath(steamKey)
|
||||
destPath := filepath.Join(d.root, shardedPath)
|
||||
|
||||
// Create destination directory
|
||||
destDir := filepath.Dir(destPath)
|
||||
if err := os.MkdirAll(destDir, 0755); err != nil {
|
||||
logger.Logger.Error().Err(err).Str("path", destDir).Msg("Failed to create migration destination directory")
|
||||
errorCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// Move the file
|
||||
if err := os.Rename(sourcePath, destPath); err != nil {
|
||||
logger.Logger.Error().Err(err).Str("from", sourcePath).Str("to", destPath).Msg("Failed to migrate depot file")
|
||||
errorCount++
|
||||
continue
|
||||
}
|
||||
|
||||
migratedCount++
|
||||
|
||||
// Clean up empty depot directories (this is a simple cleanup, may not handle all cases)
|
||||
d.cleanupEmptyDepotDirs(filepath.Dir(sourcePath))
|
||||
}
|
||||
|
||||
logger.Logger.Info().
|
||||
Int("migrated", migratedCount).
|
||||
Int("errors", errorCount).
|
||||
Msg("Depot file migration completed")
|
||||
// scanDirectoriesOnly performs ultra-fast directory structure scanning without file stats
|
||||
func (d *DiskFS) scanDirectoriesOnly() {
|
||||
// Just ensure the root directory exists and is accessible
|
||||
// No file scanning during init - files will be discovered on-demand
|
||||
logger.Logger.Debug().
|
||||
Str("root", d.root).
|
||||
Msg("Directory structure scan completed (lazy file discovery enabled)")
|
||||
}
|
||||
|
||||
// cleanupEmptyDepotDirs removes empty depot directories after migration
|
||||
func (d *DiskFS) cleanupEmptyDepotDirs(dirPath string) {
|
||||
for dirPath != d.root && strings.HasPrefix(dirPath, filepath.Join(d.root, "depot")) {
|
||||
entries, err := os.ReadDir(dirPath)
|
||||
if err != nil || len(entries) > 0 {
|
||||
break
|
||||
}
|
||||
// calculateSizeInBackground calculates the total size of all files in the background
|
||||
func (d *DiskFS) calculateSizeInBackground() {
|
||||
tstart := time.Now()
|
||||
|
||||
// Directory is empty, remove it
|
||||
if err := os.Remove(dirPath); err != nil {
|
||||
logger.Logger.Error().Err(err).Str("dir", dirPath).Msg("Failed to remove empty depot directory")
|
||||
break
|
||||
}
|
||||
// Channel for collecting file information
|
||||
fileChan := make(chan fileSizeInfo, 1000)
|
||||
|
||||
// Move up to parent directory
|
||||
dirPath = filepath.Dir(dirPath)
|
||||
// Progress tracking
|
||||
var totalFiles int64
|
||||
var processedFiles int64
|
||||
progressTicker := time.NewTicker(2 * time.Second)
|
||||
defer progressTicker.Stop()
|
||||
|
||||
// Wait group for workers
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start directory scanner
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(fileChan)
|
||||
d.scanFilesForSize(d.root, fileChan, &totalFiles)
|
||||
}()
|
||||
|
||||
// Collect results with progress reporting
|
||||
var totalSize int64
|
||||
|
||||
// Use a separate goroutine to collect results
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for {
|
||||
select {
|
||||
case fi, ok := <-fileChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
totalSize += fi.size
|
||||
processedFiles++
|
||||
case <-progressTicker.C:
|
||||
if totalFiles > 0 {
|
||||
logger.Logger.Debug().
|
||||
Int64("processed", processedFiles).
|
||||
Int64("total", totalFiles).
|
||||
Int64("size", totalSize).
|
||||
Float64("progress", float64(processedFiles)/float64(totalFiles)*100).
|
||||
Msg("Background size calculation progress")
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for scanning to complete
|
||||
wg.Wait()
|
||||
<-done
|
||||
|
||||
// Update the total size
|
||||
d.mu.Lock()
|
||||
d.size = totalSize
|
||||
d.mu.Unlock()
|
||||
|
||||
logger.Logger.Info().
|
||||
Int64("files_scanned", processedFiles).
|
||||
Int64("total_size", totalSize).
|
||||
Str("duration", time.Since(tstart).String()).
|
||||
Msg("Background size calculation completed")
|
||||
}
|
||||
|
||||
// fileSizeInfo represents a file found during size calculation
|
||||
type fileSizeInfo struct {
|
||||
size int64
|
||||
}
|
||||
|
||||
// scanFilesForSize performs recursive file scanning for size calculation only
|
||||
func (d *DiskFS) scanFilesForSize(dirPath string, fileChan chan<- fileSizeInfo, totalFiles *int64) {
|
||||
// Use ReadDir for faster directory listing
|
||||
entries, err := os.ReadDir(dirPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Count files first for progress tracking
|
||||
fileCount := 0
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
fileCount++
|
||||
}
|
||||
}
|
||||
atomic.AddInt64(totalFiles, int64(fileCount))
|
||||
|
||||
// Process entries concurrently with limited workers
|
||||
semaphore := make(chan struct{}, 16) // More workers for size calculation
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, entry := range entries {
|
||||
entryPath := filepath.Join(dirPath, entry.Name())
|
||||
|
||||
if entry.IsDir() {
|
||||
// Recursively scan subdirectories
|
||||
wg.Add(1)
|
||||
go func(path string) {
|
||||
defer wg.Done()
|
||||
semaphore <- struct{}{} // Acquire semaphore
|
||||
defer func() { <-semaphore }() // Release semaphore
|
||||
d.scanFilesForSize(path, fileChan, totalFiles)
|
||||
}(entryPath)
|
||||
} else {
|
||||
// Process file for size only
|
||||
wg.Add(1)
|
||||
go func(entry os.DirEntry) {
|
||||
defer wg.Done()
|
||||
semaphore <- struct{}{} // Acquire semaphore
|
||||
defer func() { <-semaphore }() // Release semaphore
|
||||
|
||||
// Get file info for size calculation
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Send file size info
|
||||
fileChan <- fileSizeInfo{
|
||||
size: info.Size(),
|
||||
}
|
||||
}(entry)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Name returns the name of this VFS
|
||||
@@ -310,24 +264,9 @@ func (d *DiskFS) Capacity() int64 {
|
||||
return d.capacity
|
||||
}
|
||||
|
||||
// getShardIndex returns the shard index for a given key
|
||||
func getShardIndex(key string) int {
|
||||
// Use FNV-1a hash for good distribution
|
||||
var h uint32 = 2166136261 // FNV offset basis
|
||||
for i := 0; i < len(key); i++ {
|
||||
h ^= uint32(key[i])
|
||||
h *= 16777619 // FNV prime
|
||||
}
|
||||
return int(h % numLockShards)
|
||||
}
|
||||
|
||||
// getKeyLock returns a lock for the given key using sharding
|
||||
func (d *DiskFS) getKeyLock(key string) *sync.RWMutex {
|
||||
shardIndex := getShardIndex(key)
|
||||
shard := &d.keyLocks[shardIndex]
|
||||
|
||||
keyLock, _ := shard.LoadOrStore(key, &sync.RWMutex{})
|
||||
return keyLock.(*sync.RWMutex)
|
||||
return locks.GetKeyLock(d.keyLocks, key)
|
||||
}
|
||||
|
||||
// Create creates a new file
|
||||
@@ -379,6 +318,7 @@ func (d *DiskFS) Create(key string, size int64) (io.WriteCloser, error) {
|
||||
d.LRU.Add(key, fi)
|
||||
// Initialize access time with current time
|
||||
fi.UpdateAccessBatched(d.timeUpdater)
|
||||
// Add to size for new files (not discovered files)
|
||||
d.size += size
|
||||
d.mu.Unlock()
|
||||
|
||||
@@ -424,7 +364,7 @@ func (dwc *diskWriteCloser) Close() error {
|
||||
return dwc.file.Close()
|
||||
}
|
||||
|
||||
// Open opens a file for reading
|
||||
// Open opens a file for reading with lazy discovery
|
||||
func (d *DiskFS) Open(key string) (io.ReadCloser, error) {
|
||||
if key == "" {
|
||||
return nil, vfserror.ErrInvalidKey
|
||||
@@ -440,16 +380,22 @@ func (d *DiskFS) Open(key string) (io.ReadCloser, error) {
|
||||
return nil, vfserror.ErrInvalidKey
|
||||
}
|
||||
|
||||
keyMu := d.getKeyLock(key)
|
||||
keyMu.RLock()
|
||||
defer keyMu.RUnlock()
|
||||
|
||||
d.mu.Lock()
|
||||
// First, try to get the file info
|
||||
d.mu.RLock()
|
||||
fi, exists := d.info[key]
|
||||
d.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
d.mu.Unlock()
|
||||
return nil, vfserror.ErrNotFound
|
||||
// Try lazy discovery
|
||||
var err error
|
||||
fi, err = d.Stat(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Update access time and LRU
|
||||
d.mu.Lock()
|
||||
fi.UpdateAccessBatched(d.timeUpdater)
|
||||
d.LRU.MoveToFront(key, d.timeUpdater)
|
||||
d.mu.Unlock()
|
||||
@@ -550,7 +496,7 @@ func (d *DiskFS) Delete(key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stat returns file information
|
||||
// Stat returns file information with lazy discovery
|
||||
func (d *DiskFS) Stat(key string) (*vfs.FileInfo, error) {
|
||||
if key == "" {
|
||||
return nil, vfserror.ErrInvalidKey
|
||||
@@ -560,30 +506,49 @@ func (d *DiskFS) Stat(key string) (*vfs.FileInfo, error) {
|
||||
}
|
||||
|
||||
keyMu := d.getKeyLock(key)
|
||||
|
||||
// First, try to get the file info with read lock
|
||||
keyMu.RLock()
|
||||
defer keyMu.RUnlock()
|
||||
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
if fi, ok := d.info[key]; ok {
|
||||
d.mu.RUnlock()
|
||||
keyMu.RUnlock()
|
||||
return fi, nil
|
||||
}
|
||||
d.mu.RUnlock()
|
||||
keyMu.RUnlock()
|
||||
|
||||
// Check if file exists on disk but wasn't indexed (for migration)
|
||||
// Lazy discovery: check if file exists on disk and index it
|
||||
shardedPath := d.shardPath(key)
|
||||
path := filepath.Join(d.root, shardedPath)
|
||||
path = strings.ReplaceAll(path, "\\", "/")
|
||||
|
||||
if info, err := os.Stat(path); err == nil {
|
||||
// File exists in sharded location but not indexed, re-index it
|
||||
fi := vfs.NewFileInfoFromOS(info, key)
|
||||
// We can't modify the map here because we're in a read lock
|
||||
// This is a simplified version - in production you'd need to handle this properly
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return nil, vfserror.ErrNotFound
|
||||
}
|
||||
|
||||
// File exists, add it to the index with write lock
|
||||
keyMu.Lock()
|
||||
defer keyMu.Unlock()
|
||||
|
||||
// Double-check after acquiring write lock
|
||||
d.mu.Lock()
|
||||
if fi, ok := d.info[key]; ok {
|
||||
d.mu.Unlock()
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
return nil, vfserror.ErrNotFound
|
||||
// Create and add file info
|
||||
fi := vfs.NewFileInfoFromOS(info, key)
|
||||
d.info[key] = fi
|
||||
d.LRU.Add(key, fi)
|
||||
fi.UpdateAccessBatched(d.timeUpdater)
|
||||
// Note: Don't add to d.size here as it's being calculated in background
|
||||
// The background calculation will handle the total size
|
||||
d.mu.Unlock()
|
||||
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
// EvictLRU evicts the least recently used files to free up space
|
||||
@@ -596,7 +561,7 @@ func (d *DiskFS) EvictLRU(bytesNeeded uint) uint {
|
||||
// Evict from LRU list until we free enough space
|
||||
for d.size > d.capacity-int64(bytesNeeded) && d.LRU.Len() > 0 {
|
||||
// Get the least recently used item
|
||||
elem := d.LRU.list.Back()
|
||||
elem := d.LRU.Back()
|
||||
if elem == nil {
|
||||
break
|
||||
}
|
||||
@@ -625,7 +590,7 @@ func (d *DiskFS) EvictLRU(bytesNeeded uint) uint {
|
||||
evicted += uint(fi.Size)
|
||||
|
||||
// Clean up key lock
|
||||
shardIndex := getShardIndex(key)
|
||||
shardIndex := locks.GetShardIndex(key)
|
||||
d.keyLocks[shardIndex].Delete(key)
|
||||
}
|
||||
|
||||
@@ -681,7 +646,7 @@ func (d *DiskFS) EvictBySize(bytesNeeded uint, ascending bool) uint {
|
||||
evicted += uint(fi.Size)
|
||||
|
||||
// Clean up key lock
|
||||
shardIndex := getShardIndex(key)
|
||||
shardIndex := locks.GetShardIndex(key)
|
||||
d.keyLocks[shardIndex].Delete(key)
|
||||
}
|
||||
|
||||
@@ -734,7 +699,7 @@ func (d *DiskFS) EvictFIFO(bytesNeeded uint) uint {
|
||||
evicted += uint(fi.Size)
|
||||
|
||||
// Clean up key lock
|
||||
shardIndex := getShardIndex(key)
|
||||
shardIndex := locks.GetShardIndex(key)
|
||||
d.keyLocks[shardIndex].Delete(key)
|
||||
}
|
||||
|
||||
|
||||
110
vfs/eviction/eviction.go
Normal file
110
vfs/eviction/eviction.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package eviction
|
||||
|
||||
import (
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"s1d3sw1ped/steamcache2/vfs/disk"
|
||||
"s1d3sw1ped/steamcache2/vfs/memory"
|
||||
)
|
||||
|
||||
// EvictionStrategy defines different eviction strategies
|
||||
type EvictionStrategy string
|
||||
|
||||
const (
|
||||
StrategyLRU EvictionStrategy = "lru"
|
||||
StrategyLFU EvictionStrategy = "lfu"
|
||||
StrategyFIFO EvictionStrategy = "fifo"
|
||||
StrategyLargest EvictionStrategy = "largest"
|
||||
StrategySmallest EvictionStrategy = "smallest"
|
||||
StrategyHybrid EvictionStrategy = "hybrid"
|
||||
)
|
||||
|
||||
// EvictLRU performs LRU eviction by removing least recently used files
|
||||
func EvictLRU(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictLRU(bytesNeeded)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictLRU(bytesNeeded)
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// EvictFIFO performs FIFO (First In First Out) eviction
|
||||
func EvictFIFO(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictFIFO(bytesNeeded)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictFIFO(bytesNeeded)
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// EvictBySizeAsc evicts smallest files first
|
||||
func EvictBySizeAsc(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// EvictBySizeDesc evicts largest files first
|
||||
func EvictBySizeDesc(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// EvictLargest evicts largest files first
|
||||
func EvictLargest(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return EvictBySizeDesc(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// EvictSmallest evicts smallest files first
|
||||
func EvictSmallest(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return EvictBySizeAsc(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// EvictLFU performs LFU (Least Frequently Used) eviction
|
||||
func EvictLFU(v vfs.VFS, bytesNeeded uint) uint {
|
||||
// For now, fall back to size-based eviction
|
||||
// TODO: Implement proper LFU tracking
|
||||
return EvictBySizeAsc(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// EvictHybrid implements a hybrid eviction strategy
|
||||
func EvictHybrid(v vfs.VFS, bytesNeeded uint) uint {
|
||||
// Use LRU as primary strategy, but consider size as tiebreaker
|
||||
return EvictLRU(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// GetEvictionFunction returns the eviction function for the given strategy
|
||||
func GetEvictionFunction(strategy EvictionStrategy) func(vfs.VFS, uint) uint {
|
||||
switch strategy {
|
||||
case StrategyLRU:
|
||||
return EvictLRU
|
||||
case StrategyLFU:
|
||||
return EvictLFU
|
||||
case StrategyFIFO:
|
||||
return EvictFIFO
|
||||
case StrategyLargest:
|
||||
return EvictLargest
|
||||
case StrategySmallest:
|
||||
return EvictSmallest
|
||||
case StrategyHybrid:
|
||||
return EvictHybrid
|
||||
default:
|
||||
return EvictLRU
|
||||
}
|
||||
}
|
||||
151
vfs/gc/gc.go
151
vfs/gc/gc.go
@@ -5,8 +5,7 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"s1d3sw1ped/steamcache2/vfs/disk"
|
||||
"s1d3sw1ped/steamcache2/vfs/memory"
|
||||
"s1d3sw1ped/steamcache2/vfs/eviction"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -38,45 +37,14 @@ func New(wrappedVFS vfs.VFS, algorithm GCAlgorithm) *GCFS {
|
||||
algorithm: algorithm,
|
||||
}
|
||||
|
||||
switch algorithm {
|
||||
case LRU:
|
||||
gcfs.gcFunc = gcLRU
|
||||
case LFU:
|
||||
gcfs.gcFunc = gcLFU
|
||||
case FIFO:
|
||||
gcfs.gcFunc = gcFIFO
|
||||
case Largest:
|
||||
gcfs.gcFunc = gcLargest
|
||||
case Smallest:
|
||||
gcfs.gcFunc = gcSmallest
|
||||
case Hybrid:
|
||||
gcfs.gcFunc = gcHybrid
|
||||
default:
|
||||
// Default to LRU
|
||||
gcfs.gcFunc = gcLRU
|
||||
}
|
||||
gcfs.gcFunc = eviction.GetEvictionFunction(eviction.EvictionStrategy(algorithm))
|
||||
|
||||
return gcfs
|
||||
}
|
||||
|
||||
// GetGCAlgorithm returns the GC function for the given algorithm
|
||||
func GetGCAlgorithm(algorithm GCAlgorithm) func(vfs.VFS, uint) uint {
|
||||
switch algorithm {
|
||||
case LRU:
|
||||
return gcLRU
|
||||
case LFU:
|
||||
return gcLFU
|
||||
case FIFO:
|
||||
return gcFIFO
|
||||
case Largest:
|
||||
return gcLargest
|
||||
case Smallest:
|
||||
return gcSmallest
|
||||
case Hybrid:
|
||||
return gcHybrid
|
||||
default:
|
||||
return gcLRU
|
||||
}
|
||||
return eviction.GetEvictionFunction(eviction.EvictionStrategy(algorithm))
|
||||
}
|
||||
|
||||
// Create wraps the underlying Create method
|
||||
@@ -125,119 +93,6 @@ type EvictionStrategy interface {
|
||||
Evict(vfs vfs.VFS, bytesNeeded uint) uint
|
||||
}
|
||||
|
||||
// GC functions
|
||||
|
||||
// gcLRU implements Least Recently Used eviction
|
||||
func gcLRU(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictLRU(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// gcLFU implements Least Frequently Used eviction
|
||||
func gcLFU(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictLFU(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// gcFIFO implements First In First Out eviction
|
||||
func gcFIFO(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictFIFO(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// gcLargest implements largest file first eviction
|
||||
func gcLargest(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictLargest(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// gcSmallest implements smallest file first eviction
|
||||
func gcSmallest(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictSmallest(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// gcHybrid implements a hybrid eviction strategy
|
||||
func gcHybrid(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictHybrid(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// evictLRU performs LRU eviction by removing least recently used files
|
||||
func evictLRU(v vfs.VFS, bytesNeeded uint) uint {
|
||||
// Try to use specific eviction methods if available
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictLRU(bytesNeeded)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictLRU(bytesNeeded)
|
||||
default:
|
||||
// No fallback - return 0 (no eviction performed)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// evictLFU performs LFU (Least Frequently Used) eviction
|
||||
func evictLFU(v vfs.VFS, bytesNeeded uint) uint {
|
||||
// For now, fall back to size-based eviction
|
||||
// TODO: Implement proper LFU tracking
|
||||
return evictBySize(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// evictFIFO performs FIFO (First In First Out) eviction
|
||||
func evictFIFO(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictFIFO(bytesNeeded)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictFIFO(bytesNeeded)
|
||||
default:
|
||||
// No fallback - return 0 (no eviction performed)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// evictLargest evicts largest files first
|
||||
func evictLargest(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictBySizeDesc(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// evictSmallest evicts smallest files first
|
||||
func evictSmallest(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictBySizeAsc(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// evictBySize evicts files based on size (smallest first)
|
||||
func evictBySize(v vfs.VFS, bytesNeeded uint) uint {
|
||||
return evictBySizeAsc(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// evictBySizeAsc evicts smallest files first
|
||||
func evictBySizeAsc(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
|
||||
default:
|
||||
// No fallback - return 0 (no eviction performed)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// evictBySizeDesc evicts largest files first
|
||||
func evictBySizeDesc(v vfs.VFS, bytesNeeded uint) uint {
|
||||
switch fs := v.(type) {
|
||||
case *memory.MemoryFS:
|
||||
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
|
||||
case *disk.DiskFS:
|
||||
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
|
||||
default:
|
||||
// No fallback - return 0 (no eviction performed)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// evictHybrid implements a hybrid eviction strategy
|
||||
func evictHybrid(v vfs.VFS, bytesNeeded uint) uint {
|
||||
// Use LRU as primary strategy, but consider size as tiebreaker
|
||||
return evictLRU(v, bytesNeeded)
|
||||
}
|
||||
|
||||
// AdaptivePromotionDeciderFunc is a placeholder for the adaptive promotion logic
|
||||
var AdaptivePromotionDeciderFunc = func() interface{} {
|
||||
return nil
|
||||
|
||||
28
vfs/locks/sharding.go
Normal file
28
vfs/locks/sharding.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package locks
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Number of lock shards for reducing contention
|
||||
const NumLockShards = 32
|
||||
|
||||
// GetShardIndex returns the shard index for a given key using FNV-1a hash
|
||||
func GetShardIndex(key string) int {
|
||||
// Use FNV-1a hash for good distribution
|
||||
var h uint32 = 2166136261 // FNV offset basis
|
||||
for i := 0; i < len(key); i++ {
|
||||
h ^= uint32(key[i])
|
||||
h *= 16777619 // FNV prime
|
||||
}
|
||||
return int(h % NumLockShards)
|
||||
}
|
||||
|
||||
// GetKeyLock returns a lock for the given key using sharding
|
||||
func GetKeyLock(keyLocks []sync.Map, key string) *sync.RWMutex {
|
||||
shardIndex := GetShardIndex(key)
|
||||
shard := &keyLocks[shardIndex]
|
||||
|
||||
keyLock, _ := shard.LoadOrStore(key, &sync.RWMutex{})
|
||||
return keyLock.(*sync.RWMutex)
|
||||
}
|
||||
66
vfs/lru/lru.go
Normal file
66
vfs/lru/lru.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package lru
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"s1d3sw1ped/steamcache2/vfs/types"
|
||||
)
|
||||
|
||||
// LRUList represents a least recently used list for cache eviction
|
||||
type LRUList[T any] struct {
|
||||
list *list.List
|
||||
elem map[string]*list.Element
|
||||
}
|
||||
|
||||
// NewLRUList creates a new LRU list
|
||||
func NewLRUList[T any]() *LRUList[T] {
|
||||
return &LRUList[T]{
|
||||
list: list.New(),
|
||||
elem: make(map[string]*list.Element),
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds an item to the front of the LRU list
|
||||
func (l *LRUList[T]) Add(key string, item T) {
|
||||
elem := l.list.PushFront(item)
|
||||
l.elem[key] = elem
|
||||
}
|
||||
|
||||
// MoveToFront moves an item to the front of the LRU list
|
||||
func (l *LRUList[T]) MoveToFront(key string, timeUpdater *types.BatchedTimeUpdate) {
|
||||
if elem, exists := l.elem[key]; exists {
|
||||
l.list.MoveToFront(elem)
|
||||
// Update the FileInfo in the element with new access time
|
||||
if fi, ok := any(elem.Value).(interface {
|
||||
UpdateAccessBatched(*types.BatchedTimeUpdate)
|
||||
}); ok {
|
||||
fi.UpdateAccessBatched(timeUpdater)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove removes an item from the LRU list
|
||||
func (l *LRUList[T]) Remove(key string) (T, bool) {
|
||||
if elem, exists := l.elem[key]; exists {
|
||||
delete(l.elem, key)
|
||||
if item, ok := l.list.Remove(elem).(T); ok {
|
||||
return item, true
|
||||
}
|
||||
}
|
||||
var zero T
|
||||
return zero, false
|
||||
}
|
||||
|
||||
// Len returns the number of items in the LRU list
|
||||
func (l *LRUList[T]) Len() int {
|
||||
return l.list.Len()
|
||||
}
|
||||
|
||||
// Back returns the least recently used item (at the back of the list)
|
||||
func (l *LRUList[T]) Back() *list.Element {
|
||||
return l.list.Back()
|
||||
}
|
||||
|
||||
// Front returns the most recently used item (at the front of the list)
|
||||
func (l *LRUList[T]) Front() *list.Element {
|
||||
return l.list.Front()
|
||||
}
|
||||
@@ -1,130 +0,0 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DynamicCacheManager manages cache size adjustments based on system memory usage
|
||||
type DynamicCacheManager struct {
|
||||
originalCacheSize uint64
|
||||
currentCacheSize uint64
|
||||
memoryMonitor *MemoryMonitor
|
||||
cache vfs.VFS
|
||||
adjustmentInterval time.Duration
|
||||
lastAdjustment time.Time
|
||||
mu sync.RWMutex
|
||||
adjustmentCount int64
|
||||
isAdjusting int32
|
||||
}
|
||||
|
||||
// NewDynamicCacheManager creates a new dynamic cache manager
|
||||
func NewDynamicCacheManager(cache vfs.VFS, originalSize uint64, memoryMonitor *MemoryMonitor) *DynamicCacheManager {
|
||||
return &DynamicCacheManager{
|
||||
originalCacheSize: originalSize,
|
||||
currentCacheSize: originalSize,
|
||||
memoryMonitor: memoryMonitor,
|
||||
cache: cache,
|
||||
adjustmentInterval: 30 * time.Second, // Adjust every 30 seconds
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the dynamic cache size adjustment process
|
||||
func (dcm *DynamicCacheManager) Start() {
|
||||
go dcm.adjustmentLoop()
|
||||
}
|
||||
|
||||
// GetCurrentCacheSize returns the current cache size
|
||||
func (dcm *DynamicCacheManager) GetCurrentCacheSize() uint64 {
|
||||
dcm.mu.RLock()
|
||||
defer dcm.mu.RUnlock()
|
||||
return atomic.LoadUint64(&dcm.currentCacheSize)
|
||||
}
|
||||
|
||||
// GetOriginalCacheSize returns the original cache size
|
||||
func (dcm *DynamicCacheManager) GetOriginalCacheSize() uint64 {
|
||||
dcm.mu.RLock()
|
||||
defer dcm.mu.RUnlock()
|
||||
return dcm.originalCacheSize
|
||||
}
|
||||
|
||||
// GetAdjustmentCount returns the number of adjustments made
|
||||
func (dcm *DynamicCacheManager) GetAdjustmentCount() int64 {
|
||||
return atomic.LoadInt64(&dcm.adjustmentCount)
|
||||
}
|
||||
|
||||
// adjustmentLoop runs the cache size adjustment loop
|
||||
func (dcm *DynamicCacheManager) adjustmentLoop() {
|
||||
ticker := time.NewTicker(dcm.adjustmentInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
dcm.performAdjustment()
|
||||
}
|
||||
}
|
||||
|
||||
// performAdjustment performs a cache size adjustment if needed
|
||||
func (dcm *DynamicCacheManager) performAdjustment() {
|
||||
// Prevent concurrent adjustments
|
||||
if !atomic.CompareAndSwapInt32(&dcm.isAdjusting, 0, 1) {
|
||||
return
|
||||
}
|
||||
defer atomic.StoreInt32(&dcm.isAdjusting, 0)
|
||||
|
||||
// Check if enough time has passed since last adjustment
|
||||
if time.Since(dcm.lastAdjustment) < dcm.adjustmentInterval {
|
||||
return
|
||||
}
|
||||
|
||||
// Get recommended cache size
|
||||
recommendedSize := dcm.memoryMonitor.GetRecommendedCacheSize(dcm.originalCacheSize)
|
||||
currentSize := atomic.LoadUint64(&dcm.currentCacheSize)
|
||||
|
||||
// Only adjust if there's a significant difference (more than 5%)
|
||||
sizeDiff := float64(recommendedSize) / float64(currentSize)
|
||||
if sizeDiff < 0.95 || sizeDiff > 1.05 {
|
||||
dcm.adjustCacheSize(recommendedSize)
|
||||
dcm.lastAdjustment = time.Now()
|
||||
atomic.AddInt64(&dcm.adjustmentCount, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// adjustCacheSize adjusts the cache size to the recommended size
|
||||
func (dcm *DynamicCacheManager) adjustCacheSize(newSize uint64) {
|
||||
dcm.mu.Lock()
|
||||
defer dcm.mu.Unlock()
|
||||
|
||||
oldSize := atomic.LoadUint64(&dcm.currentCacheSize)
|
||||
atomic.StoreUint64(&dcm.currentCacheSize, newSize)
|
||||
|
||||
// If we're reducing the cache size, trigger GC to free up memory
|
||||
if newSize < oldSize {
|
||||
// Calculate how much to free
|
||||
bytesToFree := oldSize - newSize
|
||||
|
||||
// Trigger GC on the cache to free up the excess memory
|
||||
// This is a simplified approach - in practice, you'd want to integrate
|
||||
// with the actual GC system to free the right amount
|
||||
if gcCache, ok := dcm.cache.(interface{ ForceGC(uint) }); ok {
|
||||
gcCache.ForceGC(uint(bytesToFree))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats returns statistics about the dynamic cache manager
|
||||
func (dcm *DynamicCacheManager) GetStats() map[string]interface{} {
|
||||
dcm.mu.RLock()
|
||||
defer dcm.mu.RUnlock()
|
||||
|
||||
return map[string]interface{}{
|
||||
"original_cache_size": dcm.originalCacheSize,
|
||||
"current_cache_size": atomic.LoadUint64(&dcm.currentCacheSize),
|
||||
"adjustment_count": atomic.LoadInt64(&dcm.adjustmentCount),
|
||||
"last_adjustment": dcm.lastAdjustment,
|
||||
"memory_utilization": dcm.memoryMonitor.GetMemoryUtilization(),
|
||||
"target_memory_usage": dcm.memoryMonitor.GetTargetMemoryUsage(),
|
||||
"current_memory_usage": dcm.memoryMonitor.GetCurrentMemoryUsage(),
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,10 @@ package memory
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"io"
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"s1d3sw1ped/steamcache2/vfs/locks"
|
||||
"s1d3sw1ped/steamcache2/vfs/lru"
|
||||
"s1d3sw1ped/steamcache2/vfs/types"
|
||||
"s1d3sw1ped/steamcache2/vfs/vfserror"
|
||||
"sort"
|
||||
@@ -13,32 +15,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// VFS defines the interface for virtual file systems
|
||||
type VFS interface {
|
||||
// Create creates a new file at the given key
|
||||
Create(key string, size int64) (io.WriteCloser, error)
|
||||
|
||||
// Open opens the file at the given key for reading
|
||||
Open(key string) (io.ReadCloser, error)
|
||||
|
||||
// Delete removes the file at the given key
|
||||
Delete(key string) error
|
||||
|
||||
// Stat returns information about the file at the given key
|
||||
Stat(key string) (*types.FileInfo, error)
|
||||
|
||||
// Name returns the name of this VFS
|
||||
Name() string
|
||||
|
||||
// Size returns the current size of the VFS
|
||||
Size() int64
|
||||
|
||||
// Capacity returns the maximum capacity of the VFS
|
||||
Capacity() int64
|
||||
}
|
||||
|
||||
// Ensure MemoryFS implements VFS.
|
||||
var _ VFS = (*MemoryFS)(nil)
|
||||
var _ vfs.VFS = (*MemoryFS)(nil)
|
||||
|
||||
// MemoryFS is an in-memory virtual file system
|
||||
type MemoryFS struct {
|
||||
@@ -48,55 +26,10 @@ type MemoryFS struct {
|
||||
size int64
|
||||
mu sync.RWMutex
|
||||
keyLocks []sync.Map // Sharded lock pools for better concurrency
|
||||
LRU *lruList
|
||||
LRU *lru.LRUList[*types.FileInfo]
|
||||
timeUpdater *types.BatchedTimeUpdate // Batched time updates for better performance
|
||||
}
|
||||
|
||||
// Number of lock shards for reducing contention
|
||||
const numLockShards = 32
|
||||
|
||||
// lruList for time-decayed LRU eviction
|
||||
type lruList struct {
|
||||
list *list.List
|
||||
elem map[string]*list.Element
|
||||
}
|
||||
|
||||
func newLruList() *lruList {
|
||||
return &lruList{
|
||||
list: list.New(),
|
||||
elem: make(map[string]*list.Element),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *lruList) Add(key string, fi *types.FileInfo) {
|
||||
elem := l.list.PushFront(fi)
|
||||
l.elem[key] = elem
|
||||
}
|
||||
|
||||
func (l *lruList) MoveToFront(key string, timeUpdater *types.BatchedTimeUpdate) {
|
||||
if elem, exists := l.elem[key]; exists {
|
||||
l.list.MoveToFront(elem)
|
||||
// Update the FileInfo in the element with new access time
|
||||
if fi := elem.Value.(*types.FileInfo); fi != nil {
|
||||
fi.UpdateAccessBatched(timeUpdater)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *lruList) Remove(key string) *types.FileInfo {
|
||||
if elem, exists := l.elem[key]; exists {
|
||||
delete(l.elem, key)
|
||||
if fi := l.list.Remove(elem).(*types.FileInfo); fi != nil {
|
||||
return fi
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *lruList) Len() int {
|
||||
return l.list.Len()
|
||||
}
|
||||
|
||||
// New creates a new MemoryFS
|
||||
func New(capacity int64) *MemoryFS {
|
||||
if capacity <= 0 {
|
||||
@@ -104,7 +37,7 @@ func New(capacity int64) *MemoryFS {
|
||||
}
|
||||
|
||||
// Initialize sharded locks
|
||||
keyLocks := make([]sync.Map, numLockShards)
|
||||
keyLocks := make([]sync.Map, locks.NumLockShards)
|
||||
|
||||
return &MemoryFS{
|
||||
data: make(map[string]*bytes.Buffer),
|
||||
@@ -112,7 +45,7 @@ func New(capacity int64) *MemoryFS {
|
||||
capacity: capacity,
|
||||
size: 0,
|
||||
keyLocks: keyLocks,
|
||||
LRU: newLruList(),
|
||||
LRU: lru.NewLRUList[*types.FileInfo](),
|
||||
timeUpdater: types.NewBatchedTimeUpdate(100 * time.Millisecond), // Update time every 100ms
|
||||
}
|
||||
}
|
||||
@@ -163,24 +96,9 @@ func (m *MemoryFS) GetFragmentationStats() map[string]interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
// getShardIndex returns the shard index for a given key
|
||||
func getShardIndex(key string) int {
|
||||
// Use FNV-1a hash for good distribution
|
||||
var h uint32 = 2166136261 // FNV offset basis
|
||||
for i := 0; i < len(key); i++ {
|
||||
h ^= uint32(key[i])
|
||||
h *= 16777619 // FNV prime
|
||||
}
|
||||
return int(h % numLockShards)
|
||||
}
|
||||
|
||||
// getKeyLock returns a lock for the given key using sharding
|
||||
func (m *MemoryFS) getKeyLock(key string) *sync.RWMutex {
|
||||
shardIndex := getShardIndex(key)
|
||||
shard := &m.keyLocks[shardIndex]
|
||||
|
||||
keyLock, _ := shard.LoadOrStore(key, &sync.RWMutex{})
|
||||
return keyLock.(*sync.RWMutex)
|
||||
return locks.GetKeyLock(m.keyLocks, key)
|
||||
}
|
||||
|
||||
// Create creates a new file
|
||||
@@ -391,7 +309,7 @@ func (m *MemoryFS) EvictLRU(bytesNeeded uint) uint {
|
||||
// Evict from LRU list until we free enough space
|
||||
for m.size > m.capacity-int64(bytesNeeded) && m.LRU.Len() > 0 {
|
||||
// Get the least recently used item
|
||||
elem := m.LRU.list.Back()
|
||||
elem := m.LRU.Back()
|
||||
if elem == nil {
|
||||
break
|
||||
}
|
||||
@@ -411,7 +329,7 @@ func (m *MemoryFS) EvictLRU(bytesNeeded uint) uint {
|
||||
evicted += uint(fi.Size)
|
||||
|
||||
// Clean up key lock
|
||||
shardIndex := getShardIndex(key)
|
||||
shardIndex := locks.GetShardIndex(key)
|
||||
m.keyLocks[shardIndex].Delete(key)
|
||||
}
|
||||
|
||||
@@ -459,7 +377,7 @@ func (m *MemoryFS) EvictBySize(bytesNeeded uint, ascending bool) uint {
|
||||
evicted += uint(fi.Size)
|
||||
|
||||
// Clean up key lock
|
||||
shardIndex := getShardIndex(key)
|
||||
shardIndex := locks.GetShardIndex(key)
|
||||
m.keyLocks[shardIndex].Delete(key)
|
||||
}
|
||||
|
||||
@@ -504,7 +422,7 @@ func (m *MemoryFS) EvictFIFO(bytesNeeded uint) uint {
|
||||
evicted += uint(fi.Size)
|
||||
|
||||
// Clean up key lock
|
||||
shardIndex := getShardIndex(key)
|
||||
shardIndex := locks.GetShardIndex(key)
|
||||
m.keyLocks[shardIndex].Delete(key)
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,15 @@ type MemoryMonitor struct {
|
||||
ctx chan struct{}
|
||||
stopChan chan struct{}
|
||||
isMonitoring int32
|
||||
|
||||
// Dynamic cache management fields
|
||||
originalCacheSize uint64
|
||||
currentCacheSize uint64
|
||||
cache interface{} // Generic cache interface
|
||||
adjustmentInterval time.Duration
|
||||
lastAdjustment time.Time
|
||||
adjustmentCount int64
|
||||
isAdjusting int32
|
||||
}
|
||||
|
||||
// NewMemoryMonitor creates a new memory monitor
|
||||
@@ -27,9 +36,19 @@ func NewMemoryMonitor(targetMemoryUsage uint64, monitoringInterval time.Duration
|
||||
adjustmentThreshold: adjustmentThreshold,
|
||||
ctx: make(chan struct{}),
|
||||
stopChan: make(chan struct{}),
|
||||
adjustmentInterval: 30 * time.Second, // Default adjustment interval
|
||||
}
|
||||
}
|
||||
|
||||
// NewMemoryMonitorWithCache creates a new memory monitor with cache management
|
||||
func NewMemoryMonitorWithCache(targetMemoryUsage uint64, monitoringInterval time.Duration, adjustmentThreshold float64, cache interface{}, originalCacheSize uint64) *MemoryMonitor {
|
||||
mm := NewMemoryMonitor(targetMemoryUsage, monitoringInterval, adjustmentThreshold)
|
||||
mm.cache = cache
|
||||
mm.originalCacheSize = originalCacheSize
|
||||
mm.currentCacheSize = originalCacheSize
|
||||
return mm
|
||||
}
|
||||
|
||||
// Start begins monitoring memory usage
|
||||
func (mm *MemoryMonitor) Start() {
|
||||
if atomic.CompareAndSwapInt32(&mm.isMonitoring, 0, 1) {
|
||||
@@ -151,3 +170,105 @@ func (mm *MemoryMonitor) GetMemoryStats() map[string]interface{} {
|
||||
"gc_pause_total": m.PauseTotalNs,
|
||||
}
|
||||
}
|
||||
|
||||
// Dynamic Cache Management Methods
|
||||
|
||||
// StartDynamicAdjustment begins the dynamic cache size adjustment process
|
||||
func (mm *MemoryMonitor) StartDynamicAdjustment() {
|
||||
if mm.cache != nil {
|
||||
go mm.adjustmentLoop()
|
||||
}
|
||||
}
|
||||
|
||||
// GetCurrentCacheSize returns the current cache size
|
||||
func (mm *MemoryMonitor) GetCurrentCacheSize() uint64 {
|
||||
mm.mu.RLock()
|
||||
defer mm.mu.RUnlock()
|
||||
return atomic.LoadUint64(&mm.currentCacheSize)
|
||||
}
|
||||
|
||||
// GetOriginalCacheSize returns the original cache size
|
||||
func (mm *MemoryMonitor) GetOriginalCacheSize() uint64 {
|
||||
mm.mu.RLock()
|
||||
defer mm.mu.RUnlock()
|
||||
return mm.originalCacheSize
|
||||
}
|
||||
|
||||
// GetAdjustmentCount returns the number of adjustments made
|
||||
func (mm *MemoryMonitor) GetAdjustmentCount() int64 {
|
||||
return atomic.LoadInt64(&mm.adjustmentCount)
|
||||
}
|
||||
|
||||
// adjustmentLoop runs the cache size adjustment loop
|
||||
func (mm *MemoryMonitor) adjustmentLoop() {
|
||||
ticker := time.NewTicker(mm.adjustmentInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
mm.performAdjustment()
|
||||
}
|
||||
}
|
||||
|
||||
// performAdjustment performs a cache size adjustment if needed
|
||||
func (mm *MemoryMonitor) performAdjustment() {
|
||||
// Prevent concurrent adjustments
|
||||
if !atomic.CompareAndSwapInt32(&mm.isAdjusting, 0, 1) {
|
||||
return
|
||||
}
|
||||
defer atomic.StoreInt32(&mm.isAdjusting, 0)
|
||||
|
||||
// Check if enough time has passed since last adjustment
|
||||
if time.Since(mm.lastAdjustment) < mm.adjustmentInterval {
|
||||
return
|
||||
}
|
||||
|
||||
// Get recommended cache size
|
||||
recommendedSize := mm.GetRecommendedCacheSize(mm.originalCacheSize)
|
||||
currentSize := atomic.LoadUint64(&mm.currentCacheSize)
|
||||
|
||||
// Only adjust if there's a significant difference (more than 5%)
|
||||
sizeDiff := float64(recommendedSize) / float64(currentSize)
|
||||
if sizeDiff < 0.95 || sizeDiff > 1.05 {
|
||||
mm.adjustCacheSize(recommendedSize)
|
||||
mm.lastAdjustment = time.Now()
|
||||
atomic.AddInt64(&mm.adjustmentCount, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// adjustCacheSize adjusts the cache size to the recommended size
|
||||
func (mm *MemoryMonitor) adjustCacheSize(newSize uint64) {
|
||||
mm.mu.Lock()
|
||||
defer mm.mu.Unlock()
|
||||
|
||||
oldSize := atomic.LoadUint64(&mm.currentCacheSize)
|
||||
atomic.StoreUint64(&mm.currentCacheSize, newSize)
|
||||
|
||||
// If we're reducing the cache size, trigger GC to free up memory
|
||||
if newSize < oldSize {
|
||||
// Calculate how much to free
|
||||
bytesToFree := oldSize - newSize
|
||||
|
||||
// Trigger GC on the cache to free up the excess memory
|
||||
// This is a simplified approach - in practice, you'd want to integrate
|
||||
// with the actual GC system to free the right amount
|
||||
if gcCache, ok := mm.cache.(interface{ ForceGC(uint) }); ok {
|
||||
gcCache.ForceGC(uint(bytesToFree))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetDynamicStats returns statistics about the dynamic cache manager
|
||||
func (mm *MemoryMonitor) GetDynamicStats() map[string]interface{} {
|
||||
mm.mu.RLock()
|
||||
defer mm.mu.RUnlock()
|
||||
|
||||
return map[string]interface{}{
|
||||
"original_cache_size": mm.originalCacheSize,
|
||||
"current_cache_size": atomic.LoadUint64(&mm.currentCacheSize),
|
||||
"adjustment_count": atomic.LoadInt64(&mm.adjustmentCount),
|
||||
"last_adjustment": mm.lastAdjustment,
|
||||
"memory_utilization": mm.GetMemoryUtilization(),
|
||||
"target_memory_usage": mm.GetTargetMemoryUsage(),
|
||||
"current_memory_usage": mm.GetCurrentMemoryUsage(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,9 +70,35 @@ type PopularContent struct {
|
||||
|
||||
// WarmRequest represents a cache warming request
|
||||
type WarmRequest struct {
|
||||
Key string
|
||||
Priority int
|
||||
Reason string
|
||||
Key string
|
||||
Priority int
|
||||
Reason string
|
||||
Size int64
|
||||
RequestedAt time.Time
|
||||
Source string // Where the warming request came from
|
||||
}
|
||||
|
||||
// ActiveWarmer tracks an active warming operation
|
||||
type ActiveWarmer struct {
|
||||
Key string
|
||||
StartTime time.Time
|
||||
Priority int
|
||||
Reason string
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// WarmingStats tracks cache warming statistics
|
||||
type WarmingStats struct {
|
||||
WarmRequests int64
|
||||
WarmSuccesses int64
|
||||
WarmFailures int64
|
||||
WarmBytes int64
|
||||
WarmDuration time.Duration
|
||||
PrefetchRequests int64
|
||||
PrefetchSuccesses int64
|
||||
PrefetchFailures int64
|
||||
PrefetchBytes int64
|
||||
PrefetchDuration time.Duration
|
||||
}
|
||||
|
||||
// NewPredictiveCacheManager creates a new predictive cache manager
|
||||
@@ -114,6 +140,21 @@ func NewCacheWarmer() *CacheWarmer {
|
||||
}
|
||||
}
|
||||
|
||||
// NewWarmingStats creates a new warming stats tracker
|
||||
func NewWarmingStats() *WarmingStats {
|
||||
return &WarmingStats{}
|
||||
}
|
||||
|
||||
// NewActiveWarmer creates a new active warmer tracker
|
||||
func NewActiveWarmer(key string, priority int, reason string) *ActiveWarmer {
|
||||
return &ActiveWarmer{
|
||||
Key: key,
|
||||
StartTime: time.Now(),
|
||||
Priority: priority,
|
||||
Reason: reason,
|
||||
}
|
||||
}
|
||||
|
||||
// RecordAccess records a file access for prediction analysis (lightweight version)
|
||||
func (pcm *PredictiveCacheManager) RecordAccess(key string, previousKey string, size int64) {
|
||||
// Only record if we have a previous key to avoid overhead
|
||||
@@ -282,6 +323,23 @@ func (cw *CacheWarmer) GetPopularContent(limit int) []*PopularContent {
|
||||
return popular
|
||||
}
|
||||
|
||||
// RequestWarming requests warming of a specific key
|
||||
func (cw *CacheWarmer) RequestWarming(key string, priority int, reason string, size int64) {
|
||||
select {
|
||||
case cw.warmerQueue <- WarmRequest{
|
||||
Key: key,
|
||||
Priority: priority,
|
||||
Reason: reason,
|
||||
Size: size,
|
||||
RequestedAt: time.Now(),
|
||||
Source: "predictive",
|
||||
}:
|
||||
// Successfully queued
|
||||
default:
|
||||
// Queue full, skip warming
|
||||
}
|
||||
}
|
||||
|
||||
// prefetchWorker processes prefetch requests
|
||||
func (pcm *PredictiveCacheManager) prefetchWorker() {
|
||||
defer pcm.wg.Done()
|
||||
|
||||
@@ -1,300 +0,0 @@
|
||||
package warming
|
||||
|
||||
import (
|
||||
"context"
|
||||
"s1d3sw1ped/steamcache2/vfs"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CacheWarmer implements intelligent cache warming strategies
|
||||
type CacheWarmer struct {
|
||||
vfs vfs.VFS
|
||||
warmingQueue chan WarmRequest
|
||||
activeWarmers map[string]*ActiveWarmer
|
||||
stats *WarmingStats
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
mu sync.RWMutex
|
||||
maxConcurrent int
|
||||
warmingEnabled bool
|
||||
}
|
||||
|
||||
// WarmRequest represents a cache warming request
|
||||
type WarmRequest struct {
|
||||
Key string
|
||||
Priority int
|
||||
Reason string
|
||||
Size int64
|
||||
RequestedAt time.Time
|
||||
Source string // Where the warming request came from
|
||||
}
|
||||
|
||||
// ActiveWarmer tracks an active warming operation
|
||||
type ActiveWarmer struct {
|
||||
Key string
|
||||
StartTime time.Time
|
||||
Priority int
|
||||
Reason string
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// WarmingStats tracks cache warming statistics
|
||||
type WarmingStats struct {
|
||||
WarmRequests int64
|
||||
WarmSuccesses int64
|
||||
WarmFailures int64
|
||||
WarmBytes int64
|
||||
WarmDuration time.Duration
|
||||
ActiveWarmers int64
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// WarmingStrategy defines different warming strategies
|
||||
type WarmingStrategy int
|
||||
|
||||
const (
|
||||
StrategyImmediate WarmingStrategy = iota
|
||||
StrategyBackground
|
||||
StrategyScheduled
|
||||
StrategyPredictive
|
||||
)
|
||||
|
||||
// NewCacheWarmer creates a new cache warmer
|
||||
func NewCacheWarmer(vfs vfs.VFS, maxConcurrent int) *CacheWarmer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
cw := &CacheWarmer{
|
||||
vfs: vfs,
|
||||
warmingQueue: make(chan WarmRequest, 1000),
|
||||
activeWarmers: make(map[string]*ActiveWarmer),
|
||||
stats: &WarmingStats{},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
maxConcurrent: maxConcurrent,
|
||||
warmingEnabled: true,
|
||||
}
|
||||
|
||||
// Start warming workers
|
||||
for i := 0; i < maxConcurrent; i++ {
|
||||
cw.wg.Add(1)
|
||||
go cw.warmingWorker(i)
|
||||
}
|
||||
|
||||
// Start cleanup worker
|
||||
cw.wg.Add(1)
|
||||
go cw.cleanupWorker()
|
||||
|
||||
return cw
|
||||
}
|
||||
|
||||
// RequestWarming requests warming of content
|
||||
func (cw *CacheWarmer) RequestWarming(key string, priority int, reason string, size int64, source string) {
|
||||
if !cw.warmingEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if already warming
|
||||
cw.mu.RLock()
|
||||
if _, exists := cw.activeWarmers[key]; exists {
|
||||
cw.mu.RUnlock()
|
||||
return // Already warming
|
||||
}
|
||||
cw.mu.RUnlock()
|
||||
|
||||
// Check if already cached
|
||||
if _, err := cw.vfs.Stat(key); err == nil {
|
||||
return // Already cached
|
||||
}
|
||||
|
||||
select {
|
||||
case cw.warmingQueue <- WarmRequest{
|
||||
Key: key,
|
||||
Priority: priority,
|
||||
Reason: reason,
|
||||
Size: size,
|
||||
RequestedAt: time.Now(),
|
||||
Source: source,
|
||||
}:
|
||||
atomic.AddInt64(&cw.stats.WarmRequests, 1)
|
||||
default:
|
||||
// Queue full, skip warming
|
||||
}
|
||||
}
|
||||
|
||||
// warmingWorker processes warming requests
|
||||
func (cw *CacheWarmer) warmingWorker(workerID int) {
|
||||
defer cw.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
case req := <-cw.warmingQueue:
|
||||
cw.processWarmingRequest(req, workerID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processWarmingRequest processes a warming request
|
||||
func (cw *CacheWarmer) processWarmingRequest(req WarmRequest, workerID int) {
|
||||
// Mark as active warmer
|
||||
cw.mu.Lock()
|
||||
cw.activeWarmers[req.Key] = &ActiveWarmer{
|
||||
Key: req.Key,
|
||||
StartTime: time.Now(),
|
||||
Priority: req.Priority,
|
||||
Reason: req.Reason,
|
||||
}
|
||||
cw.mu.Unlock()
|
||||
|
||||
atomic.AddInt64(&cw.stats.ActiveWarmers, 1)
|
||||
|
||||
// Simulate warming process
|
||||
// In a real implementation, this would:
|
||||
// 1. Fetch content from upstream
|
||||
// 2. Store in cache
|
||||
// 3. Update statistics
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
// Simulate warming delay based on priority
|
||||
warmingDelay := time.Duration(100-req.Priority*10) * time.Millisecond
|
||||
if warmingDelay < 10*time.Millisecond {
|
||||
warmingDelay = 10 * time.Millisecond
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(warmingDelay):
|
||||
// Warming completed successfully
|
||||
atomic.AddInt64(&cw.stats.WarmSuccesses, 1)
|
||||
atomic.AddInt64(&cw.stats.WarmBytes, req.Size)
|
||||
case <-cw.ctx.Done():
|
||||
// Context cancelled
|
||||
atomic.AddInt64(&cw.stats.WarmFailures, 1)
|
||||
}
|
||||
|
||||
duration := time.Since(startTime)
|
||||
cw.stats.mu.Lock()
|
||||
cw.stats.WarmDuration += duration
|
||||
cw.stats.mu.Unlock()
|
||||
|
||||
// Remove from active warmers
|
||||
cw.mu.Lock()
|
||||
delete(cw.activeWarmers, req.Key)
|
||||
cw.mu.Unlock()
|
||||
|
||||
atomic.AddInt64(&cw.stats.ActiveWarmers, -1)
|
||||
}
|
||||
|
||||
// cleanupWorker cleans up old warming requests
|
||||
func (cw *CacheWarmer) cleanupWorker() {
|
||||
defer cw.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cw.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
cw.cleanupOldWarmers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupOldWarmers removes old warming requests
|
||||
func (cw *CacheWarmer) cleanupOldWarmers() {
|
||||
cw.mu.Lock()
|
||||
defer cw.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
cutoff := now.Add(-5 * time.Minute) // Remove warmers older than 5 minutes
|
||||
|
||||
for key, warmer := range cw.activeWarmers {
|
||||
warmer.mu.RLock()
|
||||
if warmer.StartTime.Before(cutoff) {
|
||||
warmer.mu.RUnlock()
|
||||
delete(cw.activeWarmers, key)
|
||||
atomic.AddInt64(&cw.stats.WarmFailures, 1)
|
||||
} else {
|
||||
warmer.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetActiveWarmers returns currently active warming operations
|
||||
func (cw *CacheWarmer) GetActiveWarmers() []*ActiveWarmer {
|
||||
cw.mu.RLock()
|
||||
defer cw.mu.RUnlock()
|
||||
|
||||
warmers := make([]*ActiveWarmer, 0, len(cw.activeWarmers))
|
||||
for _, warmer := range cw.activeWarmers {
|
||||
warmers = append(warmers, warmer)
|
||||
}
|
||||
|
||||
return warmers
|
||||
}
|
||||
|
||||
// GetStats returns warming statistics
|
||||
func (cw *CacheWarmer) GetStats() *WarmingStats {
|
||||
cw.stats.mu.RLock()
|
||||
defer cw.stats.mu.RUnlock()
|
||||
|
||||
return &WarmingStats{
|
||||
WarmRequests: atomic.LoadInt64(&cw.stats.WarmRequests),
|
||||
WarmSuccesses: atomic.LoadInt64(&cw.stats.WarmSuccesses),
|
||||
WarmFailures: atomic.LoadInt64(&cw.stats.WarmFailures),
|
||||
WarmBytes: atomic.LoadInt64(&cw.stats.WarmBytes),
|
||||
WarmDuration: cw.stats.WarmDuration,
|
||||
ActiveWarmers: atomic.LoadInt64(&cw.stats.ActiveWarmers),
|
||||
}
|
||||
}
|
||||
|
||||
// SetWarmingEnabled enables or disables cache warming
|
||||
func (cw *CacheWarmer) SetWarmingEnabled(enabled bool) {
|
||||
cw.mu.Lock()
|
||||
defer cw.mu.Unlock()
|
||||
cw.warmingEnabled = enabled
|
||||
}
|
||||
|
||||
// IsWarmingEnabled returns whether warming is enabled
|
||||
func (cw *CacheWarmer) IsWarmingEnabled() bool {
|
||||
cw.mu.RLock()
|
||||
defer cw.mu.RUnlock()
|
||||
return cw.warmingEnabled
|
||||
}
|
||||
|
||||
// Stop stops the cache warmer
|
||||
func (cw *CacheWarmer) Stop() {
|
||||
cw.cancel()
|
||||
cw.wg.Wait()
|
||||
}
|
||||
|
||||
// WarmPopularContent warms popular content based on access patterns
|
||||
func (cw *CacheWarmer) WarmPopularContent(popularKeys []string, priority int) {
|
||||
for _, key := range popularKeys {
|
||||
cw.RequestWarming(key, priority, "popular_content", 0, "popular_analyzer")
|
||||
}
|
||||
}
|
||||
|
||||
// WarmPredictedContent warms predicted content
|
||||
func (cw *CacheWarmer) WarmPredictedContent(predictedKeys []string, priority int) {
|
||||
for _, key := range predictedKeys {
|
||||
cw.RequestWarming(key, priority, "predicted_access", 0, "predictor")
|
||||
}
|
||||
}
|
||||
|
||||
// WarmSequentialContent warms content in sequential order
|
||||
func (cw *CacheWarmer) WarmSequentialContent(sequentialKeys []string, priority int) {
|
||||
for i, key := range sequentialKeys {
|
||||
// Stagger warming requests to avoid overwhelming the system
|
||||
go func(k string, delay time.Duration) {
|
||||
time.Sleep(delay)
|
||||
cw.RequestWarming(k, priority, "sequential_access", 0, "sequential_analyzer")
|
||||
}(key, time.Duration(i)*100*time.Millisecond)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user