Enhance error handling and metrics tracking in SteamCache

- Introduced a new error handling system with custom error types for better context and clarity in error reporting.
- Implemented URL validation to prevent invalid requests and enhance security.
- Updated cache key generation functions to return errors, improving robustness in handling invalid inputs.
- Added comprehensive metrics tracking for requests, cache hits and misses, and performance (response times and bytes served/cached), allowing for better monitoring and analysis of the caching system.
- Enhanced logging to include detailed metrics and error information for improved debugging and operational insights.
This commit is contained in:
2025-09-22 17:29:41 -05:00
parent 3703e40442
commit f945ccef05
5 changed files with 694 additions and 30 deletions

View File

@@ -13,7 +13,9 @@ import (
"net/url"
"os"
"regexp"
"s1d3sw1ped/steamcache2/steamcache/errors"
"s1d3sw1ped/steamcache2/steamcache/logger"
"s1d3sw1ped/steamcache2/steamcache/metrics"
"s1d3sw1ped/steamcache2/vfs"
"s1d3sw1ped/steamcache2/vfs/adaptive"
"s1d3sw1ped/steamcache2/vfs/cache"
@@ -360,13 +362,14 @@ func (sc *SteamCache) streamCachedResponse(w http.ResponseWriter, r *http.Reques
w.Write(rangeData)
logger.Logger.Info().
Str("key", cacheKey).
Str("cache_key", cacheKey).
Str("url", r.URL.String()).
Str("host", r.Host).
Str("client_ip", clientIP).
Str("status", "HIT").
Str("cache_status", "HIT").
Str("range", fmt.Sprintf("%d-%d/%d", start, end, totalSize)).
Dur("zduration", time.Since(tstart)).
Int64("range_size", end-start+1).
Dur("response_time", time.Since(tstart)).
Msg("cache request")
return
@@ -394,12 +397,13 @@ func (sc *SteamCache) streamCachedResponse(w http.ResponseWriter, r *http.Reques
w.Write(bodyData)
logger.Logger.Info().
Str("key", cacheKey).
Str("cache_key", cacheKey).
Str("url", r.URL.String()).
Str("host", r.Host).
Str("client_ip", clientIP).
Str("status", "HIT").
Dur("zduration", time.Since(tstart)).
Str("cache_status", "HIT").
Int64("file_size", int64(len(bodyData))).
Dur("response_time", time.Since(tstart)).
Msg("cache request")
}
@@ -495,14 +499,19 @@ func parseRangeHeader(rangeHeader string, totalSize int64) (start, end, total in
}
// generateURLHash returns the hex-encoded SHA-256 digest of urlPath, which is
// used as the hash portion of a cache key.
//
// An empty urlPath, or one containing ".." or "//", is rejected: the returned
// string is "" and the error is the one built by errors.NewSteamCacheError
// with errors.ErrInvalidURL. On success the error is nil.
func generateURLHash(urlPath string) string {
func generateURLHash(urlPath string) (string, error) {
// Validate input to prevent cache key pollution
if urlPath == "" {
return ""
return "", errors.NewSteamCacheError("generateURLHash", urlPath, "", errors.ErrInvalidURL)
}
// Additional validation for suspicious patterns: ".." (parent-directory
// segments) and "//" (empty path segments) are refused before hashing
if strings.Contains(urlPath, "..") || strings.Contains(urlPath, "//") {
return "", errors.NewSteamCacheError("generateURLHash", urlPath, "", errors.ErrInvalidURL)
}
// Hash the raw bytes of the path and render the 32-byte digest as hex
hash := sha256.Sum256([]byte(urlPath))
return hex.EncodeToString(hash[:])
return hex.EncodeToString(hash[:]), nil
}
// calculateSHA256 calculates SHA256 hash of the given data
@@ -512,6 +521,35 @@ func calculateSHA256(data []byte) string {
return hex.EncodeToString(hasher.Sum(nil))
}
// validateURLPath screens a request path before it is used for a cache lookup.
//
// It returns nil when urlPath passes every check. Otherwise it returns the
// error built by errors.NewSteamCacheError with errors.ErrInvalidURL; the same
// error is produced no matter which check failed.
func validateURLPath(urlPath string) error {
	// Longest path accepted, in bytes; anything larger is refused outright.
	const maxURLPathLen = 2048

	// The cases are evaluated in order and evaluation stops at the first match.
	switch {
	case urlPath == "", // nothing to validate
		strings.Contains(urlPath, ".."), // directory traversal attempt
		strings.Contains(urlPath, "//"), // doubled slash (potential path manipulation)
		strings.ContainsAny(urlPath, "<>\"'&"), // markup / quoting characters
		len(urlPath) > maxURLPathLen: // unreasonably long path (DoS guard)
		return errors.NewSteamCacheError("validateURLPath", urlPath, "", errors.ErrInvalidURL)
	}

	return nil
}
// verifyCompleteFile verifies that we received the complete file by checking Content-Length
// Returns true if the file is complete, false if it's incomplete (allowing retry)
func (sc *SteamCache) verifyCompleteFile(bodyData []byte, resp *http.Response, urlPath string, cacheKey string) bool {
@@ -571,9 +609,20 @@ func (sc *SteamCache) detectService(r *http.Request) (*ServiceConfig, bool) {
// The prefix indicates which service the request came from (detected via User-Agent)
// Input: /depot/1684171/chunk/0016cfc5019b8baa6026aa1cce93e685d6e06c6e, "steam"
// Output: steam/a1b2c3d4e5f678901234567890123456789012345678901234567890
//
// An empty servicePrefix is rejected with the error built by
// errors.NewSteamCacheError with errors.ErrUnsupportedService. An error from
// generateURLHash is returned unchanged. In both cases the returned key is "".
func generateServiceCacheKey(urlPath string, servicePrefix string) string {
func generateServiceCacheKey(urlPath string, servicePrefix string) (string, error) {
// Validate service prefix
if servicePrefix == "" {
return "", errors.NewSteamCacheError("generateServiceCacheKey", urlPath, "", errors.ErrUnsupportedService)
}
// Generate hash for URL path
hash, err := generateURLHash(urlPath)
if err != nil {
return "", err
}
// Join the service prefix and the URL-path hash as "{servicePrefix}/{hash}"
return servicePrefix + "/" + generateURLHash(urlPath)
return servicePrefix + "/" + hash, nil
}
var hopByHopHeaders = map[string]struct{}{
@@ -788,6 +837,9 @@ type SteamCache struct {
// Dynamic memory management
memoryMonitor *memory.MemoryMonitor
dynamicCacheMgr *memory.MemoryMonitor
// Metrics
metrics *metrics.Metrics
}
func New(address string, memorySize string, diskSize string, diskPath, upstream, memoryGC, diskGC string, maxConcurrentRequests int64, maxRequestsPerClient int64) *SteamCache {
@@ -930,6 +982,9 @@ func New(address string, memorySize string, diskSize string, diskPath, upstream,
// Initialize dynamic memory management
memoryMonitor: memory.NewMemoryMonitor(uint64(memorysize), 10*time.Second, 0.1), // 10% threshold
dynamicCacheMgr: nil, // Will be set after cache creation
// Initialize metrics
metrics: metrics.NewMetrics(),
}
// Initialize dynamic cache manager if we have memory cache
@@ -1000,21 +1055,44 @@ func (sc *SteamCache) Shutdown() {
sc.wg.Wait()
}
// GetMetrics refreshes the cache-size values held by the metrics collector and
// then returns the collector's stats.
//
// The memory and disk sizes are each updated only when the corresponding
// cache is non-nil; a nil cache leaves the previously recorded size untouched.
func (sc *SteamCache) GetMetrics() *metrics.Stats {
	collector := sc.metrics

	if memCache := sc.memory; memCache != nil {
		collector.SetMemoryCacheSize(memCache.Size())
	}
	if diskCache := sc.disk; diskCache != nil {
		collector.SetDiskCacheSize(diskCache.Size())
	}

	return collector.GetStats()
}
// ResetMetrics clears the collected metrics by delegating to the metrics
// collector's Reset method.
func (sc *SteamCache) ResetMetrics() {
	collector := sc.metrics
	collector.Reset()
}
func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
clientIP := getClientIP(r)
// Set keep-alive headers for better performance
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Keep-Alive", "timeout=300, max=1000")
// Apply global concurrency limit first
if err := sc.requestSemaphore.Acquire(context.Background(), 1); err != nil {
logger.Logger.Warn().Str("client_ip", getClientIP(r)).Msg("Server at capacity, rejecting request")
sc.metrics.IncrementRateLimited()
logger.Logger.Warn().Str("client_ip", clientIP).Msg("Server at capacity, rejecting request")
http.Error(w, "Server busy, please try again later", http.StatusServiceUnavailable)
return
}
defer sc.requestSemaphore.Release(1)
// Track total requests
sc.metrics.IncrementTotalRequests()
// Apply per-client rate limiting
clientIP := getClientIP(r)
clientLimiter := sc.getOrCreateClientLimiter(clientIP)
if err := clientLimiter.semaphore.Acquire(context.Background(), 1); err != nil {
@@ -1054,19 +1132,56 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if r.URL.String() == "/metrics" {
// Return metrics in a simple text format
stats := sc.GetMetrics()
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "# SteamCache2 Metrics\n")
fmt.Fprintf(w, "total_requests %d\n", stats.TotalRequests)
fmt.Fprintf(w, "cache_hits %d\n", stats.CacheHits)
fmt.Fprintf(w, "cache_misses %d\n", stats.CacheMisses)
fmt.Fprintf(w, "cache_coalesced %d\n", stats.CacheCoalesced)
fmt.Fprintf(w, "errors %d\n", stats.Errors)
fmt.Fprintf(w, "rate_limited %d\n", stats.RateLimited)
fmt.Fprintf(w, "hit_rate %.4f\n", stats.HitRate)
fmt.Fprintf(w, "avg_response_time_ms %.2f\n", float64(stats.AvgResponseTime.Nanoseconds())/1e6)
fmt.Fprintf(w, "total_bytes_served %d\n", stats.TotalBytesServed)
fmt.Fprintf(w, "total_bytes_cached %d\n", stats.TotalBytesCached)
fmt.Fprintf(w, "memory_cache_size %d\n", stats.MemoryCacheSize)
fmt.Fprintf(w, "disk_cache_size %d\n", stats.DiskCacheSize)
fmt.Fprintf(w, "uptime_seconds %.2f\n", stats.Uptime.Seconds())
return
}
// Check if this is a request from a supported service
if service, isSupported := sc.detectService(r); isSupported {
// trim the query parameters from the URL path
// this is necessary because the cache key should not include query parameters
urlPath, _, _ := strings.Cut(r.URL.String(), "?")
// Validate URL path for security
if err := validateURLPath(urlPath); err != nil {
logger.Logger.Warn().
Err(err).
Str("url", urlPath).
Str("client_ip", clientIP).
Msg("Invalid URL path detected")
http.Error(w, "Invalid URL", http.StatusBadRequest)
return
}
tstart := time.Now()
// Generate service cache key: {service}/{hash} (prefix indicates service via User-Agent)
cacheKey := generateServiceCacheKey(urlPath, service.Prefix)
if cacheKey == "" {
logger.Logger.Warn().Str("url", urlPath).Msg("Invalid URL")
cacheKey, err := generateServiceCacheKey(urlPath, service.Prefix)
if err != nil {
logger.Logger.Warn().
Err(err).
Str("url", urlPath).
Str("service", service.Name).
Str("client_ip", clientIP).
Msg("Failed to generate cache key")
http.Error(w, "Invalid URL", http.StatusBadRequest)
return
}
@@ -1110,6 +1225,12 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Cache validation passed - record access for adaptive/predictive analysis
sc.recordCacheAccess(cacheKey, int64(len(cachedData)))
// Track cache hit metrics
sc.metrics.IncrementCacheHits()
sc.metrics.AddResponseTime(time.Since(tstart))
sc.metrics.AddBytesServed(int64(len(cachedData)))
sc.metrics.IncrementServiceRequests(service.Name)
logger.Logger.Debug().
Str("key", cacheKey).
Str("url", urlPath).
@@ -1175,13 +1296,21 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(coalescedReq.statusCode)
w.Write(responseData)
// Track coalesced cache hit metrics
sc.metrics.IncrementCacheCoalesced()
sc.metrics.AddResponseTime(time.Since(tstart))
sc.metrics.AddBytesServed(int64(len(responseData)))
sc.metrics.IncrementServiceRequests(service.Name)
logger.Logger.Info().
Str("key", cacheKey).
Str("cache_key", cacheKey).
Str("url", urlPath).
Str("host", r.Host).
Str("client_ip", clientIP).
Str("status", "HIT-COALESCED").
Dur("zduration", time.Since(tstart)).
Str("cache_status", "HIT-COALESCED").
Int("waiting_clients", coalescedReq.waitingCount).
Int64("file_size", int64(len(responseData))).
Dur("response_time", time.Since(tstart)).
Msg("cache request")
return
@@ -1359,6 +1488,12 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(resp.StatusCode)
w.Write(bodyData)
// Track cache miss metrics
sc.metrics.IncrementCacheMisses()
sc.metrics.AddResponseTime(time.Since(tstart))
sc.metrics.AddBytesServed(int64(len(bodyData)))
sc.metrics.IncrementServiceRequests(service.Name)
// Cache the file if validation passed
if validationPassed {
// Verify we received the complete file by checking Content-Length
@@ -1399,6 +1534,8 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Msg("Cache write failed or incomplete - removing corrupted entry")
sc.vfs.Delete(cachePath)
} else {
// Track successful cache write
sc.metrics.AddBytesCached(int64(len(cacheData)))
logger.Logger.Debug().
Str("key", cacheKey).
Str("url", urlPath).
@@ -1456,12 +1593,14 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
logger.Logger.Info().
Str("key", cacheKey).
Str("cache_key", cacheKey).
Str("url", urlPath).
Str("host", r.Host).
Str("client_ip", clientIP).
Str("status", "MISS").
Dur("zduration", time.Since(tstart)).
Str("service", service.Name).
Str("cache_status", "MISS").
Int64("file_size", int64(len(bodyData))).
Dur("response_time", time.Since(tstart)).
Msg("cache request")
return