Refactor caching and memory management components
All checks were successful
Release Tag / release (push) Successful in 9s

- Updated the caching logic to utilize a predictive cache warmer, enhancing content prefetching based on access patterns.
- Replaced the legacy warming system with a more efficient predictive approach, allowing for better performance and resource management.
- Refactored memory management to integrate dynamic cache size adjustments based on system memory usage, improving overall efficiency.
- Simplified the VFS interface and improved concurrency handling with sharded locks for better performance in multi-threaded environments.
- Enhanced tests to validate the new caching and memory management behaviors, ensuring reliability and performance improvements.
This commit is contained in:
2025-09-22 01:59:15 -05:00
parent 9b2affe95a
commit bfe29dea75
13 changed files with 612 additions and 1215 deletions

View File

@@ -1,130 +0,0 @@
package memory
import (
"s1d3sw1ped/steamcache2/vfs"
"sync"
"sync/atomic"
"time"
)
// DynamicCacheManager manages cache size adjustments based on system memory usage.
// All size fields are in bytes. currentCacheSize, adjustmentCount and isAdjusting
// are accessed via sync/atomic; mu guards lastAdjustment and snapshot reads.
type DynamicCacheManager struct {
originalCacheSize uint64 // size configured at construction; not reassigned afterwards in this file
currentCacheSize uint64 // effective size; read/written with sync/atomic
memoryMonitor *MemoryMonitor // source of recommended cache sizes
cache vfs.VFS // the cache being managed; probed for an optional ForceGC(uint) method
adjustmentInterval time.Duration // minimum time between adjustments (30s, set in the constructor)
lastAdjustment time.Time // when the last adjustment was applied
mu sync.RWMutex
adjustmentCount int64 // total adjustments made; read/written with sync/atomic
isAdjusting int32 // CAS flag preventing concurrent performAdjustment runs
}
// NewDynamicCacheManager creates a new dynamic cache manager.
// originalSize is the configured cache size in bytes; all later adjustments
// are computed relative to it. The adjustment interval is fixed at 30s here.
// Call Start to begin the background adjustment loop.
func NewDynamicCacheManager(cache vfs.VFS, originalSize uint64, memoryMonitor *MemoryMonitor) *DynamicCacheManager {
return &DynamicCacheManager{
originalCacheSize: originalSize,
currentCacheSize: originalSize,
memoryMonitor: memoryMonitor,
cache: cache,
adjustmentInterval: 30 * time.Second, // Adjust every 30 seconds
}
}
// Start begins the dynamic cache size adjustment process in a background
// goroutine.
//
// NOTE(review): the spawned goroutine has no visible stop mechanism and
// will run for the life of the process — confirm whether a shutdown hook
// is needed.
func (dcm *DynamicCacheManager) Start() {
go dcm.adjustmentLoop()
}
// GetCurrentCacheSize returns the current cache size in bytes.
//
// currentCacheSize is always written with atomic.StoreUint64 (see
// adjustCacheSize), so the atomic load alone is sufficient here; the
// previous RLock around it was redundant double synchronization.
func (dcm *DynamicCacheManager) GetCurrentCacheSize() uint64 {
	return atomic.LoadUint64(&dcm.currentCacheSize)
}
// GetOriginalCacheSize returns the original cache size in bytes — the value
// supplied at construction. It is not reassigned anywhere in this file, so
// the lock here is purely defensive.
func (dcm *DynamicCacheManager) GetOriginalCacheSize() uint64 {
dcm.mu.RLock()
defer dcm.mu.RUnlock()
return dcm.originalCacheSize
}
// GetAdjustmentCount returns the number of adjustments made since the
// manager was created. The counter is maintained with sync/atomic, so no
// lock is needed.
func (dcm *DynamicCacheManager) GetAdjustmentCount() int64 {
return atomic.LoadInt64(&dcm.adjustmentCount)
}
// adjustmentLoop runs the cache size adjustment loop, attempting one
// adjustment per tick of adjustmentInterval.
//
// NOTE(review): the loop never exits — the ticker and the goroutine
// started by Start live for the life of the process. Confirm whether a
// stop signal should be threaded through here.
func (dcm *DynamicCacheManager) adjustmentLoop() {
ticker := time.NewTicker(dcm.adjustmentInterval)
defer ticker.Stop()
for range ticker.C {
dcm.performAdjustment()
}
}
// performAdjustment performs a cache size adjustment if needed.
//
// It is a no-op when another adjustment is in flight or when less than
// adjustmentInterval has elapsed since the last one. The size is only
// changed when the recommendation differs from the current size by more
// than 5%.
func (dcm *DynamicCacheManager) performAdjustment() {
	// Prevent concurrent adjustments.
	if !atomic.CompareAndSwapInt32(&dcm.isAdjusting, 0, 1) {
		return
	}
	defer atomic.StoreInt32(&dcm.isAdjusting, 0)

	// Read lastAdjustment under the lock: GetStats reads it concurrently,
	// so an unsynchronized read/write here would be a data race.
	dcm.mu.RLock()
	last := dcm.lastAdjustment
	dcm.mu.RUnlock()
	if time.Since(last) < dcm.adjustmentInterval {
		return
	}

	recommendedSize := dcm.memoryMonitor.GetRecommendedCacheSize(dcm.originalCacheSize)
	currentSize := atomic.LoadUint64(&dcm.currentCacheSize)

	// Only adjust if there's a significant difference (more than 5%).
	// Guard against currentSize == 0, which would otherwise make the
	// ratio below divide by zero (NaN/Inf).
	var shouldAdjust bool
	if currentSize == 0 {
		shouldAdjust = recommendedSize != 0
	} else {
		ratio := float64(recommendedSize) / float64(currentSize)
		shouldAdjust = ratio < 0.95 || ratio > 1.05
	}
	if !shouldAdjust {
		return
	}

	dcm.adjustCacheSize(recommendedSize)
	dcm.mu.Lock()
	dcm.lastAdjustment = time.Now()
	dcm.mu.Unlock()
	atomic.AddInt64(&dcm.adjustmentCount, 1)
}
// adjustCacheSize adjusts the cache size to the recommended size.
// Shrinking additionally triggers a best-effort GC on the cache (when it
// implements ForceGC(uint)) to release roughly the excess bytes; growing
// only records the new size.
func (dcm *DynamicCacheManager) adjustCacheSize(newSize uint64) {
dcm.mu.Lock()
defer dcm.mu.Unlock()
oldSize := atomic.LoadUint64(&dcm.currentCacheSize)
atomic.StoreUint64(&dcm.currentCacheSize, newSize)
// If we're reducing the cache size, trigger GC to free up memory
if newSize < oldSize {
// Calculate how much to free
bytesToFree := oldSize - newSize
// Trigger GC on the cache to free up the excess memory
// This is a simplified approach - in practice, you'd want to integrate
// with the actual GC system to free the right amount
if gcCache, ok := dcm.cache.(interface{ ForceGC(uint) }); ok {
gcCache.ForceGC(uint(bytesToFree))
}
}
}
// GetStats returns a snapshot of the dynamic cache manager's state together
// with the associated memory-monitor readings.
func (dcm *DynamicCacheManager) GetStats() map[string]interface{} {
	dcm.mu.RLock()
	defer dcm.mu.RUnlock()

	stats := make(map[string]interface{}, 7)
	stats["original_cache_size"] = dcm.originalCacheSize
	stats["current_cache_size"] = atomic.LoadUint64(&dcm.currentCacheSize)
	stats["adjustment_count"] = atomic.LoadInt64(&dcm.adjustmentCount)
	stats["last_adjustment"] = dcm.lastAdjustment
	stats["memory_utilization"] = dcm.memoryMonitor.GetMemoryUtilization()
	stats["target_memory_usage"] = dcm.memoryMonitor.GetTargetMemoryUsage()
	stats["current_memory_usage"] = dcm.memoryMonitor.GetCurrentMemoryUsage()
	return stats
}

