4 Commits
1.0.16 ... main

SHA1 Message Date
f945ccef05 Enhance error handling and metrics tracking in SteamCache
- Introduced a new error handling system with custom error types for better context and clarity in error reporting.
- Implemented URL validation to prevent invalid requests and enhance security.
- Updated cache key generation functions to return errors, improving robustness in handling invalid inputs.
- Added comprehensive metrics tracking for requests, cache hits, misses, and performance metrics, allowing for better monitoring and analysis of the caching system.
- Enhanced logging to include detailed metrics and error information for improved debugging and operational insights.
2025-09-22 17:29:41 -05:00
3703e40442 Add comprehensive documentation for caching, configuration, development, and security patterns
- Introduced multiple new markdown files detailing caching patterns, configuration management, development workflows, Go language conventions, HTTP proxy patterns, logging and monitoring practices, performance optimization guidelines, project structure, security validation, and VFS architecture.
- Each document outlines best practices, patterns, and guidelines to enhance the understanding and implementation of various components within the SteamCache2 project.
- This documentation aims to improve maintainability, facilitate onboarding for new contributors, and ensure consistent application of coding and architectural standards across the codebase.
2025-09-22 17:29:26 -05:00
bfe29dea75 Refactor caching and memory management components
- Updated the caching logic to utilize a predictive cache warmer, enhancing content prefetching based on access patterns.
- Replaced the legacy warming system with a more efficient predictive approach, allowing for better performance and resource management.
- Refactored memory management to integrate dynamic cache size adjustments based on system memory usage, improving overall efficiency.
- Simplified the VFS interface and improved concurrency handling with sharded locks for better performance in multi-threaded environments.
- Enhanced tests to validate the new caching and memory management behaviors, ensuring reliability and performance improvements.
2025-09-22 01:59:15 -05:00
9b2affe95a Refactor disk initialization and file processing in DiskFS
- Replaced legacy depot file migration logic with concurrent directory scanning for improved performance.
- Introduced batch processing of files to minimize lock contention during initialization.
- Simplified the init function by removing unnecessary complexity and focusing on efficient file handling.
- Enhanced logging to provide better insights into directory scan progress and completion.
2025-09-22 00:51:51 -05:00
28 changed files with 2226 additions and 1243 deletions


@@ -0,0 +1,64 @@
---
description: Caching system patterns and best practices
---
# Caching System Patterns
## Cache Key Generation
- Use SHA256 hashing for cache keys to ensure uniform distribution
- Include service prefix (e.g., "steam/", "epic/") based on User-Agent detection
- Never include query parameters in cache keys - strip them before hashing
- Cache keys should be deterministic and consistent
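
A minimal sketch of this scheme (the function name and error text are illustrative, not the project's exact API):

```go
package cachekey

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"strings"
)

// GenerateKey strips query parameters, hashes the remaining path with
// SHA256, and prefixes the detected service ("steam", "epic", ...).
func GenerateKey(rawURL, servicePrefix string) (string, error) {
	path, _, _ := strings.Cut(rawURL, "?") // never include query parameters
	if path == "" || servicePrefix == "" {
		return "", errors.New("invalid URL path or service prefix")
	}
	sum := sha256.Sum256([]byte(path)) // deterministic and uniformly distributed
	return servicePrefix + "/" + hex.EncodeToString(sum[:]), nil
}
```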
## Cache File Format
The cache uses a custom format with:
- Magic number: "SC2C" (SteamCache2 Cache)
- Content hash: SHA256 of response body
- Response size: Total HTTP response size
- Raw HTTP response: Complete response as received from upstream
- Header line format: "SC2C <hash> <size>\n"
- Integrity verification on read operations
- Automatic corruption detection and cleanup
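
A sketch of how a writer and verifying reader for this format could look; `WriteEntry`/`ReadEntry` are illustrative names, not the project's serializer:

```go
package cachefile

import (
	"bufio"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
)

// WriteEntry writes the "SC2C <hash> <size>\n" header line followed by
// the raw response bytes.
func WriteEntry(w io.Writer, response []byte) error {
	sum := sha256.Sum256(response)
	if _, err := fmt.Fprintf(w, "SC2C %s %d\n", hex.EncodeToString(sum[:]), len(response)); err != nil {
		return err
	}
	_, err := w.Write(response)
	return err
}

// ReadEntry parses the header, reads the body, and verifies both size and
// content hash; a mismatch signals corruption and the entry should be deleted.
func ReadEntry(r *bufio.Reader) ([]byte, error) {
	line, err := r.ReadString('\n')
	if err != nil {
		return nil, err
	}
	var hash string
	var size int64
	if _, err := fmt.Sscanf(line, "SC2C %s %d", &hash, &size); err != nil {
		return nil, fmt.Errorf("malformed cache header: %w", err)
	}
	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, fmt.Errorf("truncated cache entry: %w", err)
	}
	sum := sha256.Sum256(body)
	if hex.EncodeToString(sum[:]) != hash {
		return nil, errors.New("content hash mismatch: cache entry corrupted")
	}
	return body, nil
}
```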
## Garbage Collection Algorithms
Available algorithms and their use cases:
- **LRU**: Best for general gaming patterns, keeps recently accessed content
- **LFU**: Good for gaming cafes with popular games
- **FIFO**: Predictable behavior, good for testing
- **Largest**: Maximizes number of cached files
- **Smallest**: Maximizes cache hit rate
- **Hybrid**: Combines access time and file size for optimal performance
## Cache Validation
- Always verify Content-Length matches received data
- Use SHA256 hashing for content integrity
- Don't cache chunked transfer encoding (no Content-Length)
- Reject files with invalid or missing Content-Length
## Request Coalescing
- Multiple clients requesting the same file should share the download
- Use channels and mutexes to coordinate concurrent requests
- Buffer response data for coalesced clients
- Clean up coalesced request structures after completion
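
The pattern is essentially single-flight deduplication; a hand-rolled sketch with a mutex and a completion channel (names are illustrative):

```go
package coalesce

import "sync"

type inflight struct {
	done chan struct{} // closed when the download finishes
	data []byte        // buffered response shared with coalesced clients
	err  error
}

type Coalescer struct {
	mu sync.Mutex
	m  map[string]*inflight
}

func NewCoalescer() *Coalescer {
	return &Coalescer{m: make(map[string]*inflight)}
}

// Do runs fetch once per key; concurrent callers for the same key block
// until the first download completes, then share its result.
func (c *Coalescer) Do(key string, fetch func() ([]byte, error)) ([]byte, error) {
	c.mu.Lock()
	if f, ok := c.m[key]; ok {
		c.mu.Unlock()
		<-f.done // wait for the in-flight download
		return f.data, f.err
	}
	f := &inflight{done: make(chan struct{})}
	c.m[key] = f
	c.mu.Unlock()

	f.data, f.err = fetch()
	close(f.done)

	c.mu.Lock()
	delete(c.m, key) // clean up after completion
	c.mu.Unlock()
	return f.data, f.err
}
```

In production code, golang.org/x/sync/singleflight provides the same semantics out of the box.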
## Range Request Support
- Always cache the full file, regardless of Range headers
- Support serving partial content from cached full files
- Parse Range headers correctly (bytes=start-end, bytes=start-, bytes=-suffix)
- Return appropriate HTTP status codes (206 for partial content, 416 for invalid ranges)
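
A simplified single-range parser covering the three forms above (the project's `parseRangeHeader` also returns the total size; this sketch omits that):

```go
package ranges

import (
	"fmt"
	"strconv"
	"strings"
)

// parseRange handles bytes=start-end, bytes=start-, and bytes=-suffix.
// It returns an inclusive byte range; callers answer invalid ranges with
// 416 and valid ones with 206.
func parseRange(header string, totalSize int64) (int64, int64, error) {
	spec, ok := strings.CutPrefix(header, "bytes=")
	if !ok {
		return 0, 0, fmt.Errorf("unsupported range unit: %q", header)
	}
	first, last, ok := strings.Cut(spec, "-")
	if !ok {
		return 0, 0, fmt.Errorf("malformed range: %q", header)
	}
	switch {
	case first == "": // bytes=-suffix: the final N bytes
		n, err := strconv.ParseInt(last, 10, 64)
		if err != nil || n <= 0 {
			return 0, 0, fmt.Errorf("malformed suffix range: %q", header)
		}
		if n > totalSize {
			n = totalSize
		}
		return totalSize - n, totalSize - 1, nil
	case last == "": // bytes=start-: from start to end of file
		start, err := strconv.ParseInt(first, 10, 64)
		if err != nil || start < 0 || start >= totalSize {
			return 0, 0, fmt.Errorf("range start out of bounds: %q", header)
		}
		return start, totalSize - 1, nil
	default: // bytes=start-end, with end clamped to the file size
		start, err1 := strconv.ParseInt(first, 10, 64)
		end, err2 := strconv.ParseInt(last, 10, 64)
		if err1 != nil || err2 != nil || start < 0 || start > end || start >= totalSize {
			return 0, 0, fmt.Errorf("invalid range: %q", header)
		}
		if end >= totalSize {
			end = totalSize - 1
		}
		return start, end, nil
	}
}
```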
## Service Detection
- Use regex patterns to match User-Agent strings
- Support multiple services (Steam, Epic Games, etc.)
- Cache keys include service prefix for isolation
- Default to Steam service configuration
## Memory vs Disk Caching
- Memory cache: Fast access, limited size, use LRU or LFU
- Disk cache: Slower access, large size, use Hybrid or Largest
- Tiered caching: Memory as L1, disk as L2
- Dynamic memory management with configurable thresholds
- Cache promotion: Move frequently accessed files from disk to memory
- Sharded storage: Use directory sharding for Steam keys to reduce inode pressure
- Memory-mapped files: Use mmap for large disk operations
- Batched operations: Group operations for better performance


@@ -0,0 +1,65 @@
---
description: Configuration management patterns
---
# Configuration Management Patterns
## YAML Configuration
- Use YAML format for human-readable configuration
- Provide sensible defaults for all configuration options
- Validate configuration on startup
- Generate default configuration file on first run
## Configuration Structure
- Group related settings in nested structures
- Use descriptive field names with YAML tags
- Provide default values in struct tags where possible
- Use appropriate data types (strings for sizes, ints for limits)
## Size Configuration
- Use human-readable size strings (e.g., "1GB", "512MB")
- Parse sizes using `github.com/docker/go-units`
- Support "0" to disable cache layers
- Validate size limits are reasonable
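
For example, with `units.RAMInBytes` (binary multiples, so "512MB" is 512 × 1024²); the helper name is illustrative:

```go
package config

import (
	"fmt"

	units "github.com/docker/go-units"
)

// parseCacheSize turns a human-readable size string into bytes.
// "0" disables the cache layer entirely.
func parseCacheSize(s string) (int64, error) {
	if s == "0" {
		return 0, nil // layer disabled
	}
	n, err := units.RAMInBytes(s) // "512MB" -> 536870912
	if err != nil {
		return 0, fmt.Errorf("invalid cache size %q: %w", s, err)
	}
	return n, nil
}
```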
## Garbage Collection Configuration
- Support multiple GC algorithms per cache layer
- Provide algorithm-specific configuration options
- Allow different algorithms for memory vs disk caches
- Document algorithm characteristics and use cases
## Server Configuration
- Configure listen address and port
- Set concurrency limits (global and per-client)
- Configure upstream server URL
- Support both absolute and relative upstream URLs
## Runtime Configuration
- Allow command-line overrides for critical settings
- Support configuration file path specification
- Provide help and version information
- Validate configuration before starting services
## Default Configuration
- Generate appropriate defaults for different use cases
- Consider system resources when setting defaults
- Provide conservative defaults for home users
- Document configuration options in comments
## Configuration Validation
- Validate required fields are present
- Check that size limits are reasonable
- Verify file paths are accessible
- Test upstream server connectivity
## Configuration Updates
- Support configuration reloading (if needed)
- Handle configuration changes gracefully
- Log configuration changes
- Maintain backward compatibility
## Environment-Specific Configuration
- Support different configurations for development/production
- Allow environment variable overrides
- Provide configuration templates for common scenarios
- Document configuration best practices


@@ -0,0 +1,77 @@
---
description: Development workflow and best practices
---
# Development Workflow for SteamCache2
## Build System
- Use the provided [Makefile](mdc:Makefile) for all build operations
- Prefer `make` commands over direct `go` commands
- Use `make test` to run all tests before committing
- Use `make run-debug` for development with debug logging
## Code Organization
- Keep related functionality in the same package
- Use clear package boundaries and interfaces
- Minimize dependencies between packages
- Follow the existing project structure
## Git Workflow
- Use descriptive commit messages
- Keep commits focused and atomic
- Test changes thoroughly before committing
- Use meaningful branch names
## Code Review
- Review code for correctness and performance
- Check for proper error handling
- Verify test coverage for new functionality
- Ensure code follows project conventions
## Documentation
- Update README.md for user-facing changes
- Add comments for complex algorithms
- Document configuration options
- Keep API documentation current
## Testing Strategy
- Write tests for new functionality
- Maintain high test coverage
- Test edge cases and error conditions
- Run integration tests before major releases
## Performance Testing
- Test with realistic data sizes
- Measure performance impact of changes
- Profile the application under load
- Monitor memory usage and leaks
## Configuration Management
- Test configuration changes thoroughly
- Validate configuration on startup
- Provide sensible defaults
- Document configuration options
## Error Handling
- Implement proper error handling
- Use structured logging for errors
- Provide meaningful error messages
- Handle edge cases gracefully
## Security Considerations
- Validate all inputs
- Implement proper rate limiting
- Log security-relevant events
- Follow security best practices
## Release Process
- Test thoroughly before releasing
- Update version information
- Create release notes
- Tag releases appropriately
## Maintenance
- Monitor application performance
- Update dependencies regularly
- Fix bugs promptly
- Refactor code when needed


@@ -0,0 +1,62 @@
---
globs: *.go
---
# Go Language Conventions for SteamCache2
## Code Style
- Use `gofmt` and `goimports` for formatting
- Follow standard Go naming conventions (camelCase for unexported, PascalCase for exported identifiers)
- Use meaningful variable names that reflect their purpose
- Prefer explicit error handling over panic (panicking is acceptable only in constructors handed invalid configuration)
## Package Organization
- Keep packages focused and cohesive
- Use internal packages for implementation details that shouldn't be exported
- Group related functionality together (e.g., all VFS implementations in `vfs/`)
- Use interface implementation verification: `var _ Interface = (*Implementation)(nil)`
- Create type aliases for backward compatibility when refactoring
- Use separate packages for different concerns (e.g., `vfserror`, `types`, `locks`)
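
A self-contained example of the last two points; `VFS` here is a trimmed stand-in for the real `vfs.VFS` interface, and the type names are illustrative:

```go
package example

// VFS is a trimmed stand-in for the real vfs.VFS interface.
type VFS interface {
	Read(key string) ([]byte, error)
	Write(key string, data []byte) error
}

// MemFS is a toy implementation.
type MemFS struct{ files map[string][]byte }

func (f *MemFS) Read(key string) ([]byte, error) { return f.files[key], nil }
func (f *MemFS) Write(key string, b []byte) error {
	f.files[key] = b
	return nil
}

// Compile-time proof that *MemFS satisfies VFS; the build breaks as soon
// as the implementation drifts from the interface.
var _ VFS = (*MemFS)(nil)

// A type alias keeps old call sites compiling during a refactor.
type MemoryFS = MemFS
```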
## Error Handling
- Always handle errors explicitly - never ignore them with `_`
- Use `fmt.Errorf` with `%w` verb for error wrapping
- Log errors with context using structured logging (zerolog)
- Return meaningful error messages that help with debugging
- Create custom error types for domain-specific errors (see `vfs/vfserror/`)
- Use `errors.New()` for simple error constants
- Include relevant context in error messages (file paths, operation names)
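
A small example of the wrapping style (the package and `Open` helper are illustrative):

```go
package disk

import (
	"fmt"
	"io"
	"os"
)

// Open wraps failures with the %w verb so callers can match the cause
// with errors.Is / errors.As, and includes the path for debugging.
func Open(path string) (io.ReadCloser, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("diskfs: open %s: %w", path, err)
	}
	return f, nil
}
```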
## Testing
- All tests should run with a timeout
- Use table-driven tests for multiple test cases
- Use `t.Helper()` in test helper functions
- Test both success and failure cases
- Use `t.TempDir()` for temporary files in tests
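
A table-driven sketch; `generateKey` is a hypothetical stand-in for the real cache-key helper:

```go
package steamcache_test

import (
	"errors"
	"strings"
	"testing"
)

// generateKey is a stand-in for the real cache-key helper.
func generateKey(urlPath, prefix string) (string, error) {
	if urlPath == "" || prefix == "" || strings.Contains(urlPath, "..") {
		return "", errors.New("invalid input")
	}
	return prefix + "/" + urlPath, nil
}

func TestGenerateKey(t *testing.T) {
	tests := []struct {
		name    string
		urlPath string
		prefix  string
		wantErr bool
	}{
		{"valid steam path", "/depot/123/chunk/abc", "steam", false},
		{"empty path", "", "steam", true},
		{"empty prefix", "/depot/123/chunk/abc", "", true},
		{"directory traversal", "/depot/../etc/passwd", "steam", true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			_, err := generateKey(tt.urlPath, tt.prefix)
			if (err != nil) != tt.wantErr {
				t.Fatalf("generateKey(%q, %q) error = %v, wantErr %v",
					tt.urlPath, tt.prefix, err, tt.wantErr)
			}
		})
	}
}
```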
## Concurrency
- Use `sync.RWMutex` for read-heavy operations
- Prefer channels over shared memory when possible
- Use `context.Context` for cancellation and timeouts
- Be explicit about goroutine lifecycle management
- Use sharded locks for high-concurrency scenarios (see `vfs/locks/sharding.go`)
- Use `atomic.Value` for lock-free data structure updates
- Use `sync.Map` for concurrent map operations when appropriate
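
The idea behind the sharded locks in `vfs/locks/sharding.go`, sketched (shard count and hash choice are illustrative):

```go
package locks

import (
	"hash/fnv"
	"sync"
)

// ShardedLock spreads keys over a fixed set of mutexes so unrelated keys
// never contend on the same lock.
type ShardedLock struct {
	shards [64]sync.RWMutex
}

func (s *ShardedLock) shard(key string) *sync.RWMutex {
	h := fnv.New32a()
	h.Write([]byte(key)) // never returns an error
	return &s.shards[h.Sum32()%uint32(len(s.shards))]
}

func (s *ShardedLock) Lock(key string)    { s.shard(key).Lock() }
func (s *ShardedLock) Unlock(key string)  { s.shard(key).Unlock() }
func (s *ShardedLock) RLock(key string)   { s.shard(key).RLock() }
func (s *ShardedLock) RUnlock(key string) { s.shard(key).RUnlock() }
```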
## Performance
- Use `io.ReadAll` sparingly - prefer streaming for large data
- Use connection pooling for HTTP clients
- Implement proper resource cleanup (defer statements)
- Use buffered channels when appropriate
## Logging
- Use structured logging with zerolog
- Include relevant context in log messages (keys, URLs, client IPs)
- Use appropriate log levels (Debug, Info, Warn, Error)
- Avoid logging sensitive information
## Memory Management
- Be mindful of memory usage in caching scenarios
- Use appropriate data structures for the use case
- Implement proper cleanup for long-running services
- Monitor memory usage in production


@@ -0,0 +1,59 @@
---
description: HTTP proxy and server patterns
---
# HTTP Proxy and Server Patterns
## Request Handling
- Only support GET requests (Steam doesn't use other methods)
- Reject non-GET requests with 405 Method Not Allowed
- Handle health checks at "/" endpoint
- Support LanCache heartbeat at "/lancache-heartbeat"
## Upstream Communication
- Use optimized HTTP transport with connection pooling
- Set appropriate timeouts (10s dial, 15s header, 60s total)
- Enable HTTP/2 and keep-alives for better performance
- Use large buffers (64KB) for better throughput
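
A transport tuned along these lines; the timeouts and 64KB buffers follow the values above, while the pool sizes are illustrative:

```go
package proxy

import (
	"net"
	"net/http"
	"time"
)

// newUpstreamClient builds a pooled, HTTP/2-capable client.
func newUpstreamClient() *http.Client {
	transport := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   10 * time.Second, // dial timeout
			KeepAlive: 30 * time.Second,
		}).DialContext,
		ResponseHeaderTimeout: 15 * time.Second,
		ForceAttemptHTTP2:     true, // HTTP/2 where upstream supports it
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   32,
		WriteBufferSize:       64 << 10, // 64KB buffers for throughput
		ReadBufferSize:        64 << 10,
	}
	return &http.Client{
		Transport: transport,
		Timeout:   60 * time.Second, // total request budget
	}
}
```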
## Response Streaming
- Stream responses directly to clients for better performance
- Support both full file and range request streaming
- Preserve original HTTP headers (excluding hop-by-hop headers)
- Add cache-specific headers (X-LanCache-Status, X-LanCache-Processed-By)
## Error Handling
- Implement retry logic with exponential backoff
- Handle upstream server errors gracefully
- Return appropriate HTTP status codes
- Log errors with sufficient context for debugging
## Concurrency Control
- Use semaphores to limit concurrent requests globally
- Implement per-client rate limiting
- Clean up old client limiters to prevent memory leaks
- Use proper synchronization for shared data structures
## Header Management
- Copy relevant headers from upstream responses
- Exclude hop-by-hop headers (Connection, Keep-Alive, etc.)
- Add cache status headers for monitoring
- Preserve Content-Type and Content-Length headers
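
A sketch of the copying rule; the real code keeps a `hopByHopHeaders` set much like this one:

```go
package proxy

import "net/http"

var hopByHop = map[string]struct{}{
	"Connection": {}, "Keep-Alive": {}, "Proxy-Authenticate": {},
	"Proxy-Authorization": {}, "Te": {}, "Trailers": {},
	"Transfer-Encoding": {}, "Upgrade": {},
}

// copyHeaders forwards end-to-end headers from the upstream response and
// tags the reply with the cache status ("HIT" or "MISS").
func copyHeaders(w http.ResponseWriter, resp *http.Response, cacheStatus string) {
	for name, values := range resp.Header {
		if _, skip := hopByHop[name]; skip {
			continue // hop-by-hop headers must not be forwarded
		}
		for _, v := range values {
			w.Header().Add(name, v)
		}
	}
	w.Header().Set("X-LanCache-Status", cacheStatus)
}
```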
## Client IP Detection
- Check X-Forwarded-For header first (for proxy setups)
- Fall back to X-Real-IP header
- Use RemoteAddr as final fallback
- Handle comma-separated IP lists in X-Forwarded-For
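
One way the described fallback chain can look (the project has its own `getClientIP`; this is a sketch):

```go
package proxy

import (
	"net"
	"net/http"
	"strings"
)

// getClientIP resolves the originating client address behind proxies.
func getClientIP(r *http.Request) string {
	// X-Forwarded-For may hold "client, proxy1, proxy2"; the first entry
	// is the original client.
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		first, _, _ := strings.Cut(xff, ",")
		return strings.TrimSpace(first)
	}
	if rip := r.Header.Get("X-Real-IP"); rip != "" {
		return rip
	}
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		return r.RemoteAddr // already a bare address
	}
	return host
}
```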
## Performance Optimizations
- Set keep-alive headers for better connection reuse
- Use appropriate server timeouts
- Implement request coalescing for duplicate requests
- Use buffered I/O for better performance
## Security Considerations
- Validate request URLs and paths
- Implement rate limiting to prevent abuse
- Log suspicious activity
- Handle malformed requests gracefully


@@ -0,0 +1,87 @@
---
description: Logging and monitoring patterns for SteamCache2
---
# Logging and Monitoring Patterns
## Structured Logging with Zerolog
- Use zerolog for all logging operations
- Include structured fields for better querying and analysis
- Use appropriate log levels: Debug, Info, Warn, Error
- Include timestamps and context in all log messages
- Configure log format (JSON for production, console for development)
## Log Context and Fields
- Always include relevant context in log messages
- Use consistent field names: `client_ip`, `cache_key`, `url`, `service`
- Include operation duration with `Dur()` for performance monitoring
- Log cache hit/miss status for analytics
- Include file sizes and operation counts for monitoring
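
These field names map directly onto zerolog calls; a sketch with placeholder values:

```go
package main

import (
	"os"
	"time"

	"github.com/rs/zerolog"
)

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
	start := time.Now()
	// ... handle the request ...
	logger.Info().
		Str("client_ip", "10.0.0.5").
		Str("cache_key", "steam/a1b2c3").
		Str("url", "/depot/123/chunk/abc").
		Str("service", "steam").
		Str("cache_status", "HIT").
		Dur("response_time", time.Since(start)).
		Msg("cache request")
}
```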
## Performance Monitoring
- Log request processing times with the `response_time` duration field
- Monitor cache hit/miss ratios
- Track memory and disk usage
- Log garbage collection events and statistics
- Monitor concurrent request counts and limits
## Error Logging
- Log errors with full context and stack traces
- Include relevant request information in error logs
- Use structured error logging with `Err()` field
- Log configuration errors with file paths
- Include upstream server errors with status codes
## Cache Operation Logging
- Log cache hits with key and response time
- Log cache misses with reason and upstream response time
- Log cache corruption detection and cleanup
- Log garbage collection operations and evicted items
- Log cache promotion events (disk to memory)
## Service Detection Logging
- Log service detection results (Steam, Epic, etc.)
- Log User-Agent patterns and matches
- Log service configuration changes
- Log cache key generation for different services
## HTTP Request Logging
- Log incoming requests with method, URL, and client IP
- Log response status codes and sizes
- Log upstream server communication
- Log rate limiting events and client limits
- Log health check and heartbeat requests
## Configuration Logging
- Log configuration loading and validation
- Log default configuration generation
- Log configuration changes and overrides
- Log startup parameters and settings
## Security Event Logging
- Log suspicious request patterns
- Log rate limiting violations
- Log authentication failures (if applicable)
- Log configuration security issues
- Log potential security threats
## System Health Logging
- Log memory usage and fragmentation
- Log disk usage and capacity
- Log connection pool statistics
- Log goroutine counts and lifecycle
- Log system resource utilization
## Log Rotation and Management
- Implement log rotation for long-running services
- Use appropriate log retention policies
- Monitor log file sizes and disk usage
- Configure log levels for different environments
- Use structured logging for log analysis tools
## Monitoring Integration
- Design logs for easy parsing by monitoring tools
- Include metrics that can be scraped by Prometheus
- Use consistent field naming for dashboard creation
- Log events that can trigger alerts
- Include correlation IDs for request tracing


@@ -0,0 +1,71 @@
---
description: Performance optimization guidelines
---
# Performance Optimization Guidelines
## Memory Management
- Use appropriate data structures for the use case
- Implement proper cleanup for long-running services
- Monitor memory usage and implement limits
- Use memory pools for frequently allocated objects
## I/O Optimization
- Use buffered I/O for better performance
- Implement connection pooling for HTTP clients
- Use appropriate buffer sizes (64KB for HTTP)
- Minimize system calls and context switches
## Concurrency Patterns
- Use worker pools for CPU-intensive tasks
- Implement proper backpressure with semaphores
- Use channels for coordination between goroutines
- Avoid excessive goroutine creation
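
A backpressure sketch with `golang.org/x/sync/semaphore`, the same Acquire/Release pattern the proxy's global request limiter uses:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	const maxWorkers = 4
	sem := semaphore.NewWeighted(maxWorkers)
	ctx := context.Background()

	for i := 0; i < 10; i++ {
		// Acquire blocks when all slots are taken, providing backpressure.
		if err := sem.Acquire(ctx, 1); err != nil {
			break
		}
		go func(n int) {
			defer sem.Release(1)
			fmt.Println("processing job", n)
		}(i)
	}

	// Acquiring the full weight waits for every worker to finish.
	if err := sem.Acquire(ctx, maxWorkers); err != nil {
		fmt.Println("wait failed:", err)
	}
}
```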
## Caching Strategies
- Use tiered caching (memory + disk) for optimal performance
- Implement intelligent cache eviction policies
- Use cache warming for predictable access patterns
- Monitor cache hit ratios and adjust strategies
## Network Optimization
- Use HTTP/2 when available
- Enable connection keep-alives
- Use appropriate timeouts for different operations
- Implement request coalescing for duplicate requests
## Data Structures
- Choose appropriate data structures for access patterns
- Use sync.RWMutex for read-heavy operations
- Consider lock-free data structures where appropriate
- Minimize memory allocations in hot paths
## Algorithm Selection
- Choose GC algorithms based on access patterns
- Use LRU for general gaming workloads
- Use LFU for gaming cafes with popular content
- Use Hybrid algorithms for mixed workloads
## Monitoring and Profiling
- Implement performance metrics collection
- Use structured logging for performance analysis
- Monitor key performance indicators
- Profile the application under realistic loads
## Resource Management
- Implement proper resource cleanup
- Use context.Context for cancellation
- Set appropriate limits on resource usage
- Monitor resource consumption over time
## Scalability Considerations
- Design for horizontal scaling where possible
- Use sharding for large datasets
- Implement proper load balancing
- Consider distributed caching for large deployments
## Bottleneck Identification
- Profile the application to identify bottlenecks
- Focus optimization efforts on the most critical paths
- Use appropriate tools for performance analysis
- Test performance under realistic conditions


@@ -0,0 +1,57 @@
---
alwaysApply: true
---
# SteamCache2 Project Structure Guide
This is a high-performance Steam download cache written in Go. The main entry point is [main.go](mdc:main.go), which delegates to the command structure in [cmd/](mdc:cmd/).
## Core Architecture
- **Main Entry**: [main.go](mdc:main.go) - Simple entry point that calls `cmd.Execute()`
- **Command Layer**: [cmd/root.go](mdc:cmd/root.go) - CLI interface using Cobra, handles configuration loading and service startup
- **Core Service**: [steamcache/steamcache.go](mdc:steamcache/steamcache.go) - Main HTTP proxy and caching logic
- **Configuration**: [config/config.go](mdc:config/config.go) - YAML-based configuration management
- **Virtual File System**: [vfs/](mdc:vfs/) - Abstracted storage layer supporting memory and disk caches
## Key Components
### VFS (Virtual File System)
- [vfs/vfs.go](mdc:vfs/vfs.go) - Core VFS interface
- [vfs/memory/](mdc:vfs/memory/) - In-memory cache implementation
- [vfs/disk/](mdc:vfs/disk/) - Disk-based cache implementation
- [vfs/cache/](mdc:vfs/cache/) - Cache coordination layer
- [vfs/gc/](mdc:vfs/gc/) - Garbage collection algorithms (LRU, LFU, FIFO, etc.)
### Service Management
- Service detection via User-Agent patterns
- Support for multiple gaming services (Steam, Epic, etc.)
- SHA256-based cache key generation with service prefixes
### Advanced Features
- [vfs/adaptive/](mdc:vfs/adaptive/) - Adaptive caching strategies
- [vfs/predictive/](mdc:vfs/predictive/) - Predictive cache warming
- Request coalescing for concurrent downloads
- Range request support for partial content
## Development Workflow
Use the [Makefile](mdc:Makefile) for development:
- `make` - Run tests and build
- `make test` - Run all tests
- `make run` - Run the application
- `make run-debug` - Run with debug logging
## Testing
- Unit tests: [steamcache/steamcache_test.go](mdc:steamcache/steamcache_test.go)
- Integration tests: [steamcache/integration_test.go](mdc:steamcache/integration_test.go)
- Test cache data: [steamcache/test_cache/](mdc:steamcache/test_cache/)
## Configuration
Default configuration is generated in [config.yaml](mdc:config.yaml) on first run. The application supports:
- Memory and disk cache sizing
- Garbage collection algorithm selection
- Concurrency limits
- Upstream server configuration


@@ -0,0 +1,89 @@
---
description: Security and validation patterns for SteamCache2
---
# Security and Validation Patterns
## Input Validation
- Validate all HTTP request parameters and headers
- Sanitize file paths and cache keys to prevent directory traversal
- Validate URL paths before processing
- Check Content-Length headers for reasonable values
- Reject malformed or suspicious requests
## Cache Key Security
- Use SHA256 hashing for all cache keys to prevent collisions
- Never include user input directly in cache keys
- Strip query parameters from URLs before hashing
- Use service prefixes to isolate different services
- Validate cache key format and length
## Content Integrity
- Always verify Content-Length matches received data
- Use SHA256 hashing for content integrity verification
- Don't cache chunked transfer encoding (no Content-Length)
- Reject files with invalid or missing Content-Length
- Implement cache file format validation with magic numbers
## Rate Limiting and DoS Protection
- Implement global concurrency limits with semaphores
- Use per-client rate limiting to prevent abuse
- Clean up old client limiters to prevent memory leaks
- Set appropriate timeouts for all operations
- Monitor and log suspicious activity
## HTTP Security
- Only support GET requests (Steam doesn't use other methods)
- Validate HTTP method and reject unsupported methods
- Handle malformed HTTP requests gracefully
- Implement proper error responses with appropriate status codes
- Use hop-by-hop header filtering
## Client IP Detection
- Check X-Forwarded-For header for proxy setups
- Fall back to X-Real-IP header
- Use RemoteAddr as final fallback
- Handle comma-separated IP lists in X-Forwarded-For
- Log client IPs for monitoring and debugging
## Service Detection Security
- Use regex patterns for User-Agent matching
- Validate service configurations before use
- Support multiple services with proper isolation
- Default to Steam service configuration
- Log service detection for monitoring
## Error Handling Security
- Don't expose internal system information in error messages
- Log detailed errors for debugging but return generic messages to clients
- Handle errors gracefully without crashing
- Implement proper cleanup on errors
- Use structured logging for security events
## Configuration Security
- Validate configuration values on startup
- Use sensible defaults for security-sensitive settings
- Validate file paths and permissions
- Check upstream server connectivity
- Log configuration changes
## Memory and Resource Security
- Implement memory limits to prevent OOM attacks
- Use proper resource cleanup and garbage collection
- Monitor memory usage and implement alerts
- Use bounded data structures where possible
- Implement proper connection limits
## Logging Security
- Don't log sensitive information (passwords, tokens)
- Use structured logging for security events
- Include relevant context (IPs, URLs, timestamps)
- Implement log rotation and retention policies
- Monitor logs for security issues
## Network Security
- Use HTTPS for upstream connections when possible
- Implement proper TLS configuration
- Use connection pooling with appropriate limits
- Set reasonable timeouts for network operations
- Monitor network traffic for anomalies


@@ -0,0 +1,48 @@
---
alwaysApply: true
---
# SteamCache2 Overview
SteamCache2 is a high-performance HTTP proxy cache specifically designed for Steam game downloads. It reduces bandwidth usage and speeds up downloads by caching game files locally.
## Key Features
- **Tiered Caching**: Memory + disk cache with intelligent promotion
- **Service Detection**: Automatically detects Steam clients via User-Agent
- **Request Coalescing**: Multiple clients share downloads of the same file
- **Range Support**: Serves partial content from cached full files
- **Garbage Collection**: Multiple algorithms (LRU, LFU, FIFO, Hybrid, etc.)
- **Adaptive Caching**: Learns from access patterns for better performance
## Architecture
- **HTTP Proxy**: Intercepts Steam requests and serves from cache when possible
- **VFS Layer**: Abstracted storage supporting memory and disk caches
- **Service Manager**: Handles multiple gaming services (Steam, Epic, etc.)
- **GC System**: Intelligent cache eviction with configurable algorithms
## Development
- **Language**: Go 1.23+
- **Build**: Use `make` commands (see [Makefile](mdc:Makefile))
- **Testing**: Comprehensive unit and integration tests
- **Configuration**: YAML-based with automatic generation
## Performance
- **Concurrency**: Configurable request limits and rate limiting
- **Memory**: Dynamic memory management with configurable thresholds
- **Network**: Optimized HTTP transport with connection pooling
- **Storage**: Efficient cache file format with integrity verification
## Use Cases
- **Gaming Cafes**: Reduce bandwidth costs and improve download speeds
- **LAN Events**: Share game downloads across multiple clients
- **Home Networks**: Speed up game updates for multiple gamers
- **Development**: Test game downloads without hitting Steam servers
## Configuration
Default configuration is generated on first run. Key settings:
- Cache sizes (memory/disk)
- Garbage collection algorithms
- Concurrency limits
- Upstream server configuration
See [config.yaml](mdc:config.yaml) for configuration options and [README.md](mdc:README.md) for detailed setup instructions.


@@ -0,0 +1,78 @@
---
globs: *_test.go
---
# Testing Guidelines for SteamCache2
## Test Structure
- Use table-driven tests for multiple test cases
- Group related tests in the same test function when appropriate
- Use descriptive test names that explain what is being tested
- Include both positive and negative test cases
## Test Data Management
- Use `t.TempDir()` for temporary files and directories
- Clean up resources in defer statements
- Use unique temporary directories for each test to avoid conflicts
- Don't rely on external services in unit tests
## Integration Testing
- Mark integration tests with `testing.Short()` checks
- Use real Steam URLs for integration tests when appropriate
- Test both cache hits and cache misses
- Verify response integrity between direct and cached responses
- Test against actual Steam servers for real-world validation
- Use `httptest.NewServer` for local testing scenarios
- Compare direct vs cached responses byte-for-byte
## Mocking and Stubbing
- Use `httptest.NewServer` for HTTP server mocking
- Create mock responses that match real Steam responses
- Test error conditions and edge cases
- Use `httptest.NewRecorder` for response testing
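
A minimal example of the `httptest.NewServer` pattern (the payload and path are made up):

```go
package steamcache_test

import (
	"io"
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestUpstreamMock stands a local server in for Steam so unit tests never
// touch the network.
func TestUpstreamMock(t *testing.T) {
	payload := []byte("fake depot chunk")
	upstream := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			// Content-Length is set automatically for small writes.
			w.Write(payload)
		}))
	defer upstream.Close()

	resp, err := http.Get(upstream.URL + "/depot/1/chunk/abc")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		t.Fatal(err)
	}
	if string(body) != string(payload) {
		t.Fatalf("got %q, want %q", body, payload)
	}
}
```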
## Performance Testing
- Test with realistic data sizes
- Measure cache hit/miss ratios
- Test concurrent request handling
- Verify memory usage doesn't grow unbounded
## Cache Testing
- Test cache key generation and uniqueness
- Verify cache file format serialization/deserialization
- Test garbage collection algorithms
- Test cache eviction policies
- Test cache corruption scenarios and recovery
- Verify cache file format integrity (magic numbers, hashes)
- Test range request handling from cached files
- Test request coalescing behavior
## Service Detection Testing
- Test User-Agent pattern matching
- Test service configuration management
- Test cache key generation for different services
- Test service expandability (adding new services)
## Error Handling Testing
- Test network failures and timeouts
- Test malformed requests and responses
- Test cache corruption scenarios
- Test resource exhaustion conditions
## Test Timeouts
- All tests should run with appropriate timeouts
- Use `context.WithTimeout` for long-running operations
- Set reasonable timeouts for network operations
- Fail fast on obvious errors
## Test Coverage
- Aim for high test coverage on critical paths
- Test edge cases and error conditions
- Test concurrent access patterns
- Test resource cleanup and memory management
## Test Documentation
- Document complex test scenarios
- Explain the purpose of integration tests
- Include comments for non-obvious test logic
- Document expected behavior and assumptions


@@ -0,0 +1,72 @@
---
description: VFS (Virtual File System) patterns and architecture
---
# VFS (Virtual File System) Patterns
## Core VFS Interface
- Implement the `vfs.VFS` interface for all storage backends
- Use interface implementation verification: `var _ vfs.VFS = (*Implementation)(nil)`
- Support both memory and disk-based storage with the same interface
- Provide size and capacity information for monitoring
## Tiered Cache Architecture
- Use `vfs/cache/cache.go` for two-tier caching (memory + disk)
- Implement lock-free tier switching with `atomic.Value`
- Prefer disk tier for persistence, memory tier for speed
- Support cache promotion from disk to memory
## Sharded File Systems
- Use sharded directory structures for Steam cache keys
- Implement 2-level sharding: `steam/XX/YY/hash` for optimal performance
- Use `vfs/locks/sharding.go` for sharded locking
- Reduce inode pressure with directory sharding
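
A sketch of the 2-level path mapping (the helper name is illustrative):

```go
package disk

import "path/filepath"

// shardedPath maps a key's hash to a 2-level directory layout, e.g.
// steam/ab/cd/abcd1234..., keeping any single directory small.
func shardedPath(root, service, hash string) string {
	if len(hash) < 4 {
		return filepath.Join(root, service, hash) // too short to shard
	}
	return filepath.Join(root, service, hash[:2], hash[2:4], hash)
}
```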
## Memory Management
- Use `bytes.Buffer` for in-memory file storage
- Implement batched time updates for performance
- Use LRU lists for eviction tracking
- Monitor memory fragmentation and usage
## Disk Storage
- Use memory-mapped files (`mmap`) for large file operations
- Implement efficient file path sharding
- Use batched operations for better I/O performance
- Support concurrent access with proper locking
## Garbage Collection Integration
- Wrap VFS implementations with `vfs/gc/gc.go`
- Support multiple GC algorithms (LRU, LFU, FIFO, etc.)
- Implement async GC with configurable thresholds
- Use eviction functions from `vfs/eviction/eviction.go`
## Performance Optimizations
- Use sharded locks to reduce contention
- Implement batched time updates (100ms intervals)
- Use atomic operations for lock-free updates
- Monitor and log performance metrics
## Error Handling
- Use custom VFS errors from `vfs/vfserror/vfserror.go`
- Handle capacity exceeded scenarios gracefully
- Implement proper cleanup on errors
- Log VFS operations with context
## File Information Management
- Use `vfs/types/types.go` for file metadata
- Track access times, sizes, and other statistics
- Implement efficient file info storage and retrieval
- Support batched metadata updates
## Adaptive and Predictive Features
- Integrate with `vfs/adaptive/adaptive.go` for learning patterns
- Use `vfs/predictive/predictive.go` for cache warming
- Implement intelligent cache promotion strategies
- Monitor access patterns for optimization
## Testing VFS Implementations
- Test with realistic file sizes and access patterns
- Verify concurrent access scenarios
- Test garbage collection behavior
- Validate sharding and path generation
- Test error conditions and edge cases

steamcache/errors/errors.go (new file, 120 lines)

@@ -0,0 +1,120 @@
// steamcache/errors/errors.go
package errors
import (
"errors"
"fmt"
"net/http"
)
// Common SteamCache errors
var (
ErrInvalidURL = errors.New("steamcache: invalid URL")
ErrUnsupportedService = errors.New("steamcache: unsupported service")
ErrUpstreamUnavailable = errors.New("steamcache: upstream server unavailable")
ErrCacheCorrupted = errors.New("steamcache: cache file corrupted")
ErrInvalidContentLength = errors.New("steamcache: invalid content length")
ErrRequestTimeout = errors.New("steamcache: request timeout")
ErrRateLimitExceeded = errors.New("steamcache: rate limit exceeded")
ErrInvalidUserAgent = errors.New("steamcache: invalid user agent")
)
// SteamCacheError represents a SteamCache-specific error with context
type SteamCacheError struct {
Op string // Operation that failed
URL string // URL that caused the error
ClientIP string // Client IP address
StatusCode int // HTTP status code if applicable
Err error // Underlying error
Context interface{} // Additional context
}
// Error implements the error interface
func (e *SteamCacheError) Error() string {
if e.URL != "" && e.ClientIP != "" {
return fmt.Sprintf("steamcache: %s failed for URL %q from client %s: %v", e.Op, e.URL, e.ClientIP, e.Err)
}
if e.URL != "" {
return fmt.Sprintf("steamcache: %s failed for URL %q: %v", e.Op, e.URL, e.Err)
}
return fmt.Sprintf("steamcache: %s failed: %v", e.Op, e.Err)
}
// Unwrap returns the underlying error
func (e *SteamCacheError) Unwrap() error {
return e.Err
}
// NewSteamCacheError creates a new SteamCache error with context
func NewSteamCacheError(op, url, clientIP string, err error) *SteamCacheError {
return &SteamCacheError{
Op: op,
URL: url,
ClientIP: clientIP,
Err: err,
}
}
// NewSteamCacheErrorWithStatus creates a new SteamCache error with HTTP status
func NewSteamCacheErrorWithStatus(op, url, clientIP string, statusCode int, err error) *SteamCacheError {
return &SteamCacheError{
Op: op,
URL: url,
ClientIP: clientIP,
StatusCode: statusCode,
Err: err,
}
}
// NewSteamCacheErrorWithContext creates a new SteamCache error with additional context
func NewSteamCacheErrorWithContext(op, url, clientIP string, context interface{}, err error) *SteamCacheError {
return &SteamCacheError{
Op: op,
URL: url,
ClientIP: clientIP,
Context: context,
Err: err,
}
}
// IsRetryableError determines if an error is retryable
func IsRetryableError(err error) bool {
if err == nil {
return false
}
// Check for specific retryable errors
if errors.Is(err, ErrUpstreamUnavailable) ||
errors.Is(err, ErrRequestTimeout) {
return true
}
// Check for HTTP status codes that are retryable
if steamErr, ok := err.(*SteamCacheError); ok {
switch steamErr.StatusCode {
case http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
http.StatusTooManyRequests,
http.StatusInternalServerError:
return true
}
}
return false
}
// IsClientError determines if an error is a client error (4xx)
func IsClientError(err error) bool {
if steamErr, ok := err.(*SteamCacheError); ok {
return steamErr.StatusCode >= 400 && steamErr.StatusCode < 500
}
return false
}
// IsServerError determines if an error is a server error (5xx)
func IsServerError(err error) bool {
if steamErr, ok := err.(*SteamCacheError); ok {
return steamErr.StatusCode >= 500
}
return false
}


@@ -0,0 +1,213 @@
// steamcache/metrics/metrics.go
package metrics
import (
"sync"
"sync/atomic"
"time"
)
// Metrics tracks various performance and operational metrics
type Metrics struct {
// Request metrics
TotalRequests int64
CacheHits int64
CacheMisses int64
CacheCoalesced int64
Errors int64
RateLimited int64
// Performance metrics
TotalResponseTime int64 // in nanoseconds
TotalBytesServed int64
TotalBytesCached int64
// Cache metrics
MemoryCacheSize int64
DiskCacheSize int64
MemoryCacheHits int64
DiskCacheHits int64
// Service metrics
ServiceRequests map[string]int64
serviceMutex sync.RWMutex
// Time tracking
StartTime time.Time
LastResetTime time.Time
}
// NewMetrics creates a new metrics instance
func NewMetrics() *Metrics {
now := time.Now()
return &Metrics{
ServiceRequests: make(map[string]int64),
StartTime: now,
LastResetTime: now,
}
}
// IncrementTotalRequests increments the total request counter
func (m *Metrics) IncrementTotalRequests() {
atomic.AddInt64(&m.TotalRequests, 1)
}
// IncrementCacheHits increments the cache hit counter
func (m *Metrics) IncrementCacheHits() {
atomic.AddInt64(&m.CacheHits, 1)
}
// IncrementCacheMisses increments the cache miss counter
func (m *Metrics) IncrementCacheMisses() {
atomic.AddInt64(&m.CacheMisses, 1)
}
// IncrementCacheCoalesced increments the coalesced request counter
func (m *Metrics) IncrementCacheCoalesced() {
atomic.AddInt64(&m.CacheCoalesced, 1)
}
// IncrementErrors increments the error counter
func (m *Metrics) IncrementErrors() {
atomic.AddInt64(&m.Errors, 1)
}
// IncrementRateLimited increments the rate limited counter
func (m *Metrics) IncrementRateLimited() {
atomic.AddInt64(&m.RateLimited, 1)
}
// AddResponseTime adds response time to the total
func (m *Metrics) AddResponseTime(duration time.Duration) {
atomic.AddInt64(&m.TotalResponseTime, int64(duration))
}
// AddBytesServed adds bytes served to the total
func (m *Metrics) AddBytesServed(bytes int64) {
atomic.AddInt64(&m.TotalBytesServed, bytes)
}
// AddBytesCached adds bytes cached to the total
func (m *Metrics) AddBytesCached(bytes int64) {
atomic.AddInt64(&m.TotalBytesCached, bytes)
}
// SetMemoryCacheSize sets the current memory cache size
func (m *Metrics) SetMemoryCacheSize(size int64) {
atomic.StoreInt64(&m.MemoryCacheSize, size)
}
// SetDiskCacheSize sets the current disk cache size
func (m *Metrics) SetDiskCacheSize(size int64) {
atomic.StoreInt64(&m.DiskCacheSize, size)
}
// IncrementMemoryCacheHits increments memory cache hits
func (m *Metrics) IncrementMemoryCacheHits() {
atomic.AddInt64(&m.MemoryCacheHits, 1)
}
// IncrementDiskCacheHits increments disk cache hits
func (m *Metrics) IncrementDiskCacheHits() {
atomic.AddInt64(&m.DiskCacheHits, 1)
}
// IncrementServiceRequests increments requests for a specific service
func (m *Metrics) IncrementServiceRequests(service string) {
m.serviceMutex.Lock()
defer m.serviceMutex.Unlock()
m.ServiceRequests[service]++
}
// GetServiceRequests returns the number of requests for a service
func (m *Metrics) GetServiceRequests(service string) int64 {
m.serviceMutex.RLock()
defer m.serviceMutex.RUnlock()
return m.ServiceRequests[service]
}
// GetStats returns a snapshot of current metrics
func (m *Metrics) GetStats() *Stats {
totalRequests := atomic.LoadInt64(&m.TotalRequests)
cacheHits := atomic.LoadInt64(&m.CacheHits)
cacheMisses := atomic.LoadInt64(&m.CacheMisses)
var hitRate float64
if totalRequests > 0 {
hitRate = float64(cacheHits) / float64(totalRequests)
}
var avgResponseTime time.Duration
if totalRequests > 0 {
avgResponseTime = time.Duration(atomic.LoadInt64(&m.TotalResponseTime) / totalRequests)
}
m.serviceMutex.RLock()
serviceRequests := make(map[string]int64)
for k, v := range m.ServiceRequests {
serviceRequests[k] = v
}
m.serviceMutex.RUnlock()
return &Stats{
TotalRequests: totalRequests,
CacheHits: cacheHits,
CacheMisses: cacheMisses,
CacheCoalesced: atomic.LoadInt64(&m.CacheCoalesced),
Errors: atomic.LoadInt64(&m.Errors),
RateLimited: atomic.LoadInt64(&m.RateLimited),
HitRate: hitRate,
AvgResponseTime: avgResponseTime,
TotalBytesServed: atomic.LoadInt64(&m.TotalBytesServed),
TotalBytesCached: atomic.LoadInt64(&m.TotalBytesCached),
MemoryCacheSize: atomic.LoadInt64(&m.MemoryCacheSize),
DiskCacheSize: atomic.LoadInt64(&m.DiskCacheSize),
MemoryCacheHits: atomic.LoadInt64(&m.MemoryCacheHits),
DiskCacheHits: atomic.LoadInt64(&m.DiskCacheHits),
ServiceRequests: serviceRequests,
Uptime: time.Since(m.StartTime),
LastResetTime: m.LastResetTime,
}
}
// Reset resets all metrics to zero
func (m *Metrics) Reset() {
atomic.StoreInt64(&m.TotalRequests, 0)
atomic.StoreInt64(&m.CacheHits, 0)
atomic.StoreInt64(&m.CacheMisses, 0)
atomic.StoreInt64(&m.CacheCoalesced, 0)
atomic.StoreInt64(&m.Errors, 0)
atomic.StoreInt64(&m.RateLimited, 0)
atomic.StoreInt64(&m.TotalResponseTime, 0)
atomic.StoreInt64(&m.TotalBytesServed, 0)
atomic.StoreInt64(&m.TotalBytesCached, 0)
atomic.StoreInt64(&m.MemoryCacheHits, 0)
atomic.StoreInt64(&m.DiskCacheHits, 0)
m.serviceMutex.Lock()
m.ServiceRequests = make(map[string]int64)
m.serviceMutex.Unlock()
m.LastResetTime = time.Now()
}
// Stats represents a snapshot of metrics
type Stats struct {
TotalRequests int64
CacheHits int64
CacheMisses int64
CacheCoalesced int64
Errors int64
RateLimited int64
HitRate float64
AvgResponseTime time.Duration
TotalBytesServed int64
TotalBytesCached int64
MemoryCacheSize int64
DiskCacheSize int64
MemoryCacheHits int64
DiskCacheHits int64
ServiceRequests map[string]int64
Uptime time.Duration
LastResetTime time.Time
}


@@ -13,7 +13,9 @@ import (
"net/url" "net/url"
"os" "os"
"regexp" "regexp"
"s1d3sw1ped/steamcache2/steamcache/errors"
"s1d3sw1ped/steamcache2/steamcache/logger" "s1d3sw1ped/steamcache2/steamcache/logger"
"s1d3sw1ped/steamcache2/steamcache/metrics"
"s1d3sw1ped/steamcache2/vfs" "s1d3sw1ped/steamcache2/vfs"
"s1d3sw1ped/steamcache2/vfs/adaptive" "s1d3sw1ped/steamcache2/vfs/adaptive"
"s1d3sw1ped/steamcache2/vfs/cache" "s1d3sw1ped/steamcache2/vfs/cache"
@@ -21,7 +23,6 @@ import (
"s1d3sw1ped/steamcache2/vfs/gc" "s1d3sw1ped/steamcache2/vfs/gc"
"s1d3sw1ped/steamcache2/vfs/memory" "s1d3sw1ped/steamcache2/vfs/memory"
"s1d3sw1ped/steamcache2/vfs/predictive" "s1d3sw1ped/steamcache2/vfs/predictive"
"s1d3sw1ped/steamcache2/vfs/warming"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -361,13 +362,14 @@ func (sc *SteamCache) streamCachedResponse(w http.ResponseWriter, r *http.Reques
w.Write(rangeData) w.Write(rangeData)
logger.Logger.Info(). logger.Logger.Info().
Str("key", cacheKey). Str("cache_key", cacheKey).
Str("url", r.URL.String()). Str("url", r.URL.String()).
Str("host", r.Host). Str("host", r.Host).
Str("client_ip", clientIP). Str("client_ip", clientIP).
Str("status", "HIT"). Str("cache_status", "HIT").
Str("range", fmt.Sprintf("%d-%d/%d", start, end, totalSize)). Str("range", fmt.Sprintf("%d-%d/%d", start, end, totalSize)).
Dur("zduration", time.Since(tstart)). Int64("range_size", end-start+1).
Dur("response_time", time.Since(tstart)).
Msg("cache request") Msg("cache request")
return return
@@ -395,12 +397,13 @@ func (sc *SteamCache) streamCachedResponse(w http.ResponseWriter, r *http.Reques
w.Write(bodyData) w.Write(bodyData)
logger.Logger.Info(). logger.Logger.Info().
Str("key", cacheKey). Str("cache_key", cacheKey).
Str("url", r.URL.String()). Str("url", r.URL.String()).
Str("host", r.Host). Str("host", r.Host).
Str("client_ip", clientIP). Str("client_ip", clientIP).
Str("status", "HIT"). Str("cache_status", "HIT").
Dur("zduration", time.Since(tstart)). Int64("file_size", int64(len(bodyData))).
Dur("response_time", time.Since(tstart)).
Msg("cache request") Msg("cache request")
} }
@@ -496,14 +499,19 @@ func parseRangeHeader(rangeHeader string, totalSize int64) (start, end, total in
} }
// generateURLHash creates a SHA256 hash of the entire URL path for cache key // generateURLHash creates a SHA256 hash of the entire URL path for cache key
func generateURLHash(urlPath string) string { func generateURLHash(urlPath string) (string, error) {
// Validate input to prevent cache key pollution // Validate input to prevent cache key pollution
if urlPath == "" { if urlPath == "" {
return "" return "", errors.NewSteamCacheError("generateURLHash", urlPath, "", errors.ErrInvalidURL)
}
// Additional validation for suspicious patterns
if strings.Contains(urlPath, "..") || strings.Contains(urlPath, "//") {
return "", errors.NewSteamCacheError("generateURLHash", urlPath, "", errors.ErrInvalidURL)
} }
hash := sha256.Sum256([]byte(urlPath)) hash := sha256.Sum256([]byte(urlPath))
return hex.EncodeToString(hash[:]) return hex.EncodeToString(hash[:]), nil
} }
// calculateSHA256 calculates SHA256 hash of the given data // calculateSHA256 calculates SHA256 hash of the given data
@@ -513,6 +521,35 @@ func calculateSHA256(data []byte) string {
return hex.EncodeToString(hasher.Sum(nil)) return hex.EncodeToString(hasher.Sum(nil))
} }
// validateURLPath validates URL path for security concerns
func validateURLPath(urlPath string) error {
if urlPath == "" {
return errors.NewSteamCacheError("validateURLPath", urlPath, "", errors.ErrInvalidURL)
}
// Check for directory traversal attempts
if strings.Contains(urlPath, "..") {
return errors.NewSteamCacheError("validateURLPath", urlPath, "", errors.ErrInvalidURL)
}
// Check for double slashes (potential path manipulation)
if strings.Contains(urlPath, "//") {
return errors.NewSteamCacheError("validateURLPath", urlPath, "", errors.ErrInvalidURL)
}
// Check for suspicious characters
if strings.ContainsAny(urlPath, "<>\"'&") {
return errors.NewSteamCacheError("validateURLPath", urlPath, "", errors.ErrInvalidURL)
}
// Check for reasonable length (prevent DoS)
if len(urlPath) > 2048 {
return errors.NewSteamCacheError("validateURLPath", urlPath, "", errors.ErrInvalidURL)
}
return nil
}
// verifyCompleteFile verifies that we received the complete file by checking Content-Length // verifyCompleteFile verifies that we received the complete file by checking Content-Length
// Returns true if the file is complete, false if it's incomplete (allowing retry) // Returns true if the file is complete, false if it's incomplete (allowing retry)
func (sc *SteamCache) verifyCompleteFile(bodyData []byte, resp *http.Response, urlPath string, cacheKey string) bool { func (sc *SteamCache) verifyCompleteFile(bodyData []byte, resp *http.Response, urlPath string, cacheKey string) bool {
@@ -572,9 +609,20 @@ func (sc *SteamCache) detectService(r *http.Request) (*ServiceConfig, bool) {
// The prefix indicates which service the request came from (detected via User-Agent) // The prefix indicates which service the request came from (detected via User-Agent)
// Input: /depot/1684171/chunk/0016cfc5019b8baa6026aa1cce93e685d6e06c6e, "steam" // Input: /depot/1684171/chunk/0016cfc5019b8baa6026aa1cce93e685d6e06c6e, "steam"
// Output: steam/a1b2c3d4e5f678901234567890123456789012345678901234567890 // Output: steam/a1b2c3d4e5f678901234567890123456789012345678901234567890
func generateServiceCacheKey(urlPath string, servicePrefix string) string { func generateServiceCacheKey(urlPath string, servicePrefix string) (string, error) {
// Validate service prefix
if servicePrefix == "" {
return "", errors.NewSteamCacheError("generateServiceCacheKey", urlPath, "", errors.ErrUnsupportedService)
}
// Generate hash for URL path
hash, err := generateURLHash(urlPath)
if err != nil {
return "", err
}
// Create a SHA256 hash of the entire path for all service client requests // Create a SHA256 hash of the entire path for all service client requests
return servicePrefix + "/" + generateURLHash(urlPath) return servicePrefix + "/" + hash, nil
} }
var hopByHopHeaders = map[string]struct{}{ var hopByHopHeaders = map[string]struct{}{
@@ -781,14 +829,17 @@ type SteamCache struct {
// Adaptive and predictive caching // Adaptive and predictive caching
adaptiveManager *adaptive.AdaptiveCacheManager adaptiveManager *adaptive.AdaptiveCacheManager
predictiveManager *predictive.PredictiveCacheManager predictiveManager *predictive.PredictiveCacheManager
cacheWarmer *warming.CacheWarmer cacheWarmer *predictive.CacheWarmer
lastAccessKey string // Track last accessed key for sequence analysis lastAccessKey string // Track last accessed key for sequence analysis
lastAccessKeyMu sync.RWMutex lastAccessKeyMu sync.RWMutex
adaptiveEnabled bool // Flag to enable/disable adaptive features adaptiveEnabled bool // Flag to enable/disable adaptive features
// Dynamic memory management // Dynamic memory management
memoryMonitor *memory.MemoryMonitor memoryMonitor *memory.MemoryMonitor
dynamicCacheMgr *memory.DynamicCacheManager dynamicCacheMgr *memory.MemoryMonitor
// Metrics
metrics *metrics.Metrics
} }
func New(address string, memorySize string, diskSize string, diskPath, upstream, memoryGC, diskGC string, maxConcurrentRequests int64, maxRequestsPerClient int64) *SteamCache { func New(address string, memorySize string, diskSize string, diskPath, upstream, memoryGC, diskGC string, maxConcurrentRequests int64, maxRequestsPerClient int64) *SteamCache {
@@ -925,17 +976,20 @@ func New(address string, memorySize string, diskSize string, diskPath, upstream,
// Initialize adaptive and predictive caching (lightweight) // Initialize adaptive and predictive caching (lightweight)
adaptiveManager: adaptive.NewAdaptiveCacheManager(5 * time.Minute), // Much longer interval adaptiveManager: adaptive.NewAdaptiveCacheManager(5 * time.Minute), // Much longer interval
predictiveManager: predictive.NewPredictiveCacheManager(), predictiveManager: predictive.NewPredictiveCacheManager(),
cacheWarmer: warming.NewCacheWarmer(c, 2), // Reduced to 2 concurrent warmers cacheWarmer: predictive.NewCacheWarmer(), // Use predictive cache warmer
adaptiveEnabled: true, // Enable by default but can be disabled adaptiveEnabled: true, // Enable by default but can be disabled
// Initialize dynamic memory management // Initialize dynamic memory management
memoryMonitor: memory.NewMemoryMonitor(uint64(memorysize), 10*time.Second, 0.1), // 10% threshold memoryMonitor: memory.NewMemoryMonitor(uint64(memorysize), 10*time.Second, 0.1), // 10% threshold
dynamicCacheMgr: nil, // Will be set after cache creation dynamicCacheMgr: nil, // Will be set after cache creation
// Initialize metrics
metrics: metrics.NewMetrics(),
} }
// Initialize dynamic cache manager if we have memory cache // Initialize dynamic cache manager if we have memory cache
if m != nil && sc.memoryMonitor != nil { if m != nil && sc.memoryMonitor != nil {
sc.dynamicCacheMgr = memory.NewDynamicCacheManager(mgc, uint64(memorysize), sc.memoryMonitor) sc.dynamicCacheMgr = memory.NewMemoryMonitorWithCache(uint64(memorysize), 10*time.Second, 0.1, mgc, uint64(memorysize))
sc.dynamicCacheMgr.Start() sc.dynamicCacheMgr.Start()
sc.memoryMonitor.Start() sc.memoryMonitor.Start()
} }
@@ -1001,21 +1055,44 @@ func (sc *SteamCache) Shutdown() {
sc.wg.Wait() sc.wg.Wait()
} }
// GetMetrics returns current metrics
func (sc *SteamCache) GetMetrics() *metrics.Stats {
// Update cache sizes
if sc.memory != nil {
sc.metrics.SetMemoryCacheSize(sc.memory.Size())
}
if sc.disk != nil {
sc.metrics.SetDiskCacheSize(sc.disk.Size())
}
return sc.metrics.GetStats()
}
// ResetMetrics resets all metrics to zero
func (sc *SteamCache) ResetMetrics() {
sc.metrics.Reset()
}
func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
clientIP := getClientIP(r)
// Set keep-alive headers for better performance // Set keep-alive headers for better performance
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("Keep-Alive", "timeout=300, max=1000") w.Header().Set("Keep-Alive", "timeout=300, max=1000")
// Apply global concurrency limit first // Apply global concurrency limit first
if err := sc.requestSemaphore.Acquire(context.Background(), 1); err != nil { if err := sc.requestSemaphore.Acquire(context.Background(), 1); err != nil {
logger.Logger.Warn().Str("client_ip", getClientIP(r)).Msg("Server at capacity, rejecting request") sc.metrics.IncrementRateLimited()
logger.Logger.Warn().Str("client_ip", clientIP).Msg("Server at capacity, rejecting request")
http.Error(w, "Server busy, please try again later", http.StatusServiceUnavailable) http.Error(w, "Server busy, please try again later", http.StatusServiceUnavailable)
return return
} }
defer sc.requestSemaphore.Release(1) defer sc.requestSemaphore.Release(1)
// Track total requests
sc.metrics.IncrementTotalRequests()
// Apply per-client rate limiting // Apply per-client rate limiting
clientIP := getClientIP(r)
clientLimiter := sc.getOrCreateClientLimiter(clientIP) clientLimiter := sc.getOrCreateClientLimiter(clientIP)
if err := clientLimiter.semaphore.Acquire(context.Background(), 1); err != nil { if err := clientLimiter.semaphore.Acquire(context.Background(), 1); err != nil {
@@ -1055,19 +1132,56 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
if r.URL.String() == "/metrics" {
// Return metrics in a simple text format
stats := sc.GetMetrics()
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "# SteamCache2 Metrics\n")
fmt.Fprintf(w, "total_requests %d\n", stats.TotalRequests)
fmt.Fprintf(w, "cache_hits %d\n", stats.CacheHits)
fmt.Fprintf(w, "cache_misses %d\n", stats.CacheMisses)
fmt.Fprintf(w, "cache_coalesced %d\n", stats.CacheCoalesced)
fmt.Fprintf(w, "errors %d\n", stats.Errors)
fmt.Fprintf(w, "rate_limited %d\n", stats.RateLimited)
fmt.Fprintf(w, "hit_rate %.4f\n", stats.HitRate)
fmt.Fprintf(w, "avg_response_time_ms %.2f\n", float64(stats.AvgResponseTime.Nanoseconds())/1e6)
fmt.Fprintf(w, "total_bytes_served %d\n", stats.TotalBytesServed)
fmt.Fprintf(w, "total_bytes_cached %d\n", stats.TotalBytesCached)
fmt.Fprintf(w, "memory_cache_size %d\n", stats.MemoryCacheSize)
fmt.Fprintf(w, "disk_cache_size %d\n", stats.DiskCacheSize)
fmt.Fprintf(w, "uptime_seconds %.2f\n", stats.Uptime.Seconds())
return
}
// Check if this is a request from a supported service // Check if this is a request from a supported service
if service, isSupported := sc.detectService(r); isSupported { if service, isSupported := sc.detectService(r); isSupported {
// trim the query parameters from the URL path // trim the query parameters from the URL path
// this is necessary because the cache key should not include query parameters // this is necessary because the cache key should not include query parameters
urlPath, _, _ := strings.Cut(r.URL.String(), "?") urlPath, _, _ := strings.Cut(r.URL.String(), "?")
// Validate URL path for security
if err := validateURLPath(urlPath); err != nil {
logger.Logger.Warn().
Err(err).
Str("url", urlPath).
Str("client_ip", clientIP).
Msg("Invalid URL path detected")
http.Error(w, "Invalid URL", http.StatusBadRequest)
return
}
tstart := time.Now() tstart := time.Now()
// Generate service cache key: {service}/{hash} (prefix indicates service via User-Agent) // Generate service cache key: {service}/{hash} (prefix indicates service via User-Agent)
cacheKey := generateServiceCacheKey(urlPath, service.Prefix) cacheKey, err := generateServiceCacheKey(urlPath, service.Prefix)
if err != nil {
if cacheKey == "" { logger.Logger.Warn().
logger.Logger.Warn().Str("url", urlPath).Msg("Invalid URL") Err(err).
Str("url", urlPath).
Str("service", service.Name).
Str("client_ip", clientIP).
Msg("Failed to generate cache key")
http.Error(w, "Invalid URL", http.StatusBadRequest) http.Error(w, "Invalid URL", http.StatusBadRequest)
return return
} }
@@ -1111,6 +1225,12 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Cache validation passed - record access for adaptive/predictive analysis // Cache validation passed - record access for adaptive/predictive analysis
sc.recordCacheAccess(cacheKey, int64(len(cachedData))) sc.recordCacheAccess(cacheKey, int64(len(cachedData)))
// Track cache hit metrics
sc.metrics.IncrementCacheHits()
sc.metrics.AddResponseTime(time.Since(tstart))
sc.metrics.AddBytesServed(int64(len(cachedData)))
sc.metrics.IncrementServiceRequests(service.Name)
            logger.Logger.Debug().
                Str("key", cacheKey).
                Str("url", urlPath).
@@ -1176,13 +1296,21 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
            w.WriteHeader(coalescedReq.statusCode)
            w.Write(responseData)
// Track coalesced cache hit metrics
sc.metrics.IncrementCacheCoalesced()
sc.metrics.AddResponseTime(time.Since(tstart))
sc.metrics.AddBytesServed(int64(len(responseData)))
sc.metrics.IncrementServiceRequests(service.Name)
            logger.Logger.Info().
                Str("cache_key", cacheKey).
                Str("url", urlPath).
                Str("host", r.Host).
                Str("client_ip", clientIP).
                Str("cache_status", "HIT-COALESCED").
                Int("waiting_clients", coalescedReq.waitingCount).
                Int64("file_size", int64(len(responseData))).
                Dur("response_time", time.Since(tstart)).
                Msg("cache request")
            return
@@ -1360,6 +1488,12 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(resp.StatusCode)
        w.Write(bodyData)
// Track cache miss metrics
sc.metrics.IncrementCacheMisses()
sc.metrics.AddResponseTime(time.Since(tstart))
sc.metrics.AddBytesServed(int64(len(bodyData)))
sc.metrics.IncrementServiceRequests(service.Name)
        // Cache the file if validation passed
        if validationPassed {
            // Verify we received the complete file by checking Content-Length
@@ -1400,6 +1534,8 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Msg("Cache write failed or incomplete - removing corrupted entry") Msg("Cache write failed or incomplete - removing corrupted entry")
sc.vfs.Delete(cachePath) sc.vfs.Delete(cachePath)
} else { } else {
// Track successful cache write
sc.metrics.AddBytesCached(int64(len(cacheData)))
                logger.Logger.Debug().
                    Str("key", cacheKey).
                    Str("url", urlPath).
@@ -1457,12 +1593,14 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        }
        logger.Logger.Info().
            Str("cache_key", cacheKey).
            Str("url", urlPath).
            Str("host", r.Host).
            Str("client_ip", clientIP).
            Str("service", service.Name).
            Str("cache_status", "MISS").
            Int64("file_size", int64(len(bodyData))).
            Dur("response_time", time.Since(tstart)).
            Msg("cache request")
        return
@@ -1535,6 +1673,6 @@ func (sc *SteamCache) recordCacheMiss(key string, size int64) {
    // Only trigger warming for very large files to reduce overhead
    if size > 10*1024*1024 { // Only warm files > 10MB
        sc.cacheWarmer.RequestWarming(key, 3, "cache_miss", size)
    }
}


@@ -3,20 +3,27 @@ package steamcache
import (
    "io"
    "s1d3sw1ped/steamcache2/steamcache/errors"
    "s1d3sw1ped/steamcache2/vfs/vfserror"
    "strings"
    "testing"
    "time"
)
func TestCaching(t *testing.T) {
    td := t.TempDir()

    sc := New("localhost:8080", "1G", "1G", td, "", "lru", "lru", 200, 5)

    // Create key2 through the VFS system instead of directly
    w, err := sc.vfs.Create("key2", 6)
    if err != nil {
        t.Errorf("Create key2 failed: %v", err)
    }
    w.Write([]byte("value2"))
    w.Close()

    w, err = sc.vfs.Create("key", 5)
    if err != nil {
        t.Errorf("Create failed: %v", err)
    }
@@ -82,9 +89,18 @@ func TestCaching(t *testing.T) {
t.Errorf("Total size too large: got %d, want at most 34", sc.vfs.Size()) t.Errorf("Total size too large: got %d, want at most 34", sc.vfs.Size())
} }
// First ensure the file is indexed by opening it
rc, err = sc.vfs.Open("key2")
if err != nil {
t.Errorf("Open key2 failed: %v", err)
}
rc.Close()
// Give promotion goroutine time to complete before deleting
time.Sleep(100 * time.Millisecond)
sc.memory.Delete("key2") sc.memory.Delete("key2")
sc.disk.Delete("key2") // Also delete from disk cache sc.disk.Delete("key2") // Also delete from disk cache
os.Remove(filepath.Join(td, "key2"))
if _, err := sc.vfs.Open("key2"); err == nil { if _, err := sc.vfs.Open("key2"); err == nil {
t.Errorf("Open failed: got nil, want error") t.Errorf("Open failed: got nil, want error")
@@ -150,10 +166,13 @@ func TestURLHashing(t *testing.T) {
    for _, tc := range testCases {
        t.Run(tc.desc, func(t *testing.T) {
            result, err := generateServiceCacheKey(tc.input, "steam")

            if tc.shouldCache {
                // Should return a cache key with "steam/" prefix
if err != nil {
t.Errorf("generateServiceCacheKey(%s, \"steam\") returned error: %v", tc.input, err)
}
if !strings.HasPrefix(result, "steam/") { if !strings.HasPrefix(result, "steam/") {
t.Errorf("generateServiceCacheKey(%s, \"steam\") = %s, expected steam/ prefix", tc.input, result) t.Errorf("generateServiceCacheKey(%s, \"steam\") = %s, expected steam/ prefix", tc.input, result)
} }
@@ -162,9 +181,9 @@ func TestURLHashing(t *testing.T) {
t.Errorf("generateServiceCacheKey(%s, \"steam\") length = %d, expected 70", tc.input, len(result)) t.Errorf("generateServiceCacheKey(%s, \"steam\") length = %d, expected 70", tc.input, len(result))
} }
} else { } else {
// Should return empty string for non-Steam URLs // Should return error for invalid URLs
if result != "" { if err == nil {
t.Errorf("generateServiceCacheKey(%s, \"steam\") = %s, expected empty string", tc.input, result) t.Errorf("generateServiceCacheKey(%s, \"steam\") should have returned error", tc.input)
} }
} }
}) })
@@ -308,8 +327,14 @@ func TestServiceManagerExpandability(t *testing.T) {
    }

    // Test cache key generation for different services
    steamKey, err := generateServiceCacheKey("/depot/123/chunk/abc", "steam")
    if err != nil {
t.Errorf("Failed to generate Steam cache key: %v", err)
}
epicKey, err := generateServiceCacheKey("/epic/123/chunk/abc", "epic")
if err != nil {
t.Errorf("Failed to generate Epic cache key: %v", err)
}
    if !strings.HasPrefix(steamKey, "steam/") {
        t.Errorf("Steam cache key should start with 'steam/', got: %s", steamKey)
@@ -353,4 +378,139 @@ func TestSteamKeySharding(t *testing.T) {
    // and be readable, whereas without sharding it might not work correctly
}
// TestURLValidation tests the URL validation function
func TestURLValidation(t *testing.T) {
testCases := []struct {
urlPath string
shouldPass bool
description string
}{
{
urlPath: "/depot/123/chunk/abc",
shouldPass: true,
description: "valid Steam URL",
},
{
urlPath: "/appinfo/456",
shouldPass: true,
description: "valid app info URL",
},
{
urlPath: "",
shouldPass: false,
description: "empty URL",
},
{
urlPath: "/depot/../etc/passwd",
shouldPass: false,
description: "directory traversal attempt",
},
{
urlPath: "/depot//123/chunk/abc",
shouldPass: false,
description: "double slash",
},
{
urlPath: "/depot/123/chunk/abc<script>",
shouldPass: false,
description: "suspicious characters",
},
{
urlPath: strings.Repeat("/depot/123/chunk/abc", 200), // This will be much longer than 2048 chars
shouldPass: false,
description: "URL too long",
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
err := validateURLPath(tc.urlPath)
if tc.shouldPass && err != nil {
t.Errorf("validateURLPath(%q) should pass but got error: %v", tc.urlPath, err)
}
if !tc.shouldPass && err == nil {
t.Errorf("validateURLPath(%q) should fail but passed", tc.urlPath)
}
})
}
}
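validateURLPath itself is not shown in this hunk; the sketch below is one implementation consistent with the test table above. The 2048-byte cap and the exact character blacklist are assumptions inferred from the test cases, not copied from the real function:

func validateURLPath(urlPath string) error {
    if urlPath == "" {
        return fmt.Errorf("empty URL path")
    }
    if len(urlPath) > 2048 { // assumed limit, implied by the "URL too long" case
        return fmt.Errorf("URL path too long: %d bytes", len(urlPath))
    }
    if strings.Contains(urlPath, "..") {
        return fmt.Errorf("directory traversal attempt")
    }
    if strings.Contains(urlPath, "//") {
        return fmt.Errorf("double slash in path")
    }
    for _, c := range urlPath {
        // reject control characters and a few shell/HTML metacharacters (assumed set)
        if c < 0x20 || strings.ContainsRune("<>\"'\\", c) {
            return fmt.Errorf("suspicious character %q in path", c)
        }
    }
    return nil
}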
// TestErrorTypes tests the custom error types
func TestErrorTypes(t *testing.T) {
// Test VFS error
vfsErr := vfserror.NewVFSError("test", "key1", vfserror.ErrNotFound)
if vfsErr.Error() == "" {
t.Error("VFS error should have a message")
}
if vfsErr.Unwrap() != vfserror.ErrNotFound {
t.Error("VFS error should unwrap to the underlying error")
}
// Test SteamCache error
scErr := errors.NewSteamCacheError("test", "/test/url", "127.0.0.1", errors.ErrInvalidURL)
if scErr.Error() == "" {
t.Error("SteamCache error should have a message")
}
if scErr.Unwrap() != errors.ErrInvalidURL {
t.Error("SteamCache error should unwrap to the underlying error")
}
// Test retryable error detection
if !errors.IsRetryableError(errors.ErrUpstreamUnavailable) {
t.Error("Upstream unavailable should be retryable")
}
if errors.IsRetryableError(errors.ErrInvalidURL) {
t.Error("Invalid URL should not be retryable")
}
}
// TestMetrics tests the metrics functionality
func TestMetrics(t *testing.T) {
td := t.TempDir()
sc := New("localhost:8080", "1G", "1G", td, "", "lru", "lru", 200, 5)
// Test initial metrics
stats := sc.GetMetrics()
if stats.TotalRequests != 0 {
t.Error("Initial total requests should be 0")
}
if stats.CacheHits != 0 {
t.Error("Initial cache hits should be 0")
}
// Test metrics increment
sc.metrics.IncrementTotalRequests()
sc.metrics.IncrementCacheHits()
sc.metrics.IncrementCacheMisses()
sc.metrics.AddBytesServed(1024)
sc.metrics.IncrementServiceRequests("steam")
stats = sc.GetMetrics()
if stats.TotalRequests != 1 {
t.Error("Total requests should be 1")
}
if stats.CacheHits != 1 {
t.Error("Cache hits should be 1")
}
if stats.CacheMisses != 1 {
t.Error("Cache misses should be 1")
}
if stats.TotalBytesServed != 1024 {
t.Error("Total bytes served should be 1024")
}
if stats.ServiceRequests["steam"] != 1 {
t.Error("Steam service requests should be 1")
}
// Test metrics reset
sc.ResetMetrics()
stats = sc.GetMetrics()
if stats.TotalRequests != 0 {
t.Error("After reset, total requests should be 0")
}
if stats.CacheHits != 0 {
t.Error("After reset, cache hits should be 0")
}
}
// Removed old TestKeyGeneration - replaced with TestURLHashing that uses SHA256

vfs/cache/cache.go vendored

@@ -5,56 +5,47 @@ import (
    "io"
    "s1d3sw1ped/steamcache2/vfs"
    "s1d3sw1ped/steamcache2/vfs/vfserror"
    "sync/atomic"
)

// TieredCache implements a lock-free two-tier cache for better concurrency
type TieredCache struct {
    fast *atomic.Value // Memory cache (fast) - atomic.Value for lock-free access
    slow *atomic.Value // Disk cache (slow) - atomic.Value for lock-free access
}
// New creates a new tiered cache
func New() *TieredCache {
    return &TieredCache{
        fast: &atomic.Value{},
        slow: &atomic.Value{},
    }
}
// SetFast sets the fast (memory) tier atomically
func (tc *TieredCache) SetFast(vfs vfs.VFS) {
    tc.fast.Store(vfs)
}
// SetSlow sets the slow (disk) tier atomically
func (tc *TieredCache) SetSlow(vfs vfs.VFS) {
    tc.slow.Store(vfs)
}
// Create creates a new file, preferring the slow tier for persistence
func (tc *TieredCache) Create(key string, size int64) (io.WriteCloser, error) {
    // Try slow tier first (disk) for better testability
    if slow := tc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            return vfs.Create(key, size)
        }
    }

    // Fall back to fast tier (memory)
    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            return vfs.Create(key, size)
        }
    }

    return nil, vfserror.ErrNotFound
@@ -62,40 +53,34 @@ func (tc *TieredCache) Create(key string, size int64) (io.WriteCloser, error) {
// Open opens a file, checking fast tier first, then slow tier with promotion
func (tc *TieredCache) Open(key string) (io.ReadCloser, error) {
    // Try fast tier first (memory)
    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            if reader, err := vfs.Open(key); err == nil {
                return reader, nil
            }
        }
    }

    // Fall back to slow tier (disk) and promote to fast tier
    if slow := tc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            reader, err := vfs.Open(key)
            if err != nil {
                return nil, err
            }

            // If we have both tiers, promote the file to fast tier
            if fast := tc.fast.Load(); fast != nil {
                // Create a new reader for promotion to avoid interfering with the returned reader
                promotionReader, err := vfs.Open(key)
                if err == nil {
                    go tc.promoteToFast(key, promotionReader)
                }
            }

            return reader, nil
        }
    }

    return nil, vfserror.ErrNotFound
@@ -103,22 +88,23 @@ func (tc *TieredCache) Open(key string) (io.ReadCloser, error) {
// Delete removes a file from all tiers
func (tc *TieredCache) Delete(key string) error {
    var lastErr error

    // Delete from fast tier
    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            if err := vfs.Delete(key); err != nil {
                lastErr = err
            }
        }
    }

    // Delete from slow tier
    if slow := tc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            if err := vfs.Delete(key); err != nil {
                lastErr = err
            }
        }
    }
@@ -127,19 +113,20 @@ func (tc *TieredCache) Delete(key string) error {
// Stat returns file information, checking fast tier first
func (tc *TieredCache) Stat(key string) (*vfs.FileInfo, error) {
    // Try fast tier first (memory)
    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            if info, err := vfs.Stat(key); err == nil {
                return info, nil
            }
        }
    }

    // Fall back to slow tier (disk)
    if slow := tc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            return vfs.Stat(key)
        }
    }

    return nil, vfserror.ErrNotFound
@@ -152,31 +139,39 @@ func (tc *TieredCache) Name() string {
// Size returns the total size across all tiers
func (tc *TieredCache) Size() int64 {
    var total int64

    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            total += vfs.Size()
        }
    }

    if slow := tc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            total += vfs.Size()
        }
    }

    return total
}
// Capacity returns the total capacity across all tiers
func (tc *TieredCache) Capacity() int64 {
    var total int64

    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            total += vfs.Capacity()
        }
    }

    if slow := tc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            total += vfs.Capacity()
        }
    }

    return total
}
@@ -185,217 +180,8 @@ func (tc *TieredCache) promoteToFast(key string, reader io.ReadCloser) {
    defer reader.Close()

    // Get file info from slow tier to determine size
    var size int64
    if slow := tc.slow.Load(); slow != nil {
if info, err := tc.slow.Stat(key); err == nil {
size = info.Size
} else {
tc.mu.RUnlock()
return // Skip promotion if we can't get file info
}
}
tc.mu.RUnlock()
// Check if file fits in available memory cache space
tc.mu.RLock()
if tc.fast != nil {
availableSpace := tc.fast.Capacity() - tc.fast.Size()
// Only promote if file fits in available space (with 10% buffer for safety)
if size > int64(float64(availableSpace)*0.9) {
tc.mu.RUnlock()
return // Skip promotion if file is too large
}
}
tc.mu.RUnlock()
// Read the entire file content
content, err := io.ReadAll(reader)
if err != nil {
return // Skip promotion if read fails
}
// Create the file in fast tier
tc.mu.RLock()
if tc.fast != nil {
writer, err := tc.fast.Create(key, size)
if err == nil {
// Write content to fast tier
writer.Write(content)
writer.Close()
}
}
tc.mu.RUnlock()
}
// NewLockFree creates a new lock-free tiered cache
func NewLockFree() *LockFreeTieredCache {
return &LockFreeTieredCache{
fast: &atomic.Value{},
slow: &atomic.Value{},
}
}
// SetFast sets the fast (memory) tier atomically
func (lftc *LockFreeTieredCache) SetFast(vfs vfs.VFS) {
lftc.fast.Store(vfs)
}
// SetSlow sets the slow (disk) tier atomically
func (lftc *LockFreeTieredCache) SetSlow(vfs vfs.VFS) {
lftc.slow.Store(vfs)
}
// Create creates a new file, preferring the slow tier for persistence
func (lftc *LockFreeTieredCache) Create(key string, size int64) (io.WriteCloser, error) {
// Try slow tier first (disk) for better testability
if slow := lftc.slow.Load(); slow != nil {
if vfs, ok := slow.(vfs.VFS); ok {
return vfs.Create(key, size)
}
}
// Fall back to fast tier (memory)
if fast := lftc.fast.Load(); fast != nil {
if vfs, ok := fast.(vfs.VFS); ok {
return vfs.Create(key, size)
}
}
return nil, vfserror.ErrNotFound
}
// Open opens a file, checking fast tier first, then slow tier with promotion
func (lftc *LockFreeTieredCache) Open(key string) (io.ReadCloser, error) {
// Try fast tier first (memory)
if fast := lftc.fast.Load(); fast != nil {
if vfs, ok := fast.(vfs.VFS); ok {
if reader, err := vfs.Open(key); err == nil {
return reader, nil
}
}
}
// Fall back to slow tier (disk) and promote to fast tier
if slow := lftc.slow.Load(); slow != nil {
if vfs, ok := slow.(vfs.VFS); ok {
reader, err := vfs.Open(key)
if err != nil {
return nil, err
}
// If we have both tiers, promote the file to fast tier
if fast := lftc.fast.Load(); fast != nil {
// Create a new reader for promotion to avoid interfering with the returned reader
promotionReader, err := vfs.Open(key)
if err == nil {
go lftc.promoteToFast(key, promotionReader)
}
}
return reader, nil
}
}
return nil, vfserror.ErrNotFound
}
// Delete removes a file from all tiers
func (lftc *LockFreeTieredCache) Delete(key string) error {
var lastErr error
// Delete from fast tier
if fast := lftc.fast.Load(); fast != nil {
if vfs, ok := fast.(vfs.VFS); ok {
if err := vfs.Delete(key); err != nil {
lastErr = err
}
}
}
// Delete from slow tier
if slow := lftc.slow.Load(); slow != nil {
if vfs, ok := slow.(vfs.VFS); ok {
if err := vfs.Delete(key); err != nil {
lastErr = err
}
}
}
return lastErr
}
// Stat returns file information, checking fast tier first
func (lftc *LockFreeTieredCache) Stat(key string) (*vfs.FileInfo, error) {
// Try fast tier first (memory)
if fast := lftc.fast.Load(); fast != nil {
if vfs, ok := fast.(vfs.VFS); ok {
if info, err := vfs.Stat(key); err == nil {
return info, nil
}
}
}
// Fall back to slow tier (disk)
if slow := lftc.slow.Load(); slow != nil {
if vfs, ok := slow.(vfs.VFS); ok {
return vfs.Stat(key)
}
}
return nil, vfserror.ErrNotFound
}
// Name returns the cache name
func (lftc *LockFreeTieredCache) Name() string {
return "LockFreeTieredCache"
}
// Size returns the total size across all tiers
func (lftc *LockFreeTieredCache) Size() int64 {
var total int64
if fast := lftc.fast.Load(); fast != nil {
if vfs, ok := fast.(vfs.VFS); ok {
total += vfs.Size()
}
}
if slow := lftc.slow.Load(); slow != nil {
if vfs, ok := slow.(vfs.VFS); ok {
total += vfs.Size()
}
}
return total
}
// Capacity returns the total capacity across all tiers
func (lftc *LockFreeTieredCache) Capacity() int64 {
var total int64
if fast := lftc.fast.Load(); fast != nil {
if vfs, ok := fast.(vfs.VFS); ok {
total += vfs.Capacity()
}
}
if slow := lftc.slow.Load(); slow != nil {
if vfs, ok := slow.(vfs.VFS); ok {
total += vfs.Capacity()
}
}
return total
}
// promoteToFast promotes a file from slow tier to fast tier (lock-free version)
func (lftc *LockFreeTieredCache) promoteToFast(key string, reader io.ReadCloser) {
defer reader.Close()
// Get file info from slow tier to determine size
var size int64
if slow := lftc.slow.Load(); slow != nil {
        if vfs, ok := slow.(vfs.VFS); ok {
            if info, err := vfs.Stat(key); err == nil {
                size = info.Size
@@ -406,7 +192,7 @@ func (lftc *LockFreeTieredCache) promoteToFast(key string, reader io.ReadCloser)
    }

    // Check if file fits in available memory cache space
    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            availableSpace := vfs.Capacity() - vfs.Size()
            // Only promote if file fits in available space (with 10% buffer for safety)
@@ -423,7 +209,7 @@ func (lftc *LockFreeTieredCache) promoteToFast(key string, reader io.ReadCloser)
    }

    // Create the file in fast tier
    if fast := tc.fast.Load(); fast != nil {
        if vfs, ok := fast.(vfs.VFS); ok {
            writer, err := vfs.Create(key, size)
            if err == nil {


@@ -2,17 +2,19 @@
package disk

import (
    "fmt"
    "io"
    "os"
    "path/filepath"
    "s1d3sw1ped/steamcache2/steamcache/logger"
    "s1d3sw1ped/steamcache2/vfs"
    "s1d3sw1ped/steamcache2/vfs/locks"
    "s1d3sw1ped/steamcache2/vfs/lru"
    "s1d3sw1ped/steamcache2/vfs/vfserror"
    "sort"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/docker/go-units"
@@ -31,55 +33,10 @@ type DiskFS struct {
    size        int64
    mu          sync.RWMutex
    keyLocks    []sync.Map // Sharded lock pools for better concurrency
    LRU         *lru.LRUList[*vfs.FileInfo]
    timeUpdater *vfs.BatchedTimeUpdate // Batched time updates for better performance
}
// Number of lock shards for reducing contention
const numLockShards = 32
// lruList for time-decayed LRU eviction
type lruList struct {
list *list.List
elem map[string]*list.Element
}
func newLruList() *lruList {
return &lruList{
list: list.New(),
elem: make(map[string]*list.Element),
}
}
func (l *lruList) Add(key string, fi *vfs.FileInfo) {
elem := l.list.PushFront(fi)
l.elem[key] = elem
}
func (l *lruList) MoveToFront(key string, timeUpdater *vfs.BatchedTimeUpdate) {
if elem, exists := l.elem[key]; exists {
l.list.MoveToFront(elem)
// Update the FileInfo in the element with new access time
if fi := elem.Value.(*vfs.FileInfo); fi != nil {
fi.UpdateAccessBatched(timeUpdater)
}
}
}
func (l *lruList) Remove(key string) *vfs.FileInfo {
if elem, exists := l.elem[key]; exists {
delete(l.elem, key)
if fi := l.list.Remove(elem).(*vfs.FileInfo); fi != nil {
return fi
}
}
return nil
}
func (l *lruList) Len() int {
return l.list.Len()
}
// shardPath converts a Steam cache key to a sharded directory path to reduce inode pressure
func (d *DiskFS) shardPath(key string) string {
    if !strings.HasPrefix(key, "steam/") {
@@ -104,43 +61,6 @@ func (d *DiskFS) shardPath(key string) string {
return filepath.Join("steam", shard1, shard2, hashPart) return filepath.Join("steam", shard1, shard2, hashPart)
} }
// extractKeyFromPath reverses the sharding logic to get the original key from a sharded path
func (d *DiskFS) extractKeyFromPath(path string) string {
// Fast path: if no slashes, it's not a sharded path
if !strings.Contains(path, "/") {
return path
}
parts := strings.SplitN(path, "/", 5)
numParts := len(parts)
if numParts >= 4 && parts[0] == "steam" {
lastThree := parts[numParts-3:]
shard1 := lastThree[0]
shard2 := lastThree[1]
filename := lastThree[2]
// Verify sharding is correct
if len(filename) >= 4 && filename[:2] == shard1 && filename[2:4] == shard2 {
return "steam/" + filename
}
}
// Handle single-level sharding for short hashes: steam/shard1/filename
if numParts >= 3 && parts[0] == "steam" {
lastTwo := parts[numParts-2:]
shard1 := lastTwo[0]
filename := lastTwo[1]
if len(filename) >= 2 && filename[:2] == shard1 {
return "steam/" + filename
}
}
// Fallback: return as-is for any unrecognized format
return path
}
// New creates a new DiskFS.
func New(root string, capacity int64) *DiskFS {
    if capacity <= 0 {
@@ -151,7 +71,7 @@ func New(root string, capacity int64) *DiskFS {
    os.MkdirAll(root, 0755)

    // Initialize sharded locks
    keyLocks := make([]sync.Map, locks.NumLockShards)

    d := &DiskFS{
        root: root,
@@ -159,7 +79,7 @@ func New(root string, capacity int64) *DiskFS {
        capacity:    capacity,
        size:        0,
        keyLocks:    keyLocks,
        LRU:         lru.NewLRUList[*vfs.FileInfo](),
        timeUpdater: vfs.NewBatchedTimeUpdate(100 * time.Millisecond), // Update time every 100ms
    }
@@ -167,56 +87,15 @@ func New(root string, capacity int64) *DiskFS {
    return d
}
// init loads existing files from disk with ultra-fast lazy initialization
func (d *DiskFS) init() {
    tstart := time.Now()

    // Ultra-fast initialization: only scan directory structure, defer file stats
    d.scanDirectoriesOnly()

    // Start background size calculation in a separate goroutine
    go d.calculateSizeInBackground()

    logger.Logger.Info().
        Str("name", d.Name()).
@@ -228,69 +107,144 @@ func (d *DiskFS) init() {
Msg("init") Msg("init")
} }
// scanDirectoriesOnly performs ultra-fast directory structure scanning without file stats
func (d *DiskFS) scanDirectoriesOnly() {
    // Just ensure the root directory exists and is accessible
    // No file scanning during init - files will be discovered on-demand
    logger.Logger.Debug().
        Str("root", d.root).
        Msg("Directory structure scan completed (lazy file discovery enabled)")
}

// calculateSizeInBackground calculates the total size of all files in the background
func (d *DiskFS) calculateSizeInBackground() {
    tstart := time.Now()

    // Channel for collecting file information
    fileChan := make(chan fileSizeInfo, 1000)

    // Progress tracking
    var totalFiles int64
    var processedFiles int64
    progressTicker := time.NewTicker(2 * time.Second)
    defer progressTicker.Stop()

    // Wait group for workers
    var wg sync.WaitGroup

    // Start directory scanner
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(fileChan)
        d.scanFilesForSize(d.root, fileChan, &totalFiles)
    }()

    // Collect results with progress reporting
    var totalSize int64

    // Use a separate goroutine to collect results
    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            select {
            case fi, ok := <-fileChan:
                if !ok {
                    return
                }
                totalSize += fi.size
                processedFiles++
            case <-progressTicker.C:
                if totalFiles > 0 {
                    logger.Logger.Debug().
                        Int64("processed", processedFiles).
                        Int64("total", totalFiles).
                        Int64("size", totalSize).
                        Float64("progress", float64(processedFiles)/float64(totalFiles)*100).
                        Msg("Background size calculation progress")
                }
            }
        }
    }()

    // Wait for scanning to complete
    wg.Wait()
    <-done

    // Update the total size
    d.mu.Lock()
    d.size = totalSize
    d.mu.Unlock()

    logger.Logger.Info().
        Int64("files_scanned", processedFiles).
        Int64("total_size", totalSize).
        Str("duration", time.Since(tstart).String()).
        Msg("Background size calculation completed")
}

// fileSizeInfo represents a file found during size calculation
type fileSizeInfo struct {
    size int64
}

// scanFilesForSize performs recursive file scanning for size calculation only
func (d *DiskFS) scanFilesForSize(dirPath string, fileChan chan<- fileSizeInfo, totalFiles *int64) {
    // Use ReadDir for faster directory listing
    entries, err := os.ReadDir(dirPath)
    if err != nil {
        return
    }

    // Count files first for progress tracking
    fileCount := 0
    for _, entry := range entries {
        if !entry.IsDir() {
            fileCount++
        }
    }
    atomic.AddInt64(totalFiles, int64(fileCount))

    // Process entries concurrently with limited workers
    semaphore := make(chan struct{}, 16) // More workers for size calculation
    var wg sync.WaitGroup

    for _, entry := range entries {
        entryPath := filepath.Join(dirPath, entry.Name())

        if entry.IsDir() {
            // Recursively scan subdirectories
            wg.Add(1)
            go func(path string) {
                defer wg.Done()
                semaphore <- struct{}{}        // Acquire semaphore
                defer func() { <-semaphore }() // Release semaphore
                d.scanFilesForSize(path, fileChan, totalFiles)
            }(entryPath)
        } else {
            // Process file for size only
            wg.Add(1)
            go func(entry os.DirEntry) {
                defer wg.Done()
                semaphore <- struct{}{}        // Acquire semaphore
                defer func() { <-semaphore }() // Release semaphore

                // Get file info for size calculation
                info, err := entry.Info()
                if err != nil {
                    return
                }

                // Send file size info
                fileChan <- fileSizeInfo{
                    size: info.Size(),
                }
            }(entry)
        }
    }

    wg.Wait()
}
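The buffered-channel semaphore plus WaitGroup used by scanFilesForSize is a general bounded fan-out idiom; a minimal standalone sketch of the same shape (the worker cap and item count are arbitrary):

package main

import (
    "fmt"
    "sync"
)

func main() {
    semaphore := make(chan struct{}, 4) // at most 4 workers run concurrently
    var wg sync.WaitGroup

    for i := 0; i < 16; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            semaphore <- struct{}{}        // acquire a slot
            defer func() { <-semaphore }() // release the slot
            fmt.Println("processing item", n)
        }(i)
    }
    wg.Wait() // completion is observable via the WaitGroup
}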
// Name returns the name of this VFS
@@ -310,24 +264,9 @@ func (d *DiskFS) Capacity() int64 {
    return d.capacity
}
// getShardIndex returns the shard index for a given key
func getShardIndex(key string) int {
// Use FNV-1a hash for good distribution
var h uint32 = 2166136261 // FNV offset basis
for i := 0; i < len(key); i++ {
h ^= uint32(key[i])
h *= 16777619 // FNV prime
}
return int(h % numLockShards)
}
// getKeyLock returns a lock for the given key using sharding
func (d *DiskFS) getKeyLock(key string) *sync.RWMutex {
    return locks.GetKeyLock(d.keyLocks, key)
}
// Create creates a new file
@@ -379,6 +318,7 @@ func (d *DiskFS) Create(key string, size int64) (io.WriteCloser, error) {
    d.LRU.Add(key, fi)
    // Initialize access time with current time
    fi.UpdateAccessBatched(d.timeUpdater)
// Add to size for new files (not discovered files)
    d.size += size
    d.mu.Unlock()
@@ -424,7 +364,7 @@ func (dwc *diskWriteCloser) Close() error {
    return dwc.file.Close()
}
// Open opens a file for reading with lazy discovery
func (d *DiskFS) Open(key string) (io.ReadCloser, error) {
    if key == "" {
        return nil, vfserror.ErrInvalidKey
@@ -440,16 +380,22 @@ func (d *DiskFS) Open(key string) (io.ReadCloser, error) {
        return nil, vfserror.ErrInvalidKey
    }

    // First, try to get the file info
    d.mu.RLock()
    fi, exists := d.info[key]
    d.mu.RUnlock()

    if !exists {
        // Try lazy discovery
        var err error
        fi, err = d.Stat(key)
        if err != nil {
            return nil, err
        }
    }

    // Update access time and LRU
    d.mu.Lock()
    fi.UpdateAccessBatched(d.timeUpdater)
    d.LRU.MoveToFront(key, d.timeUpdater)
    d.mu.Unlock()
@@ -550,7 +496,7 @@ func (d *DiskFS) Delete(key string) error {
    return nil
}
// Stat returns file information with lazy discovery
func (d *DiskFS) Stat(key string) (*vfs.FileInfo, error) {
    if key == "" {
        return nil, vfserror.ErrInvalidKey
@@ -560,30 +506,49 @@ func (d *DiskFS) Stat(key string) (*vfs.FileInfo, error) {
    }

    keyMu := d.getKeyLock(key)

    // First, try to get the file info with read lock
    keyMu.RLock()
    d.mu.RLock()
    if fi, ok := d.info[key]; ok {
        d.mu.RUnlock()
        keyMu.RUnlock()
        return fi, nil
    }
    d.mu.RUnlock()
    keyMu.RUnlock()

    // Lazy discovery: check if file exists on disk and index it
    shardedPath := d.shardPath(key)
    path := filepath.Join(d.root, shardedPath)
    path = strings.ReplaceAll(path, "\\", "/")

    info, err := os.Stat(path)
    if err != nil {
        return nil, vfserror.ErrNotFound
    }

    // File exists, add it to the index with write lock
    keyMu.Lock()
    defer keyMu.Unlock()

    // Double-check after acquiring write lock
    d.mu.Lock()
    if fi, ok := d.info[key]; ok {
        d.mu.Unlock()
        return fi, nil
    }

    // Create and add file info
    fi := vfs.NewFileInfoFromOS(info, key)
    d.info[key] = fi
    d.LRU.Add(key, fi)
    fi.UpdateAccessBatched(d.timeUpdater)
    // Note: Don't add to d.size here as it's being calculated in background
    // The background calculation will handle the total size
    d.mu.Unlock()

    return fi, nil
}
// EvictLRU evicts the least recently used files to free up space // EvictLRU evicts the least recently used files to free up space
@@ -596,7 +561,7 @@ func (d *DiskFS) EvictLRU(bytesNeeded uint) uint {
    // Evict from LRU list until we free enough space
    for d.size > d.capacity-int64(bytesNeeded) && d.LRU.Len() > 0 {
        // Get the least recently used item
        elem := d.LRU.Back()
        if elem == nil {
            break
        }
@@ -625,7 +590,7 @@ func (d *DiskFS) EvictLRU(bytesNeeded uint) uint {
        evicted += uint(fi.Size)

        // Clean up key lock
        shardIndex := locks.GetShardIndex(key)
        d.keyLocks[shardIndex].Delete(key)
    }
@@ -681,7 +646,7 @@ func (d *DiskFS) EvictBySize(bytesNeeded uint, ascending bool) uint {
        evicted += uint(fi.Size)

        // Clean up key lock
        shardIndex := locks.GetShardIndex(key)
        d.keyLocks[shardIndex].Delete(key)
    }
@@ -734,7 +699,7 @@ func (d *DiskFS) EvictFIFO(bytesNeeded uint) uint {
        evicted += uint(fi.Size)

        // Clean up key lock
        shardIndex := locks.GetShardIndex(key)
        d.keyLocks[shardIndex].Delete(key)
    }

vfs/eviction/eviction.go Normal file

@@ -0,0 +1,110 @@
package eviction
import (
"s1d3sw1ped/steamcache2/vfs"
"s1d3sw1ped/steamcache2/vfs/disk"
"s1d3sw1ped/steamcache2/vfs/memory"
)
// EvictionStrategy defines different eviction strategies
type EvictionStrategy string
const (
StrategyLRU EvictionStrategy = "lru"
StrategyLFU EvictionStrategy = "lfu"
StrategyFIFO EvictionStrategy = "fifo"
StrategyLargest EvictionStrategy = "largest"
StrategySmallest EvictionStrategy = "smallest"
StrategyHybrid EvictionStrategy = "hybrid"
)
// EvictLRU performs LRU eviction by removing least recently used files
func EvictLRU(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictLRU(bytesNeeded)
case *disk.DiskFS:
return fs.EvictLRU(bytesNeeded)
default:
return 0
}
}
// EvictFIFO performs FIFO (First In First Out) eviction
func EvictFIFO(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictFIFO(bytesNeeded)
case *disk.DiskFS:
return fs.EvictFIFO(bytesNeeded)
default:
return 0
}
}
// EvictBySizeAsc evicts smallest files first
func EvictBySizeAsc(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
case *disk.DiskFS:
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
default:
return 0
}
}
// EvictBySizeDesc evicts largest files first
func EvictBySizeDesc(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
case *disk.DiskFS:
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
default:
return 0
}
}
// EvictLargest evicts largest files first
func EvictLargest(v vfs.VFS, bytesNeeded uint) uint {
return EvictBySizeDesc(v, bytesNeeded)
}
// EvictSmallest evicts smallest files first
func EvictSmallest(v vfs.VFS, bytesNeeded uint) uint {
return EvictBySizeAsc(v, bytesNeeded)
}
// EvictLFU performs LFU (Least Frequently Used) eviction
func EvictLFU(v vfs.VFS, bytesNeeded uint) uint {
// For now, fall back to size-based eviction
// TODO: Implement proper LFU tracking
return EvictBySizeAsc(v, bytesNeeded)
}
// EvictHybrid implements a hybrid eviction strategy
func EvictHybrid(v vfs.VFS, bytesNeeded uint) uint {
// Use LRU as primary strategy, but consider size as tiebreaker
return EvictLRU(v, bytesNeeded)
}
// GetEvictionFunction returns the eviction function for the given strategy
func GetEvictionFunction(strategy EvictionStrategy) func(vfs.VFS, uint) uint {
switch strategy {
case StrategyLRU:
return EvictLRU
case StrategyLFU:
return EvictLFU
case StrategyFIFO:
return EvictFIFO
case StrategyLargest:
return EvictLargest
case StrategySmallest:
return EvictSmallest
case StrategyHybrid:
return EvictHybrid
default:
return EvictLRU
}
}
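A usage sketch for the dispatcher above, wiring a strategy to a concrete VFS (the capacity and byte figures are arbitrary):

package main

import (
    "fmt"

    "s1d3sw1ped/steamcache2/vfs/eviction"
    "s1d3sw1ped/steamcache2/vfs/memory"
)

func main() {
    fs := memory.New(64 << 20) // 64 MiB in-memory tier

    // Look up the strategy once, then call it like any other function.
    evict := eviction.GetEvictionFunction(eviction.StrategyLRU)
    freed := evict(fs, 1<<20) // ask the strategy to free roughly 1 MiB
    fmt.Printf("freed %d bytes\n", freed)
}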


@@ -5,8 +5,7 @@ import (
    "context"
    "io"
    "s1d3sw1ped/steamcache2/vfs"
    "s1d3sw1ped/steamcache2/vfs/eviction"
    "sync"
    "sync/atomic"
    "time"
@@ -38,45 +37,14 @@ func New(wrappedVFS vfs.VFS, algorithm GCAlgorithm) *GCFS {
        algorithm:  algorithm,
    }
    gcfs.gcFunc = eviction.GetEvictionFunction(eviction.EvictionStrategy(algorithm))

    return gcfs
}
// GetGCAlgorithm returns the GC function for the given algorithm
func GetGCAlgorithm(algorithm GCAlgorithm) func(vfs.VFS, uint) uint {
    return eviction.GetEvictionFunction(eviction.EvictionStrategy(algorithm))
}

// Create wraps the underlying Create method
@@ -125,119 +93,6 @@ type EvictionStrategy interface {
    Evict(vfs vfs.VFS, bytesNeeded uint) uint
}
// GC functions
// gcLRU implements Least Recently Used eviction
func gcLRU(v vfs.VFS, bytesNeeded uint) uint {
return evictLRU(v, bytesNeeded)
}
// gcLFU implements Least Frequently Used eviction
func gcLFU(v vfs.VFS, bytesNeeded uint) uint {
return evictLFU(v, bytesNeeded)
}
// gcFIFO implements First In First Out eviction
func gcFIFO(v vfs.VFS, bytesNeeded uint) uint {
return evictFIFO(v, bytesNeeded)
}
// gcLargest implements largest file first eviction
func gcLargest(v vfs.VFS, bytesNeeded uint) uint {
return evictLargest(v, bytesNeeded)
}
// gcSmallest implements smallest file first eviction
func gcSmallest(v vfs.VFS, bytesNeeded uint) uint {
return evictSmallest(v, bytesNeeded)
}
// gcHybrid implements a hybrid eviction strategy
func gcHybrid(v vfs.VFS, bytesNeeded uint) uint {
return evictHybrid(v, bytesNeeded)
}
// evictLRU performs LRU eviction by removing least recently used files
func evictLRU(v vfs.VFS, bytesNeeded uint) uint {
// Try to use specific eviction methods if available
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictLRU(bytesNeeded)
case *disk.DiskFS:
return fs.EvictLRU(bytesNeeded)
default:
// No fallback - return 0 (no eviction performed)
return 0
}
}
// evictLFU performs LFU (Least Frequently Used) eviction
func evictLFU(v vfs.VFS, bytesNeeded uint) uint {
// For now, fall back to size-based eviction
// TODO: Implement proper LFU tracking
return evictBySize(v, bytesNeeded)
}
// evictFIFO performs FIFO (First In First Out) eviction
func evictFIFO(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictFIFO(bytesNeeded)
case *disk.DiskFS:
return fs.EvictFIFO(bytesNeeded)
default:
// No fallback - return 0 (no eviction performed)
return 0
}
}
// evictLargest evicts largest files first
func evictLargest(v vfs.VFS, bytesNeeded uint) uint {
return evictBySizeDesc(v, bytesNeeded)
}
// evictSmallest evicts smallest files first
func evictSmallest(v vfs.VFS, bytesNeeded uint) uint {
return evictBySizeAsc(v, bytesNeeded)
}
// evictBySize evicts files based on size (smallest first)
func evictBySize(v vfs.VFS, bytesNeeded uint) uint {
return evictBySizeAsc(v, bytesNeeded)
}
// evictBySizeAsc evicts smallest files first
func evictBySizeAsc(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
case *disk.DiskFS:
return fs.EvictBySize(bytesNeeded, true) // true = ascending (smallest first)
default:
// No fallback - return 0 (no eviction performed)
return 0
}
}
// evictBySizeDesc evicts largest files first
func evictBySizeDesc(v vfs.VFS, bytesNeeded uint) uint {
switch fs := v.(type) {
case *memory.MemoryFS:
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
case *disk.DiskFS:
return fs.EvictBySize(bytesNeeded, false) // false = descending (largest first)
default:
// No fallback - return 0 (no eviction performed)
return 0
}
}
// evictHybrid implements a hybrid eviction strategy
func evictHybrid(v vfs.VFS, bytesNeeded uint) uint {
// Use LRU as primary strategy, but consider size as tiebreaker
return evictLRU(v, bytesNeeded)
}
// AdaptivePromotionDeciderFunc is a placeholder for the adaptive promotion logic
var AdaptivePromotionDeciderFunc = func() interface{} {
    return nil

vfs/locks/sharding.go Normal file

@@ -0,0 +1,28 @@
package locks
import (
"sync"
)
// Number of lock shards for reducing contention
const NumLockShards = 32
// GetShardIndex returns the shard index for a given key using FNV-1a hash
func GetShardIndex(key string) int {
// Use FNV-1a hash for good distribution
var h uint32 = 2166136261 // FNV offset basis
for i := 0; i < len(key); i++ {
h ^= uint32(key[i])
h *= 16777619 // FNV prime
}
return int(h % NumLockShards)
}
// GetKeyLock returns a lock for the given key using sharding
func GetKeyLock(keyLocks []sync.Map, key string) *sync.RWMutex {
shardIndex := GetShardIndex(key)
shard := &keyLocks[shardIndex]
keyLock, _ := shard.LoadOrStore(key, &sync.RWMutex{})
return keyLock.(*sync.RWMutex)
}
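A usage sketch for these helpers; the key is arbitrary, and the shard slice must be sized with NumLockShards since GetShardIndex hashes modulo that constant:

package main

import (
    "fmt"
    "sync"

    "s1d3sw1ped/steamcache2/vfs/locks"
)

func main() {
    keyLocks := make([]sync.Map, locks.NumLockShards)

    mu := locks.GetKeyLock(keyLocks, "steam/abc123")
    mu.Lock()
    // ... per-key critical section; unrelated keys usually land on other shards ...
    mu.Unlock()

    fmt.Println("shard:", locks.GetShardIndex("steam/abc123"))
}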

vfs/lru/lru.go Normal file

@@ -0,0 +1,66 @@
package lru
import (
"container/list"
"s1d3sw1ped/steamcache2/vfs/types"
)
// LRUList represents a least recently used list for cache eviction
type LRUList[T any] struct {
list *list.List
elem map[string]*list.Element
}
// NewLRUList creates a new LRU list
func NewLRUList[T any]() *LRUList[T] {
return &LRUList[T]{
list: list.New(),
elem: make(map[string]*list.Element),
}
}
// Add adds an item to the front of the LRU list
func (l *LRUList[T]) Add(key string, item T) {
elem := l.list.PushFront(item)
l.elem[key] = elem
}
// MoveToFront moves an item to the front of the LRU list
func (l *LRUList[T]) MoveToFront(key string, timeUpdater *types.BatchedTimeUpdate) {
if elem, exists := l.elem[key]; exists {
l.list.MoveToFront(elem)
// Update the FileInfo in the element with new access time
if fi, ok := any(elem.Value).(interface {
UpdateAccessBatched(*types.BatchedTimeUpdate)
}); ok {
fi.UpdateAccessBatched(timeUpdater)
}
}
}
// Remove removes an item from the LRU list
func (l *LRUList[T]) Remove(key string) (T, bool) {
if elem, exists := l.elem[key]; exists {
delete(l.elem, key)
if item, ok := l.list.Remove(elem).(T); ok {
return item, true
}
}
var zero T
return zero, false
}
// Len returns the number of items in the LRU list
func (l *LRUList[T]) Len() int {
return l.list.Len()
}
// Back returns the least recently used item (at the back of the list)
func (l *LRUList[T]) Back() *list.Element {
return l.list.Back()
}
// Front returns the most recently used item (at the front of the list)
func (l *LRUList[T]) Front() *list.Element {
return l.list.Front()
}
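A usage sketch for the generic list; a plain string value type keeps it minimal, whereas the caches above instantiate it with *vfs.FileInfo or *types.FileInfo:

package main

import (
    "fmt"

    "s1d3sw1ped/steamcache2/vfs/lru"
)

func main() {
    l := lru.NewLRUList[string]()
    l.Add("a", "value-a")
    l.Add("b", "value-b") // "b" is now most recently used

    // Back() is the eviction candidate: the least recently used item.
    if elem := l.Back(); elem != nil {
        fmt.Println("LRU victim:", elem.Value.(string))
    }

    if v, ok := l.Remove("a"); ok {
        fmt.Println("removed:", v)
    }
    fmt.Println("len:", l.Len())
}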


@@ -1,130 +0,0 @@
package memory
import (
"s1d3sw1ped/steamcache2/vfs"
"sync"
"sync/atomic"
"time"
)
// DynamicCacheManager manages cache size adjustments based on system memory usage
type DynamicCacheManager struct {
originalCacheSize uint64
currentCacheSize uint64
memoryMonitor *MemoryMonitor
cache vfs.VFS
adjustmentInterval time.Duration
lastAdjustment time.Time
mu sync.RWMutex
adjustmentCount int64
isAdjusting int32
}
// NewDynamicCacheManager creates a new dynamic cache manager
func NewDynamicCacheManager(cache vfs.VFS, originalSize uint64, memoryMonitor *MemoryMonitor) *DynamicCacheManager {
return &DynamicCacheManager{
originalCacheSize: originalSize,
currentCacheSize: originalSize,
memoryMonitor: memoryMonitor,
cache: cache,
adjustmentInterval: 30 * time.Second, // Adjust every 30 seconds
}
}
// Start begins the dynamic cache size adjustment process
func (dcm *DynamicCacheManager) Start() {
go dcm.adjustmentLoop()
}
// GetCurrentCacheSize returns the current cache size
func (dcm *DynamicCacheManager) GetCurrentCacheSize() uint64 {
dcm.mu.RLock()
defer dcm.mu.RUnlock()
return atomic.LoadUint64(&dcm.currentCacheSize)
}
// GetOriginalCacheSize returns the original cache size
func (dcm *DynamicCacheManager) GetOriginalCacheSize() uint64 {
dcm.mu.RLock()
defer dcm.mu.RUnlock()
return dcm.originalCacheSize
}
// GetAdjustmentCount returns the number of adjustments made
func (dcm *DynamicCacheManager) GetAdjustmentCount() int64 {
return atomic.LoadInt64(&dcm.adjustmentCount)
}
// adjustmentLoop runs the cache size adjustment loop
func (dcm *DynamicCacheManager) adjustmentLoop() {
ticker := time.NewTicker(dcm.adjustmentInterval)
defer ticker.Stop()
for range ticker.C {
dcm.performAdjustment()
}
}
// performAdjustment performs a cache size adjustment if needed
func (dcm *DynamicCacheManager) performAdjustment() {
// Prevent concurrent adjustments
if !atomic.CompareAndSwapInt32(&dcm.isAdjusting, 0, 1) {
return
}
defer atomic.StoreInt32(&dcm.isAdjusting, 0)
// Check if enough time has passed since last adjustment
if time.Since(dcm.lastAdjustment) < dcm.adjustmentInterval {
return
}
// Get recommended cache size
recommendedSize := dcm.memoryMonitor.GetRecommendedCacheSize(dcm.originalCacheSize)
currentSize := atomic.LoadUint64(&dcm.currentCacheSize)
// Only adjust if there's a significant difference (more than 5%)
sizeDiff := float64(recommendedSize) / float64(currentSize)
if sizeDiff < 0.95 || sizeDiff > 1.05 {
dcm.adjustCacheSize(recommendedSize)
dcm.lastAdjustment = time.Now()
atomic.AddInt64(&dcm.adjustmentCount, 1)
}
}
// adjustCacheSize adjusts the cache size to the recommended size
func (dcm *DynamicCacheManager) adjustCacheSize(newSize uint64) {
dcm.mu.Lock()
defer dcm.mu.Unlock()
oldSize := atomic.LoadUint64(&dcm.currentCacheSize)
atomic.StoreUint64(&dcm.currentCacheSize, newSize)
// If we're reducing the cache size, trigger GC to free up memory
if newSize < oldSize {
// Calculate how much to free
bytesToFree := oldSize - newSize
// Trigger GC on the cache to free up the excess memory
// This is a simplified approach - in practice, you'd want to integrate
// with the actual GC system to free the right amount
if gcCache, ok := dcm.cache.(interface{ ForceGC(uint) }); ok {
gcCache.ForceGC(uint(bytesToFree))
}
}
}
// GetStats returns statistics about the dynamic cache manager
func (dcm *DynamicCacheManager) GetStats() map[string]interface{} {
dcm.mu.RLock()
defer dcm.mu.RUnlock()
return map[string]interface{}{
"original_cache_size": dcm.originalCacheSize,
"current_cache_size": atomic.LoadUint64(&dcm.currentCacheSize),
"adjustment_count": atomic.LoadInt64(&dcm.adjustmentCount),
"last_adjustment": dcm.lastAdjustment,
"memory_utilization": dcm.memoryMonitor.GetMemoryUtilization(),
"target_memory_usage": dcm.memoryMonitor.GetTargetMemoryUsage(),
"current_memory_usage": dcm.memoryMonitor.GetCurrentMemoryUsage(),
}
}


@@ -3,8 +3,10 @@ package memory
 import (
 	"bytes"
-	"container/list"
 	"io"
+	"s1d3sw1ped/steamcache2/vfs"
+	"s1d3sw1ped/steamcache2/vfs/locks"
+	"s1d3sw1ped/steamcache2/vfs/lru"
 	"s1d3sw1ped/steamcache2/vfs/types"
 	"s1d3sw1ped/steamcache2/vfs/vfserror"
 	"sort"
@@ -13,32 +15,8 @@ import (
 	"time"
 )
 
-// VFS defines the interface for virtual file systems
-type VFS interface {
-	// Create creates a new file at the given key
-	Create(key string, size int64) (io.WriteCloser, error)
-	// Open opens the file at the given key for reading
-	Open(key string) (io.ReadCloser, error)
-	// Delete removes the file at the given key
-	Delete(key string) error
-	// Stat returns information about the file at the given key
-	Stat(key string) (*types.FileInfo, error)
-	// Name returns the name of this VFS
-	Name() string
-	// Size returns the current size of the VFS
-	Size() int64
-	// Capacity returns the maximum capacity of the VFS
-	Capacity() int64
-}
-
 // Ensure MemoryFS implements VFS.
-var _ VFS = (*MemoryFS)(nil)
+var _ vfs.VFS = (*MemoryFS)(nil)
 
 // MemoryFS is an in-memory virtual file system
 type MemoryFS struct {
@@ -48,55 +26,10 @@ type MemoryFS struct {
 	size        int64
 	mu          sync.RWMutex
 	keyLocks    []sync.Map // Sharded lock pools for better concurrency
-	LRU         *lruList
+	LRU         *lru.LRUList[*types.FileInfo]
 	timeUpdater *types.BatchedTimeUpdate // Batched time updates for better performance
 }
 
-// Number of lock shards for reducing contention
-const numLockShards = 32
-
-// lruList for time-decayed LRU eviction
-type lruList struct {
-	list *list.List
-	elem map[string]*list.Element
-}
-
-func newLruList() *lruList {
-	return &lruList{
-		list: list.New(),
-		elem: make(map[string]*list.Element),
-	}
-}
-
-func (l *lruList) Add(key string, fi *types.FileInfo) {
-	elem := l.list.PushFront(fi)
-	l.elem[key] = elem
-}
-
-func (l *lruList) MoveToFront(key string, timeUpdater *types.BatchedTimeUpdate) {
-	if elem, exists := l.elem[key]; exists {
-		l.list.MoveToFront(elem)
-		// Update the FileInfo in the element with new access time
-		if fi := elem.Value.(*types.FileInfo); fi != nil {
-			fi.UpdateAccessBatched(timeUpdater)
-		}
-	}
-}
-
-func (l *lruList) Remove(key string) *types.FileInfo {
-	if elem, exists := l.elem[key]; exists {
-		delete(l.elem, key)
-		if fi := l.list.Remove(elem).(*types.FileInfo); fi != nil {
-			return fi
-		}
-	}
-	return nil
-}
-
-func (l *lruList) Len() int {
-	return l.list.Len()
-}
-
 // New creates a new MemoryFS
 func New(capacity int64) *MemoryFS {
 	if capacity <= 0 {
@@ -104,7 +37,7 @@ func New(capacity int64) *MemoryFS {
 	}
 
 	// Initialize sharded locks
-	keyLocks := make([]sync.Map, numLockShards)
+	keyLocks := make([]sync.Map, locks.NumLockShards)
 
 	return &MemoryFS{
 		data: make(map[string]*bytes.Buffer),
@@ -112,7 +45,7 @@ func New(capacity int64) *MemoryFS {
 		capacity:    capacity,
 		size:        0,
 		keyLocks:    keyLocks,
-		LRU:         newLruList(),
+		LRU:         lru.NewLRUList[*types.FileInfo](),
 		timeUpdater: types.NewBatchedTimeUpdate(100 * time.Millisecond), // Update time every 100ms
 	}
 }
@@ -163,24 +96,9 @@ func (m *MemoryFS) GetFragmentationStats() map[string]interface{} {
 	}
 }
 
-// getShardIndex returns the shard index for a given key
-func getShardIndex(key string) int {
-	// Use FNV-1a hash for good distribution
-	var h uint32 = 2166136261 // FNV offset basis
-	for i := 0; i < len(key); i++ {
-		h ^= uint32(key[i])
-		h *= 16777619 // FNV prime
-	}
-	return int(h % numLockShards)
-}
-
 // getKeyLock returns a lock for the given key using sharding
 func (m *MemoryFS) getKeyLock(key string) *sync.RWMutex {
-	shardIndex := getShardIndex(key)
-	shard := &m.keyLocks[shardIndex]
-	keyLock, _ := shard.LoadOrStore(key, &sync.RWMutex{})
-	return keyLock.(*sync.RWMutex)
+	return locks.GetKeyLock(m.keyLocks, key)
 }
 
 // Create creates a new file
@@ -391,7 +309,7 @@ func (m *MemoryFS) EvictLRU(bytesNeeded uint) uint {
 	// Evict from LRU list until we free enough space
 	for m.size > m.capacity-int64(bytesNeeded) && m.LRU.Len() > 0 {
 		// Get the least recently used item
-		elem := m.LRU.list.Back()
+		elem := m.LRU.Back()
 		if elem == nil {
 			break
 		}
@@ -411,7 +329,7 @@ func (m *MemoryFS) EvictLRU(bytesNeeded uint) uint {
 		evicted += uint(fi.Size)
 
 		// Clean up key lock
-		shardIndex := getShardIndex(key)
+		shardIndex := locks.GetShardIndex(key)
 		m.keyLocks[shardIndex].Delete(key)
 	}
@@ -459,7 +377,7 @@ func (m *MemoryFS) EvictBySize(bytesNeeded uint, ascending bool) uint {
 		evicted += uint(fi.Size)
 
 		// Clean up key lock
-		shardIndex := getShardIndex(key)
+		shardIndex := locks.GetShardIndex(key)
 		m.keyLocks[shardIndex].Delete(key)
 	}
@@ -504,7 +422,7 @@ func (m *MemoryFS) EvictFIFO(bytesNeeded uint) uint {
 		evicted += uint(fi.Size)
 
 		// Clean up key lock
-		shardIndex := getShardIndex(key)
+		shardIndex := locks.GetShardIndex(key)
 		m.keyLocks[shardIndex].Delete(key)
 	}

View File

@@ -17,6 +17,15 @@ type MemoryMonitor struct {
 	ctx          chan struct{}
 	stopChan     chan struct{}
 	isMonitoring int32
+
+	// Dynamic cache management fields
+	originalCacheSize  uint64
+	currentCacheSize   uint64
+	cache              interface{} // Generic cache interface
+	adjustmentInterval time.Duration
+	lastAdjustment     time.Time
+	adjustmentCount    int64
+	isAdjusting        int32
 }
 
 // NewMemoryMonitor creates a new memory monitor
@@ -27,9 +36,19 @@ func NewMemoryMonitor(targetMemoryUsage uint64, monitoringInterval time.Duration
 		adjustmentThreshold: adjustmentThreshold,
 		ctx:                 make(chan struct{}),
 		stopChan:            make(chan struct{}),
+		adjustmentInterval:  30 * time.Second, // Default adjustment interval
 	}
 }
 
+// NewMemoryMonitorWithCache creates a new memory monitor with cache management
+func NewMemoryMonitorWithCache(targetMemoryUsage uint64, monitoringInterval time.Duration, adjustmentThreshold float64, cache interface{}, originalCacheSize uint64) *MemoryMonitor {
+	mm := NewMemoryMonitor(targetMemoryUsage, monitoringInterval, adjustmentThreshold)
+	mm.cache = cache
+	mm.originalCacheSize = originalCacheSize
+	mm.currentCacheSize = originalCacheSize
+	return mm
+}
+
 // Start begins monitoring memory usage
 func (mm *MemoryMonitor) Start() {
 	if atomic.CompareAndSwapInt32(&mm.isMonitoring, 0, 1) {
@@ -151,3 +170,105 @@ func (mm *MemoryMonitor) GetMemoryStats() map[string]interface{} {
 		"gc_pause_total": m.PauseTotalNs,
 	}
 }
+
+// Dynamic Cache Management Methods
+
+// StartDynamicAdjustment begins the dynamic cache size adjustment process
+func (mm *MemoryMonitor) StartDynamicAdjustment() {
+	if mm.cache != nil {
+		go mm.adjustmentLoop()
+	}
+}
+
+// GetCurrentCacheSize returns the current cache size
+func (mm *MemoryMonitor) GetCurrentCacheSize() uint64 {
+	mm.mu.RLock()
+	defer mm.mu.RUnlock()
+	return atomic.LoadUint64(&mm.currentCacheSize)
+}
+
+// GetOriginalCacheSize returns the original cache size
+func (mm *MemoryMonitor) GetOriginalCacheSize() uint64 {
+	mm.mu.RLock()
+	defer mm.mu.RUnlock()
+	return mm.originalCacheSize
+}
+
+// GetAdjustmentCount returns the number of adjustments made
+func (mm *MemoryMonitor) GetAdjustmentCount() int64 {
+	return atomic.LoadInt64(&mm.adjustmentCount)
+}
+
+// adjustmentLoop runs the cache size adjustment loop
+func (mm *MemoryMonitor) adjustmentLoop() {
+	ticker := time.NewTicker(mm.adjustmentInterval)
+	defer ticker.Stop()
+	for range ticker.C {
+		mm.performAdjustment()
+	}
+}
+
+// performAdjustment performs a cache size adjustment if needed
+func (mm *MemoryMonitor) performAdjustment() {
+	// Prevent concurrent adjustments
+	if !atomic.CompareAndSwapInt32(&mm.isAdjusting, 0, 1) {
+		return
+	}
+	defer atomic.StoreInt32(&mm.isAdjusting, 0)
+
+	// Check if enough time has passed since last adjustment
+	if time.Since(mm.lastAdjustment) < mm.adjustmentInterval {
+		return
+	}
+
+	// Get recommended cache size
+	recommendedSize := mm.GetRecommendedCacheSize(mm.originalCacheSize)
+	currentSize := atomic.LoadUint64(&mm.currentCacheSize)
+
+	// Only adjust if there's a significant difference (more than 5%)
+	sizeDiff := float64(recommendedSize) / float64(currentSize)
+	if sizeDiff < 0.95 || sizeDiff > 1.05 {
+		mm.adjustCacheSize(recommendedSize)
+		mm.lastAdjustment = time.Now()
+		atomic.AddInt64(&mm.adjustmentCount, 1)
+	}
+}
+
+// adjustCacheSize adjusts the cache size to the recommended size
+func (mm *MemoryMonitor) adjustCacheSize(newSize uint64) {
+	mm.mu.Lock()
+	defer mm.mu.Unlock()
+
+	oldSize := atomic.LoadUint64(&mm.currentCacheSize)
+	atomic.StoreUint64(&mm.currentCacheSize, newSize)
+
+	// If we're reducing the cache size, trigger GC to free up memory
+	if newSize < oldSize {
+		// Calculate how much to free
+		bytesToFree := oldSize - newSize
+
+		// Trigger GC on the cache to free up the excess memory
+		// This is a simplified approach - in practice, you'd want to integrate
+		// with the actual GC system to free the right amount
+		if gcCache, ok := mm.cache.(interface{ ForceGC(uint) }); ok {
+			gcCache.ForceGC(uint(bytesToFree))
+		}
+	}
+}
+
+// GetDynamicStats returns statistics about the dynamic cache manager
+func (mm *MemoryMonitor) GetDynamicStats() map[string]interface{} {
+	mm.mu.RLock()
+	defer mm.mu.RUnlock()
+	return map[string]interface{}{
+		"original_cache_size":  mm.originalCacheSize,
+		"current_cache_size":   atomic.LoadUint64(&mm.currentCacheSize),
+		"adjustment_count":     atomic.LoadInt64(&mm.adjustmentCount),
+		"last_adjustment":      mm.lastAdjustment,
+		"memory_utilization":   mm.GetMemoryUtilization(),
+		"target_memory_usage":  mm.GetTargetMemoryUsage(),
+		"current_memory_usage": mm.GetCurrentMemoryUsage(),
+	}
+}
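
With the dynamic fields folded into MemoryMonitor itself, the monitor can drive resizing without a separate DynamicCacheManager. A sketch of the intended call sequence, assuming the methods declared above (the package name and cache value are assumptions):

// Sketch: monitor-driven dynamic sizing.
mm := memory.NewMemoryMonitorWithCache(
	8<<30,         // target memory usage (~8 GiB)
	5*time.Second, // monitoring interval
	0.10,          // adjustment threshold
	cache,         // any value; ForceGC(uint) is discovered by assertion
	4<<30,         // original cache size (~4 GiB)
)
mm.Start()                  // memory sampling loop
mm.StartDynamicAdjustment() // no-op unless a cache was provided

fmt.Println(mm.GetDynamicStats()["current_cache_size"])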

View File

@@ -70,9 +70,35 @@ type PopularContent struct {
 // WarmRequest represents a cache warming request
 type WarmRequest struct {
 	Key      string
 	Priority int
 	Reason   string
+	Size        int64
+	RequestedAt time.Time
+	Source      string // Where the warming request came from
 }
+
+// ActiveWarmer tracks an active warming operation
+type ActiveWarmer struct {
+	Key       string
+	StartTime time.Time
+	Priority  int
+	Reason    string
+	mu        sync.RWMutex
+}
+
+// WarmingStats tracks cache warming statistics
+type WarmingStats struct {
+	WarmRequests      int64
+	WarmSuccesses     int64
+	WarmFailures      int64
+	WarmBytes         int64
+	WarmDuration      time.Duration
+	PrefetchRequests  int64
+	PrefetchSuccesses int64
+	PrefetchFailures  int64
+	PrefetchBytes     int64
+	PrefetchDuration  time.Duration
+}
 
 // NewPredictiveCacheManager creates a new predictive cache manager
@@ -114,6 +140,21 @@ func NewCacheWarmer() *CacheWarmer {
 	}
 }
 
+// NewWarmingStats creates a new warming stats tracker
+func NewWarmingStats() *WarmingStats {
+	return &WarmingStats{}
+}
+
+// NewActiveWarmer creates a new active warmer tracker
+func NewActiveWarmer(key string, priority int, reason string) *ActiveWarmer {
+	return &ActiveWarmer{
+		Key:       key,
+		StartTime: time.Now(),
+		Priority:  priority,
+		Reason:    reason,
+	}
+}
+
 // RecordAccess records a file access for prediction analysis (lightweight version)
 func (pcm *PredictiveCacheManager) RecordAccess(key string, previousKey string, size int64) {
 	// Only record if we have a previous key to avoid overhead
@@ -282,6 +323,23 @@ func (cw *CacheWarmer) GetPopularContent(limit int) []*PopularContent {
 	return popular
 }
 
+// RequestWarming requests warming of a specific key
+func (cw *CacheWarmer) RequestWarming(key string, priority int, reason string, size int64) {
+	select {
+	case cw.warmerQueue <- WarmRequest{
+		Key:         key,
+		Priority:    priority,
+		Reason:      reason,
+		Size:        size,
+		RequestedAt: time.Now(),
+		Source:      "predictive",
+	}:
+		// Successfully queued
+	default:
+		// Queue full, skip warming
+	}
+}
+
 // prefetchWorker processes prefetch requests
 func (pcm *PredictiveCacheManager) prefetchWorker() {
 	defer pcm.wg.Done()
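
Note that the new RequestWarming never blocks the caller: the buffered-channel send falls through to the default case and silently drops the request when the queue is full. A sketch of a call site, with a hypothetical key and size:

// Sketch: enqueue a predicted follow-on chunk. The key and size are
// illustrative; a full queue drops the request rather than stalling.
warmer.RequestWarming(
	"steam/depot/12345/chunk_0002", // hypothetical cache key
	80,                             // priority
	"predicted_access",             // reason
	1<<20,                          // expected size (~1 MiB)
)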

View File

@@ -1,7 +1,10 @@
 // vfs/vfserror/vfserror.go
 package vfserror
 
-import "errors"
+import (
+	"errors"
+	"fmt"
+)
 
 // Common VFS errors
 var (
@@ -9,4 +12,47 @@ var (
 	ErrInvalidKey       = errors.New("vfs: invalid key")
 	ErrAlreadyExists    = errors.New("vfs: key already exists")
 	ErrCapacityExceeded = errors.New("vfs: capacity exceeded")
+	ErrCorruptedFile    = errors.New("vfs: corrupted file")
+	ErrInvalidSize      = errors.New("vfs: invalid size")
+	ErrOperationTimeout = errors.New("vfs: operation timeout")
 )
+
+// VFSError represents a VFS-specific error with context
+type VFSError struct {
+	Op   string // Operation that failed
+	Key  string // Key that caused the error
+	Err  error  // Underlying error
+	Size int64  // Size information if relevant
+}
+
+// Error implements the error interface
+func (e *VFSError) Error() string {
+	if e.Key != "" {
+		return fmt.Sprintf("vfs: %s failed for key %q: %v", e.Op, e.Key, e.Err)
+	}
+	return fmt.Sprintf("vfs: %s failed: %v", e.Op, e.Err)
+}
+
+// Unwrap returns the underlying error
+func (e *VFSError) Unwrap() error {
+	return e.Err
+}
+
+// NewVFSError creates a new VFS error with context
+func NewVFSError(op, key string, err error) *VFSError {
+	return &VFSError{
+		Op:  op,
+		Key: key,
+		Err: err,
+	}
+}
+
+// NewVFSErrorWithSize creates a new VFS error with size context
+func NewVFSErrorWithSize(op, key string, size int64, err error) *VFSError {
+	return &VFSError{
+		Op:   op,
+		Key:  key,
+		Size: size,
+		Err:  err,
+	}
+}
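
Because VFSError implements Unwrap, sentinel checks keep working through the wrapper once call sites wrap with NewVFSError, and errors.As recovers the operation context. A sketch (the Open call and its wrapping behavior are assumptions):

if _, err := cache.Open(key); err != nil {
	// Sentinel comparison still matches through the wrapper.
	if errors.Is(err, vfserror.ErrInvalidKey) {
		// reject the request
	}
	// Pull out the structured context for logging.
	var vErr *vfserror.VFSError
	if errors.As(err, &vErr) {
		log.Printf("vfs op=%s key=%q size=%d: %v", vErr.Op, vErr.Key, vErr.Size, vErr.Err)
	}
}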

View File

@@ -1,300 +0,0 @@
package warming
import (
"context"
"s1d3sw1ped/steamcache2/vfs"
"sync"
"sync/atomic"
"time"
)
// CacheWarmer implements intelligent cache warming strategies
type CacheWarmer struct {
vfs vfs.VFS
warmingQueue chan WarmRequest
activeWarmers map[string]*ActiveWarmer
stats *WarmingStats
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
maxConcurrent int
warmingEnabled bool
}
// WarmRequest represents a cache warming request
type WarmRequest struct {
Key string
Priority int
Reason string
Size int64
RequestedAt time.Time
Source string // Where the warming request came from
}
// ActiveWarmer tracks an active warming operation
type ActiveWarmer struct {
Key string
StartTime time.Time
Priority int
Reason string
mu sync.RWMutex
}
// WarmingStats tracks cache warming statistics
type WarmingStats struct {
WarmRequests int64
WarmSuccesses int64
WarmFailures int64
WarmBytes int64
WarmDuration time.Duration
ActiveWarmers int64
mu sync.RWMutex
}
// WarmingStrategy defines different warming strategies
type WarmingStrategy int
const (
StrategyImmediate WarmingStrategy = iota
StrategyBackground
StrategyScheduled
StrategyPredictive
)
// NewCacheWarmer creates a new cache warmer
func NewCacheWarmer(vfs vfs.VFS, maxConcurrent int) *CacheWarmer {
ctx, cancel := context.WithCancel(context.Background())
cw := &CacheWarmer{
vfs: vfs,
warmingQueue: make(chan WarmRequest, 1000),
activeWarmers: make(map[string]*ActiveWarmer),
stats: &WarmingStats{},
ctx: ctx,
cancel: cancel,
maxConcurrent: maxConcurrent,
warmingEnabled: true,
}
// Start warming workers
for i := 0; i < maxConcurrent; i++ {
cw.wg.Add(1)
go cw.warmingWorker(i)
}
// Start cleanup worker
cw.wg.Add(1)
go cw.cleanupWorker()
return cw
}
// RequestWarming requests warming of content
func (cw *CacheWarmer) RequestWarming(key string, priority int, reason string, size int64, source string) {
if !cw.warmingEnabled {
return
}
// Check if already warming
cw.mu.RLock()
if _, exists := cw.activeWarmers[key]; exists {
cw.mu.RUnlock()
return // Already warming
}
cw.mu.RUnlock()
// Check if already cached
if _, err := cw.vfs.Stat(key); err == nil {
return // Already cached
}
select {
case cw.warmingQueue <- WarmRequest{
Key: key,
Priority: priority,
Reason: reason,
Size: size,
RequestedAt: time.Now(),
Source: source,
}:
atomic.AddInt64(&cw.stats.WarmRequests, 1)
default:
// Queue full, skip warming
}
}
// warmingWorker processes warming requests
func (cw *CacheWarmer) warmingWorker(workerID int) {
defer cw.wg.Done()
for {
select {
case <-cw.ctx.Done():
return
case req := <-cw.warmingQueue:
cw.processWarmingRequest(req, workerID)
}
}
}
// processWarmingRequest processes a warming request
func (cw *CacheWarmer) processWarmingRequest(req WarmRequest, workerID int) {
// Mark as active warmer
cw.mu.Lock()
cw.activeWarmers[req.Key] = &ActiveWarmer{
Key: req.Key,
StartTime: time.Now(),
Priority: req.Priority,
Reason: req.Reason,
}
cw.mu.Unlock()
atomic.AddInt64(&cw.stats.ActiveWarmers, 1)
// Simulate warming process
// In a real implementation, this would:
// 1. Fetch content from upstream
// 2. Store in cache
// 3. Update statistics
startTime := time.Now()
// Simulate warming delay based on priority
warmingDelay := time.Duration(100-req.Priority*10) * time.Millisecond
if warmingDelay < 10*time.Millisecond {
warmingDelay = 10 * time.Millisecond
}
select {
case <-time.After(warmingDelay):
// Warming completed successfully
atomic.AddInt64(&cw.stats.WarmSuccesses, 1)
atomic.AddInt64(&cw.stats.WarmBytes, req.Size)
case <-cw.ctx.Done():
// Context cancelled
atomic.AddInt64(&cw.stats.WarmFailures, 1)
}
duration := time.Since(startTime)
cw.stats.mu.Lock()
cw.stats.WarmDuration += duration
cw.stats.mu.Unlock()
// Remove from active warmers
cw.mu.Lock()
delete(cw.activeWarmers, req.Key)
cw.mu.Unlock()
atomic.AddInt64(&cw.stats.ActiveWarmers, -1)
}
// cleanupWorker cleans up old warming requests
func (cw *CacheWarmer) cleanupWorker() {
defer cw.wg.Done()
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-cw.ctx.Done():
return
case <-ticker.C:
cw.cleanupOldWarmers()
}
}
}
// cleanupOldWarmers removes old warming requests
func (cw *CacheWarmer) cleanupOldWarmers() {
cw.mu.Lock()
defer cw.mu.Unlock()
now := time.Now()
cutoff := now.Add(-5 * time.Minute) // Remove warmers older than 5 minutes
for key, warmer := range cw.activeWarmers {
warmer.mu.RLock()
if warmer.StartTime.Before(cutoff) {
warmer.mu.RUnlock()
delete(cw.activeWarmers, key)
atomic.AddInt64(&cw.stats.WarmFailures, 1)
} else {
warmer.mu.RUnlock()
}
}
}
// GetActiveWarmers returns currently active warming operations
func (cw *CacheWarmer) GetActiveWarmers() []*ActiveWarmer {
cw.mu.RLock()
defer cw.mu.RUnlock()
warmers := make([]*ActiveWarmer, 0, len(cw.activeWarmers))
for _, warmer := range cw.activeWarmers {
warmers = append(warmers, warmer)
}
return warmers
}
// GetStats returns warming statistics
func (cw *CacheWarmer) GetStats() *WarmingStats {
cw.stats.mu.RLock()
defer cw.stats.mu.RUnlock()
return &WarmingStats{
WarmRequests: atomic.LoadInt64(&cw.stats.WarmRequests),
WarmSuccesses: atomic.LoadInt64(&cw.stats.WarmSuccesses),
WarmFailures: atomic.LoadInt64(&cw.stats.WarmFailures),
WarmBytes: atomic.LoadInt64(&cw.stats.WarmBytes),
WarmDuration: cw.stats.WarmDuration,
ActiveWarmers: atomic.LoadInt64(&cw.stats.ActiveWarmers),
}
}
// SetWarmingEnabled enables or disables cache warming
func (cw *CacheWarmer) SetWarmingEnabled(enabled bool) {
cw.mu.Lock()
defer cw.mu.Unlock()
cw.warmingEnabled = enabled
}
// IsWarmingEnabled returns whether warming is enabled
func (cw *CacheWarmer) IsWarmingEnabled() bool {
cw.mu.RLock()
defer cw.mu.RUnlock()
return cw.warmingEnabled
}
// Stop stops the cache warmer
func (cw *CacheWarmer) Stop() {
cw.cancel()
cw.wg.Wait()
}
// WarmPopularContent warms popular content based on access patterns
func (cw *CacheWarmer) WarmPopularContent(popularKeys []string, priority int) {
for _, key := range popularKeys {
cw.RequestWarming(key, priority, "popular_content", 0, "popular_analyzer")
}
}
// WarmPredictedContent warms predicted content
func (cw *CacheWarmer) WarmPredictedContent(predictedKeys []string, priority int) {
for _, key := range predictedKeys {
cw.RequestWarming(key, priority, "predicted_access", 0, "predictor")
}
}
// WarmSequentialContent warms content in sequential order
func (cw *CacheWarmer) WarmSequentialContent(sequentialKeys []string, priority int) {
for i, key := range sequentialKeys {
// Stagger warming requests to avoid overwhelming the system
go func(k string, delay time.Duration) {
time.Sleep(delay)
cw.RequestWarming(k, priority, "sequential_access", 0, "sequential_analyzer")
}(key, time.Duration(i)*100*time.Millisecond)
}
}
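
For call sites of this deleted warmer, the nearest equivalent is the CacheWarmer.RequestWarming added earlier in this changeset: it drops the trailing source argument and stamps the source as "predictive" itself. A sketch of the mechanical migration:

// Before (legacy warming package, removed above):
//   warmer.RequestWarming(key, priority, "popular_content", size, "popular_analyzer")
// After (predictive CacheWarmer, added earlier in this diff):
warmer.RequestWarming(key, priority, "popular_content", size)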