// vfs/gc/gc.go package gc import ( "context" "io" "s1d3sw1ped/steamcache2/vfs" "s1d3sw1ped/steamcache2/vfs/eviction" "sync" "sync/atomic" "time" ) // GCAlgorithm represents different garbage collection strategies type GCAlgorithm string const ( LRU GCAlgorithm = "lru" LFU GCAlgorithm = "lfu" FIFO GCAlgorithm = "fifo" Largest GCAlgorithm = "largest" Smallest GCAlgorithm = "smallest" Hybrid GCAlgorithm = "hybrid" ) // GCFS wraps a VFS with garbage collection capabilities type GCFS struct { vfs vfs.VFS algorithm GCAlgorithm gcFunc func(vfs.VFS, uint) uint } // New creates a new GCFS with the specified algorithm func New(wrappedVFS vfs.VFS, algorithm GCAlgorithm) *GCFS { gcfs := &GCFS{ vfs: wrappedVFS, algorithm: algorithm, } gcfs.gcFunc = eviction.GetEvictionFunction(eviction.EvictionStrategy(algorithm)) return gcfs } // GetGCAlgorithm returns the GC function for the given algorithm func GetGCAlgorithm(algorithm GCAlgorithm) func(vfs.VFS, uint) uint { return eviction.GetEvictionFunction(eviction.EvictionStrategy(algorithm)) } // Create wraps the underlying Create method func (gc *GCFS) Create(key string, size int64) (io.WriteCloser, error) { // Check if we need to GC before creating if gc.vfs.Size()+size > gc.vfs.Capacity() { needed := uint((gc.vfs.Size() + size) - gc.vfs.Capacity()) gc.gcFunc(gc.vfs, needed) } return gc.vfs.Create(key, size) } // Open wraps the underlying Open method func (gc *GCFS) Open(key string) (io.ReadCloser, error) { return gc.vfs.Open(key) } // Delete wraps the underlying Delete method func (gc *GCFS) Delete(key string) error { return gc.vfs.Delete(key) } // Stat wraps the underlying Stat method func (gc *GCFS) Stat(key string) (*vfs.FileInfo, error) { return gc.vfs.Stat(key) } // Name wraps the underlying Name method func (gc *GCFS) Name() string { return gc.vfs.Name() + "(GC:" + string(gc.algorithm) + ")" } // Size wraps the underlying Size method func (gc *GCFS) Size() int64 { return gc.vfs.Size() } // Capacity wraps the underlying Capacity method func (gc *GCFS) Capacity() int64 { return gc.vfs.Capacity() } // EvictionStrategy defines an interface for cache eviction type EvictionStrategy interface { Evict(vfs vfs.VFS, bytesNeeded uint) uint } // AdaptivePromotionDeciderFunc is a placeholder for the adaptive promotion logic var AdaptivePromotionDeciderFunc = func() interface{} { return nil } // AsyncGCFS wraps a GCFS with asynchronous garbage collection capabilities type AsyncGCFS struct { *GCFS gcQueue chan gcRequest ctx context.Context cancel context.CancelFunc wg sync.WaitGroup gcRunning int32 preemptive bool asyncThreshold float64 // Async GC threshold as percentage of capacity (e.g., 0.8 = 80%) syncThreshold float64 // Sync GC threshold as percentage of capacity (e.g., 0.95 = 95%) hardLimit float64 // Hard limit threshold (e.g., 1.0 = 100%) } type gcRequest struct { bytesNeeded uint priority int // Higher number = higher priority } // NewAsync creates a new AsyncGCFS with asynchronous garbage collection func NewAsync(wrappedVFS vfs.VFS, algorithm GCAlgorithm, preemptive bool, asyncThreshold, syncThreshold, hardLimit float64) *AsyncGCFS { ctx, cancel := context.WithCancel(context.Background()) asyncGC := &AsyncGCFS{ GCFS: New(wrappedVFS, algorithm), gcQueue: make(chan gcRequest, 100), // Buffer for GC requests ctx: ctx, cancel: cancel, preemptive: preemptive, asyncThreshold: asyncThreshold, syncThreshold: syncThreshold, hardLimit: hardLimit, } // Start the background GC worker asyncGC.wg.Add(1) go asyncGC.gcWorker() // Start preemptive GC if enabled if preemptive { asyncGC.wg.Add(1) go asyncGC.preemptiveGC() } return asyncGC } // Create wraps the underlying Create method with hybrid GC (async + sync hard limits) func (agc *AsyncGCFS) Create(key string, size int64) (io.WriteCloser, error) { currentSize := agc.vfs.Size() capacity := agc.vfs.Capacity() projectedSize := currentSize + size // Calculate utilization percentages currentUtilization := float64(currentSize) / float64(capacity) projectedUtilization := float64(projectedSize) / float64(capacity) // Hard limit check - never exceed the hard limit if projectedUtilization > agc.hardLimit { needed := uint(projectedSize - capacity) // Immediate sync GC to prevent exceeding hard limit agc.gcFunc(agc.vfs, needed) } else if projectedUtilization > agc.syncThreshold { // Near hard limit - do immediate sync GC needed := uint(projectedSize - int64(float64(capacity)*agc.syncThreshold)) agc.gcFunc(agc.vfs, needed) } else if currentUtilization > agc.asyncThreshold { // Above async threshold - queue for async GC needed := uint(projectedSize - int64(float64(capacity)*agc.asyncThreshold)) select { case agc.gcQueue <- gcRequest{bytesNeeded: needed, priority: 2}: default: // Queue full, do immediate GC agc.gcFunc(agc.vfs, needed) } } return agc.vfs.Create(key, size) } // gcWorker processes GC requests asynchronously func (agc *AsyncGCFS) gcWorker() { defer agc.wg.Done() ticker := time.NewTicker(100 * time.Millisecond) // Check every 100ms defer ticker.Stop() for { select { case <-agc.ctx.Done(): return case req := <-agc.gcQueue: atomic.StoreInt32(&agc.gcRunning, 1) agc.gcFunc(agc.vfs, req.bytesNeeded) atomic.StoreInt32(&agc.gcRunning, 0) case <-ticker.C: // Process any pending GC requests select { case req := <-agc.gcQueue: atomic.StoreInt32(&agc.gcRunning, 1) agc.gcFunc(agc.vfs, req.bytesNeeded) atomic.StoreInt32(&agc.gcRunning, 0) default: // No pending requests } } } } // preemptiveGC runs background GC to keep cache utilization below threshold func (agc *AsyncGCFS) preemptiveGC() { defer agc.wg.Done() ticker := time.NewTicker(5 * time.Second) // Check every 5 seconds defer ticker.Stop() for { select { case <-agc.ctx.Done(): return case <-ticker.C: currentSize := agc.vfs.Size() capacity := agc.vfs.Capacity() currentUtilization := float64(currentSize) / float64(capacity) // Check if we're above the async threshold if currentUtilization > agc.asyncThreshold { // Calculate how much to free to get back to async threshold targetSize := int64(float64(capacity) * agc.asyncThreshold) if currentSize > targetSize { overage := currentSize - targetSize select { case agc.gcQueue <- gcRequest{bytesNeeded: uint(overage), priority: 0}: default: // Queue full, skip this round } } } } } } // Stop stops the async GC workers func (agc *AsyncGCFS) Stop() { agc.cancel() agc.wg.Wait() } // IsGCRunning returns true if GC is currently running func (agc *AsyncGCFS) IsGCRunning() bool { return atomic.LoadInt32(&agc.gcRunning) == 1 } // ForceGC forces immediate garbage collection to free the specified number of bytes func (agc *AsyncGCFS) ForceGC(bytesNeeded uint) { agc.gcFunc(agc.vfs, bytesNeeded) }