View File

@@ -3,8 +3,10 @@ package memory
import (
"bytes"
"container/list"
"io"
"s1d3sw1ped/steamcache2/vfs"
"s1d3sw1ped/steamcache2/vfs/locks"
"s1d3sw1ped/steamcache2/vfs/lru"
"s1d3sw1ped/steamcache2/vfs/types"
"s1d3sw1ped/steamcache2/vfs/vfserror"
"sort"
@@ -13,32 +15,8 @@ import (
"time"
)
// VFS defines the interface for virtual file systems.
// Sizes are reported in bytes.
type VFS interface {
// Create creates a new file at the given key; size is the expected
// content length in bytes.
Create(key string, size int64) (io.WriteCloser, error)
// Open opens the file at the given key for reading
Open(key string) (io.ReadCloser, error)
// Delete removes the file at the given key
Delete(key string) error
// Stat returns information about the file at the given key
Stat(key string) (*types.FileInfo, error)
// Name returns the name of this VFS
Name() string
// Size returns the current size of the VFS
Size() int64
// Capacity returns the maximum capacity of the VFS
Capacity() int64
}
// Ensure MemoryFS implements VFS.
var _ VFS = (*MemoryFS)(nil)
var _ vfs.VFS = (*MemoryFS)(nil)
// MemoryFS is an in-memory virtual file system
type MemoryFS struct {
@@ -48,55 +26,10 @@ type MemoryFS struct {
size int64
mu sync.RWMutex
keyLocks []sync.Map // Sharded lock pools for better concurrency
LRU *lruList
LRU *lru.LRUList[*types.FileInfo]
timeUpdater *types.BatchedTimeUpdate // Batched time updates for better performance
}
// Number of lock shards for reducing contention
const numLockShards = 32
// lruList for time-decayed LRU eviction.
// list holds *types.FileInfo values with the most recently used entry at
// the front; elem maps a key to its list element for O(1) lookups.
// It contains no internal locking; callers must synchronize access.
type lruList struct {
list *list.List
elem map[string]*list.Element
}
// newLruList constructs an empty lruList ready for use.
func newLruList() *lruList {
	l := new(lruList)
	l.list = list.New()
	l.elem = map[string]*list.Element{}
	return l
}
// Add inserts fi at the front (most recently used) position under key.
// If the key is already present, the stale element is removed first so a
// repeated Add cannot leak an orphaned element in the underlying list.
func (l *lruList) Add(key string, fi *types.FileInfo) {
	if old, exists := l.elem[key]; exists {
		l.list.Remove(old)
	}
	l.elem[key] = l.list.PushFront(fi)
}
// MoveToFront marks key as most recently used and refreshes the access
// time of its FileInfo through the batched time updater. Unknown keys are
// ignored.
func (l *lruList) MoveToFront(key string, timeUpdater *types.BatchedTimeUpdate) {
	elem, exists := l.elem[key]
	if !exists {
		return
	}
	l.list.MoveToFront(elem)
	if fi := elem.Value.(*types.FileInfo); fi != nil {
		fi.UpdateAccessBatched(timeUpdater)
	}
}
// Remove deletes key from the list and returns its FileInfo, or nil when
// the key is not present.
func (l *lruList) Remove(key string) *types.FileInfo {
	elem, exists := l.elem[key]
	if !exists {
		return nil
	}
	delete(l.elem, key)
	return l.list.Remove(elem).(*types.FileInfo)
}
// Len reports the number of entries currently tracked by the LRU list.
func (l *lruList) Len() int {
return l.list.Len()
}
// New creates a new MemoryFS
func New(capacity int64) *MemoryFS {
if capacity <= 0 {
@@ -104,7 +37,7 @@ func New(capacity int64) *MemoryFS {
}
// Initialize sharded locks
keyLocks := make([]sync.Map, numLockShards)
keyLocks := make([]sync.Map, locks.NumLockShards)
return &MemoryFS{
data: make(map[string]*bytes.Buffer),
@@ -112,7 +45,7 @@ func New(capacity int64) *MemoryFS {
capacity: capacity,
size: 0,
keyLocks: keyLocks,
LRU: newLruList(),
LRU: lru.NewLRUList[*types.FileInfo](),
timeUpdater: types.NewBatchedTimeUpdate(100 * time.Millisecond), // Update time every 100ms
}
}
@@ -163,24 +96,9 @@ func (m *MemoryFS) GetFragmentationStats() map[string]interface{} {
}
}
// getShardIndex returns the shard index for a given key, hashing the key
// with FNV-1a for an even distribution across the lock shards.
func getShardIndex(key string) int {
	const (
		fnvOffsetBasis uint32 = 2166136261
		fnvPrime       uint32 = 16777619
	)
	h := fnvOffsetBasis
	for _, b := range []byte(key) {
		h ^= uint32(b)
		h *= fnvPrime
	}
	return int(h % numLockShards)
}
// getKeyLock returns a lock for the given key using sharding
func (m *MemoryFS) getKeyLock(key string) *sync.RWMutex {
shardIndex := getShardIndex(key)
shard := &m.keyLocks[shardIndex]
keyLock, _ := shard.LoadOrStore(key, &sync.RWMutex{})
return keyLock.(*sync.RWMutex)
return locks.GetKeyLock(m.keyLocks, key)
}
// Create creates a new file
@@ -391,7 +309,7 @@ func (m *MemoryFS) EvictLRU(bytesNeeded uint) uint {
// Evict from LRU list until we free enough space
for m.size > m.capacity-int64(bytesNeeded) && m.LRU.Len() > 0 {
// Get the least recently used item
elem := m.LRU.list.Back()
elem := m.LRU.Back()
if elem == nil {
break
}
@@ -411,7 +329,7 @@ func (m *MemoryFS) EvictLRU(bytesNeeded uint) uint {
evicted += uint(fi.Size)
// Clean up key lock
shardIndex := getShardIndex(key)
shardIndex := locks.GetShardIndex(key)
m.keyLocks[shardIndex].Delete(key)
}
@@ -459,7 +377,7 @@ func (m *MemoryFS) EvictBySize(bytesNeeded uint, ascending bool) uint {
evicted += uint(fi.Size)
// Clean up key lock
shardIndex := getShardIndex(key)
shardIndex := locks.GetShardIndex(key)
m.keyLocks[shardIndex].Delete(key)
}
@@ -504,7 +422,7 @@ func (m *MemoryFS) EvictFIFO(bytesNeeded uint) uint {
evicted += uint(fi.Size)
// Clean up key lock
shardIndex := getShardIndex(key)
shardIndex := locks.GetShardIndex(key)
m.keyLocks[shardIndex].Delete(key)
}

View File

@@ -17,6 +17,15 @@ type MemoryMonitor struct {
ctx chan struct{}
stopChan chan struct{}
isMonitoring int32
// Dynamic cache management fields
originalCacheSize uint64
currentCacheSize uint64
cache interface{} // Generic cache interface
adjustmentInterval time.Duration
lastAdjustment time.Time
adjustmentCount int64
isAdjusting int32
}
// NewMemoryMonitor creates a new memory monitor
@@ -27,9 +36,19 @@ func NewMemoryMonitor(targetMemoryUsage uint64, monitoringInterval time.Duration
adjustmentThreshold: adjustmentThreshold,
ctx: make(chan struct{}),
stopChan: make(chan struct{}),
adjustmentInterval: 30 * time.Second, // Default adjustment interval
}
}
// NewMemoryMonitorWithCache creates a new memory monitor with cache
// management enabled. cache is held as interface{} and probed at runtime
// for an optional ForceGC(uint) method when the cache must shrink;
// originalCacheSize (bytes) is the baseline all adjustments are computed
// from. Call StartDynamicAdjustment to begin the adjustment loop.
func NewMemoryMonitorWithCache(targetMemoryUsage uint64, monitoringInterval time.Duration, adjustmentThreshold float64, cache interface{}, originalCacheSize uint64) *MemoryMonitor {
mm := NewMemoryMonitor(targetMemoryUsage, monitoringInterval, adjustmentThreshold)
mm.cache = cache
mm.originalCacheSize = originalCacheSize
mm.currentCacheSize = originalCacheSize
return mm
}
// Start begins monitoring memory usage
func (mm *MemoryMonitor) Start() {
if atomic.CompareAndSwapInt32(&mm.isMonitoring, 0, 1) {
@@ -151,3 +170,105 @@ func (mm *MemoryMonitor) GetMemoryStats() map[string]interface{} {
"gc_pause_total": m.PauseTotalNs,
}
}
// Dynamic Cache Management Methods
// StartDynamicAdjustment begins the dynamic cache size adjustment process
// in a background goroutine. It is a no-op when no cache was attached.
//
// NOTE(review): the spawned goroutine is not tied to stopChan, so it
// appears to outlive a monitor stop; confirm the stop-channel semantics
// and whether this loop should observe it.
func (mm *MemoryMonitor) StartDynamicAdjustment() {
if mm.cache != nil {
go mm.adjustmentLoop()
}
}
// GetCurrentCacheSize returns the current cache size in bytes.
//
// currentCacheSize is always written with atomic.StoreUint64 (see
// adjustCacheSize; the constructor's plain write happens before the
// adjustment goroutine starts), so the atomic load alone suffices; the
// previous RLock around it was redundant double synchronization.
func (mm *MemoryMonitor) GetCurrentCacheSize() uint64 {
	return atomic.LoadUint64(&mm.currentCacheSize)
}
// GetOriginalCacheSize returns the original cache size in bytes — the
// value supplied at construction. It is not reassigned in the code shown
// here, so the lock is purely defensive.
func (mm *MemoryMonitor) GetOriginalCacheSize() uint64 {
mm.mu.RLock()
defer mm.mu.RUnlock()
return mm.originalCacheSize
}
// GetAdjustmentCount returns the number of cache size adjustments made.
// The counter is maintained with sync/atomic, so no lock is needed.
func (mm *MemoryMonitor) GetAdjustmentCount() int64 {
return atomic.LoadInt64(&mm.adjustmentCount)
}
// adjustmentLoop runs the cache size adjustment loop, attempting one
// adjustment per tick of adjustmentInterval.
//
// NOTE(review): the loop never exits and does not select on stopChan, so
// the ticker and goroutine persist past a monitor stop. Confirm whether
// stopChan is closed (vs. sent to) on Stop and wire it in here if so.
func (mm *MemoryMonitor) adjustmentLoop() {
ticker := time.NewTicker(mm.adjustmentInterval)
defer ticker.Stop()
for range ticker.C {
mm.performAdjustment()
}
}
// performAdjustment performs a cache size adjustment if needed.
//
// It is a no-op when another adjustment is in flight or when less than
// adjustmentInterval has elapsed since the last one. The size is only
// changed when the recommendation differs from the current size by more
// than 5%.
func (mm *MemoryMonitor) performAdjustment() {
	// Prevent concurrent adjustments.
	if !atomic.CompareAndSwapInt32(&mm.isAdjusting, 0, 1) {
		return
	}
	defer atomic.StoreInt32(&mm.isAdjusting, 0)

	// Read lastAdjustment under the lock: GetDynamicStats reads it
	// concurrently, so unsynchronized access would be a data race.
	mm.mu.RLock()
	last := mm.lastAdjustment
	mm.mu.RUnlock()
	if time.Since(last) < mm.adjustmentInterval {
		return
	}

	recommendedSize := mm.GetRecommendedCacheSize(mm.originalCacheSize)
	currentSize := atomic.LoadUint64(&mm.currentCacheSize)

	// Only adjust if there's a significant difference (more than 5%).
	// Guard against currentSize == 0, which would otherwise make the
	// ratio below divide by zero (NaN/Inf).
	var shouldAdjust bool
	if currentSize == 0 {
		shouldAdjust = recommendedSize != 0
	} else {
		ratio := float64(recommendedSize) / float64(currentSize)
		shouldAdjust = ratio < 0.95 || ratio > 1.05
	}
	if !shouldAdjust {
		return
	}

	mm.adjustCacheSize(recommendedSize)
	mm.mu.Lock()
	mm.lastAdjustment = time.Now()
	mm.mu.Unlock()
	atomic.AddInt64(&mm.adjustmentCount, 1)
}
// adjustCacheSize adjusts the cache size to the recommended size.
// Shrinking additionally triggers a best-effort GC on the cache (when it
// implements ForceGC(uint)) to release roughly the excess bytes; growing
// only records the new size.
func (mm *MemoryMonitor) adjustCacheSize(newSize uint64) {
mm.mu.Lock()
defer mm.mu.Unlock()
oldSize := atomic.LoadUint64(&mm.currentCacheSize)
atomic.StoreUint64(&mm.currentCacheSize, newSize)
// If we're reducing the cache size, trigger GC to free up memory
if newSize < oldSize {
// Calculate how much to free
bytesToFree := oldSize - newSize
// Trigger GC on the cache to free up the excess memory
// This is a simplified approach - in practice, you'd want to integrate
// with the actual GC system to free the right amount
if gcCache, ok := mm.cache.(interface{ ForceGC(uint) }); ok {
gcCache.ForceGC(uint(bytesToFree))
}
}
}
// GetDynamicStats returns a snapshot of the dynamic cache management state
// together with the monitor's current memory readings.
func (mm *MemoryMonitor) GetDynamicStats() map[string]interface{} {
	mm.mu.RLock()
	defer mm.mu.RUnlock()

	stats := make(map[string]interface{}, 7)
	stats["original_cache_size"] = mm.originalCacheSize
	stats["current_cache_size"] = atomic.LoadUint64(&mm.currentCacheSize)
	stats["adjustment_count"] = atomic.LoadInt64(&mm.adjustmentCount)
	stats["last_adjustment"] = mm.lastAdjustment
	stats["memory_utilization"] = mm.GetMemoryUtilization()
	stats["target_memory_usage"] = mm.GetTargetMemoryUsage()
	stats["current_memory_usage"] = mm.GetCurrentMemoryUsage()
	return stats
}