fix: gc no longer lets another thread take the space it just reclaimed before the requesting write can use it
All checks were successful
PR Check / check-and-test (pull_request) Successful in 12s

2025-07-13 07:50:22 -05:00
parent 7f744d04b0
commit 3427b8f5bc
6 changed files with 68 additions and 79 deletions

.vscode/launch.json vendored
View File

@@ -17,7 +17,8 @@
 "10G",
 "--disk-path",
 "tmp/disk",
-"--verbose",
+"--log-level",
+"debug",
 ],
 },
 {
@@ -31,7 +32,8 @@
 "10G",
 "--disk-path",
 "tmp/disk",
-"--verbose",
+"--log-level",
+"debug",
 ],
 },
 {
@@ -43,7 +45,8 @@
 "args": [
 "--memory",
 "1G",
-"--verbose",
+"--log-level",
+"debug",
 ],
 }
 ]

View File

@@ -15,12 +15,10 @@ import (
 var (
 threads int
 memory string
-memorymultiplier int
 disk string
-diskmultiplier int
 diskpath string
 upstream string
 logLevel string
 logFormat string
@@ -69,9 +67,7 @@ var rootCmd = &cobra.Command{
 sc := steamcache.New(
 address,
 memory,
-memorymultiplier,
 disk,
-diskmultiplier,
 diskpath,
 upstream,
 )
@@ -99,9 +95,7 @@ func init() {
 rootCmd.Flags().IntVarP(&threads, "threads", "t", runtime.GOMAXPROCS(-1), "Number of worker threads to use for processing requests")
 rootCmd.Flags().StringVarP(&memory, "memory", "m", "0", "The size of the memory cache")
-rootCmd.Flags().IntVarP(&memorymultiplier, "memory-gc", "M", 10, "The gc value for the memory cache")
 rootCmd.Flags().StringVarP(&disk, "disk", "d", "0", "The size of the disk cache")
-rootCmd.Flags().IntVarP(&diskmultiplier, "disk-gc", "D", 100, "The gc value for the disk cache")
 rootCmd.Flags().StringVarP(&diskpath, "disk-path", "p", "", "The path to the disk cache")
 rootCmd.Flags().StringVarP(&upstream, "upstream", "u", "", "The upstream server to proxy requests overrides the host header from the client but forwards the original host header to the upstream server")

View File

@@ -68,7 +68,7 @@ type SteamCache struct {
 wg sync.WaitGroup
 }
-func New(address string, memorySize string, memoryMultiplier int, diskSize string, diskMultiplier int, diskPath, upstream string) *SteamCache {
+func New(address string, memorySize string, diskSize string, diskPath, upstream string) *SteamCache {
 memorysize, err := units.FromHumanSize(memorySize)
 if err != nil {
 panic(err)
@@ -87,14 +87,14 @@ func New(address string, memorySize string, memoryMultiplier int, diskSize strin
 var mgc *gc.GCFS
 if memorysize > 0 {
 m = memory.New(memorysize)
-mgc = gc.New(m, memoryMultiplier, gc.LRUGC)
+mgc = gc.New(m, gc.LRUGC)
 }
 var d *disk.DiskFS
 var dgc *gc.GCFS
 if disksize > 0 {
 d = disk.New(diskPath, disksize)
-dgc = gc.New(d, diskMultiplier, gc.LRUGC)
+dgc = gc.New(d, gc.LRUGC)
 }
 // configure the cache to match the specified mode (memory only, disk only, or memory and disk) based on the provided sizes
@@ -220,16 +220,16 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 if strings.HasPrefix(r.URL.String(), "/depot/") {
 // trim the query parameters from the URL path
 // this is necessary because the cache key should not include query parameters
-r.URL.Path = strings.Split(r.URL.Path, "?")[0]
+path := strings.Split(r.URL.String(), "?")[0]
 tstart := time.Now()
 defer func() { responseTime.Observe(time.Since(tstart).Seconds()) }()
-cacheKey := strings.ReplaceAll(r.URL.String()[1:], "\\", "/") // replace all backslashes with forward slashes shouldn't be necessary but just in case
+cacheKey := strings.ReplaceAll(path[1:], "\\", "/") // replace all backslashes with forward slashes shouldn't be necessary but just in case
 if cacheKey == "" {
 requestsTotal.WithLabelValues(r.Method, "400").Inc()
-logger.Logger.Warn().Str("url", r.URL.String()).Msg("Invalid URL")
+logger.Logger.Warn().Str("url", path).Msg("Invalid URL")
 http.Error(w, "Invalid URL", http.StatusBadRequest)
 return
 }
@@ -258,7 +258,7 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 var req *http.Request
 if sc.upstream != "" { // if an upstream server is configured, proxy the request to the upstream server
-ur, err := url.JoinPath(sc.upstream, r.URL.String())
+ur, err := url.JoinPath(sc.upstream, path)
 if err != nil {
 requestsTotal.WithLabelValues(r.Method, "500").Inc()
 logger.Logger.Error().Err(err).Str("upstream", sc.upstream).Msg("Failed to join URL path")
@@ -282,7 +282,7 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 host = "http://" + host
 }
-ur, err := url.JoinPath(host, r.URL.String())
+ur, err := url.JoinPath(host, path)
 if err != nil {
 requestsTotal.WithLabelValues(r.Method, "500").Inc()
 logger.Logger.Error().Err(err).Str("host", host).Msg("Failed to join URL path")
@@ -328,16 +328,17 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 size := resp.ContentLength
-writer, err := sc.vfs.Create(cacheKey, size) // this is sortof not needed as we should always be able to get a writer from the cache as long as the gc is able to reclaim enough space aka the file is not bigger than the disk can handle
-if err != nil {
-http.Error(w, err.Error(), http.StatusInternalServerError)
-return
-}
-defer writer.Close()
+ww := w.(io.Writer) // default writer to write to the response writer
+writer, _ := sc.vfs.Create(cacheKey, size) // create a writer to write to the cache
+if writer != nil { // if the writer is not nil, it means the cache is writable
+defer writer.Close() // close the writer when done
+ww = io.MultiWriter(w, writer) // write to both the response writer and the cache writer
+}
 w.Header().Add("X-LanCache-Status", "MISS")
-io.Copy(io.MultiWriter(w, writer), resp.Body)
+io.Copy(ww, resp.Body)
 logger.Logger.Info().
 Str("key", cacheKey).

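The miss path above now treats the cache write as best-effort: the upstream body always streams to the client, and it is teed into the cache only when `sc.vfs.Create` hands back a writer. A minimal standalone sketch of that pattern, with the `newCacheWriter` helper and other names as illustrative assumptions rather than the project's API:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// nopCloser adapts a plain io.Writer into an io.WriteCloser for the sketch.
type nopCloser struct{ io.Writer }

func (nopCloser) Close() error { return nil }

// newCacheWriter stands in for sc.vfs.Create: it may fail to provide a writer
// (for example when the object can never fit), in which case it returns nil
// and the caller simply skips caching.
func newCacheWriter(buf *bytes.Buffer, ok bool) io.WriteCloser {
	if !ok {
		return nil
	}
	return nopCloser{buf}
}

// serveMiss mirrors the new miss path: default to writing only the client
// response, and upgrade to a MultiWriter when a cache writer is available.
func serveMiss(client io.Writer, cache io.WriteCloser, body io.Reader) error {
	out := client
	if cache != nil {
		defer cache.Close()
		out = io.MultiWriter(client, cache) // tee the upstream body into the cache
	}
	_, err := io.Copy(out, body)
	return err
}

func main() {
	var client, cached bytes.Buffer

	// Cache writable: client and cache both receive the body.
	serveMiss(&client, newCacheWriter(&cached, true), strings.NewReader("chunk data"))
	fmt.Printf("client=%q cache=%q\n", client.String(), cached.String())

	// Cache not writable: the client is still served, nothing is cached.
	client.Reset()
	cached.Reset()
	serveMiss(&client, nil, strings.NewReader("chunk data"))
	fmt.Printf("client=%q cache=%q\n", client.String(), cached.String())
}
```

The trade-off behind the `writer != nil` guard is that a failed cache allocation now degrades to a plain proxied response instead of returning a 500 to the client.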
View File

@@ -13,7 +13,7 @@ func TestCaching(t *testing.T) {
 os.WriteFile(filepath.Join(td, "key2"), []byte("value2"), 0644)
-sc := New("localhost:8080", "1G", 10, "1G", 100, td, "")
+sc := New("localhost:8080", "1G", "1G", td, "")
 w, err := sc.vfs.Create("key", 5)
 if err != nil {
@@ -84,7 +84,7 @@
 }
 func TestCacheMissAndHit(t *testing.T) {
-sc := New("localhost:8080", "0", 0, "1G", 100, t.TempDir(), "")
+sc := New("localhost:8080", "0", "1G", t.TempDir(), "")
 key := "testkey"
 value := []byte("testvalue")

View File

@@ -13,59 +13,48 @@ import (
 "time"
 )
-// LRUGC deletes files in LRU order until enough space is reclaimed.
-func LRUGC(vfss vfs.VFS, size uint) {
-attempts := 0
-deletions := 0
-var reclaimed uint
-for reclaimed < size {
-if attempts > 10 {
-logger.Logger.Debug().
-Int("attempts", attempts).
-Msg("GC: Too many attempts to reclaim space, giving up")
-return
-}
-attempts++
+var (
+// ErrInsufficientSpace is returned when there are no files to delete in the VFS.
+ErrInsufficientSpace = fmt.Errorf("no files to delete")
+)
+// LRUGC deletes files in LRU order until enough space is reclaimed.
+func LRUGC(vfss vfs.VFS, size uint) error {
+logger.Logger.Debug().Uint("target", size).Msg("Attempting to reclaim space using LRU GC")
+var reclaimed uint // reclaimed space in bytes
+for {
 switch fs := vfss.(type) {
 case *disk.DiskFS:
 fi := fs.LRU.Back()
 if fi == nil {
-break
+return ErrInsufficientSpace // No files to delete
 }
 sz := uint(fi.Size())
 err := fs.Delete(fi.Name())
 if err != nil {
-continue
+continue // If delete fails, try the next file
 }
 reclaimed += sz
-deletions++
 case *memory.MemoryFS:
 fi := fs.LRU.Back()
 if fi == nil {
-break
+return ErrInsufficientSpace // No files to delete
 }
 sz := uint(fi.Size())
 err := fs.Delete(fi.Name())
 if err != nil {
-continue
+continue // If delete fails, try the next file
 }
 reclaimed += sz
-deletions++
 default:
-// Fallback to old method if not supported
-stats := vfss.StatAll()
-if len(stats) == 0 {
-break
-}
-fi := stats[0] // Assume sorted or pick first
-sz := uint(fi.Size())
-err := vfss.Delete(fi.Name())
-if err != nil {
-continue
-}
-reclaimed += sz
-deletions++
+panic("unreachable: unsupported VFS type for LRU GC") // panic if the VFS is not disk or memory
+}
+if reclaimed >= size {
+logger.Logger.Debug().Uint("target", size).Uint("achieved", reclaimed).Msg("Reclaimed enough space using LRU GC")
+return nil // stop if enough space is reclaimed
 }
 }
 }
@@ -80,36 +69,39 @@ var _ vfs.VFS = (*GCFS)(nil)
 // GCFS is a virtual file system that calls a GC handler when the disk is full. The GC handler is responsible for freeing up space on the disk. The GCFS is a wrapper around another VFS.
 type GCFS struct {
 vfs.VFS
-multiplier int
-// protected by mu
 gcHanderFunc GCHandlerFunc
 }
 // GCHandlerFunc is a function that is called when the disk is full and the GCFS needs to free up space. It is passed the VFS and the size of the file that needs to be written. Its up to the implementation to free up space. How much space is freed is also up to the implementation.
-type GCHandlerFunc func(vfs vfs.VFS, size uint)
+type GCHandlerFunc func(vfs vfs.VFS, size uint) error
-func New(vfs vfs.VFS, multiplier int, gcHandlerFunc GCHandlerFunc) *GCFS {
-if multiplier <= 0 {
-multiplier = 1 // if the multiplier is less than or equal to 0 set it to 1 will be slow but the user can set it to a higher value if they want
-}
+func New(vfs vfs.VFS, gcHandlerFunc GCHandlerFunc) *GCFS {
 return &GCFS{
 VFS: vfs,
-multiplier: multiplier,
 gcHanderFunc: gcHandlerFunc,
 }
 }
 // Create overrides the Create method of the VFS interface. It tries to create the key, if it fails due to disk full error, it calls the GC handler and tries again. If it still fails it returns the error.
 func (g *GCFS) Create(key string, size int64) (io.WriteCloser, error) {
 w, err := g.VFS.Create(key, size) // try to create the key
-// if it fails due to disk full error, call the GC handler and try again in a loop that will continue until it succeeds or the error is not disk full
-if err == vfserror.ErrDiskFull && g.gcHanderFunc != nil { // if the error is disk full and there is a GC handler
-g.gcHanderFunc(g.VFS, uint(size*int64(g.multiplier))) // call the GC handler
+for err == vfserror.ErrDiskFull && g.gcHanderFunc != nil { // if the error is disk full and there is a GC handler
+errr := g.gcHanderFunc(g.VFS, uint(size)) // call the GC handler
+if errr == ErrInsufficientSpace {
+return nil, errr // if the GC handler returns no files to delete, return the error
+}
 w, err = g.VFS.Create(key, size)
 }
+if err != nil {
+if err == vfserror.ErrDiskFull {
+logger.Logger.Error().Str("key", key).Int64("size", size).Msg("Failed to create file due to disk full, even after GC")
+} else {
+logger.Logger.Error().Str("key", key).Int64("size", size).Err(err).Msg("Failed to create file")
+}
+}
 return w, err
 }

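Taken together, these changes replace the old heuristic (free `size * multiplier` bytes in a single pass, give up after ten attempts) with a loop that keeps evicting and retrying until the write fits or there is nothing left to evict. That is what closes the race from the commit message: another writer can consume the space LRUGC just freed, and the safe response is to run GC again rather than stop. A simplified, single-threaded sketch of that shape, using a hypothetical toy store rather than the project's VFS:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for vfserror.ErrDiskFull and gc.ErrInsufficientSpace.
var (
	errDiskFull          = errors.New("disk full")
	errInsufficientSpace = errors.New("no files to delete")
)

// toyStore is a hypothetical fixed-capacity store used only for this sketch.
type toyStore struct {
	capacity, used int64
	files          map[string]int64
}

func (s *toyStore) create(key string, size int64) error {
	if s.used+size > s.capacity {
		return errDiskFull
	}
	s.files[key] = size
	s.used += size
	return nil
}

// reclaim plays the role of LRUGC: evict until at least size bytes are free,
// or report that nothing is left to evict.
func (s *toyStore) reclaim(size int64) error {
	for s.capacity-s.used < size {
		if len(s.files) == 0 {
			return errInsufficientSpace
		}
		for k, sz := range s.files { // eviction order is irrelevant to the sketch
			delete(s.files, k)
			s.used -= sz
			break
		}
	}
	return nil
}

// createWithGC mirrors the new retry shape: keep reclaiming and retrying until
// the create succeeds or the GC reports there is nothing left to delete. In the
// real server another goroutine may take the freed space between reclaim and
// the retry; the loop simply runs GC again instead of giving up after a fixed
// number of attempts.
func createWithGC(s *toyStore, key string, size int64) error {
	err := s.create(key, size)
	for errors.Is(err, errDiskFull) {
		if gcErr := s.reclaim(size); errors.Is(gcErr, errInsufficientSpace) {
			return gcErr // mirrors Create returning ErrInsufficientSpace
		}
		err = s.create(key, size)
	}
	return err
}

func main() {
	s := &toyStore{capacity: 10, used: 8, files: map[string]int64{"a": 4, "b": 4}}
	fmt.Println(createWithGC(s, "c", 6))  // evicts until 6 bytes fit, then: <nil>
	fmt.Println(createWithGC(s, "d", 20)) // can never fit: no files to delete
}
```

For callers this also means `gc.New` drops the multiplier argument and the insufficient-space case surfaces as an error from `Create`, as the updated tests below show.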
View File

@@ -5,13 +5,12 @@ import (
 "errors"
 "fmt"
 "s1d3sw1ped/SteamCache2/vfs/memory"
-"s1d3sw1ped/SteamCache2/vfs/vfserror"
 "testing"
 )
 func TestGCOnFull(t *testing.T) {
 m := memory.New(10)
-gc := New(m, 2, LRUGC)
+gc := New(m, LRUGC)
 for i := 0; i < 5; i++ {
 w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
@@ -43,7 +42,7 @@ func TestGCOnFull(t *testing.T) {
 }
 func TestNoGCNeeded(t *testing.T) {
 m := memory.New(20)
-gc := New(m, 2, LRUGC)
+gc := New(m, LRUGC)
 for i := 0; i < 5; i++ {
 w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
@@ -61,13 +60,13 @@ func TestNoGCNeeded(t *testing.T) {
 func TestGCInsufficientSpace(t *testing.T) {
 m := memory.New(5)
-gc := New(m, 1, LRUGC)
+gc := New(m, LRUGC)
 w, err := gc.Create("key0", 10)
 if err == nil {
 w.Close()
 t.Error("Expected ErrDiskFull")
-} else if !errors.Is(err, vfserror.ErrDiskFull) {
+} else if !errors.Is(err, ErrInsufficientSpace) {
 t.Errorf("Unexpected error: %v", err)
 }
 }