2 Commits
1.0.6 ... 1.0.7

Author SHA1 Message Date
00792d87a5 Merge pull request 'fix: gc was being stupid allowing another thread to take the space it made before it could not anymore' (#5) from fix/gc-breaking-downloads into main
All checks were successful
Release Tag / release (push) Successful in 13s
Reviewed-on: s1d3sw1ped/SteamCache2#5
2025-07-13 12:51:17 +00:00
3427b8f5bc fix: gc was being stupid allowing another thread to take the space it made before it could not anymore
All checks were successful
PR Check / check-and-test (pull_request) Successful in 12s
2025-07-13 07:50:22 -05:00
6 changed files with 68 additions and 79 deletions

9
.vscode/launch.json vendored
View File

@@ -17,7 +17,8 @@
"10G",
"--disk-path",
"tmp/disk",
"--verbose",
"--log-level",
"debug",
],
},
{
@@ -31,7 +32,8 @@
"10G",
"--disk-path",
"tmp/disk",
"--verbose",
"--log-level",
"debug",
],
},
{
@@ -43,7 +45,8 @@
"args": [
"--memory",
"1G",
"--verbose",
"--log-level",
"debug",
],
}
]

View File

@@ -15,12 +15,10 @@ import (
var (
threads int
memory string
memorymultiplier int
disk string
diskmultiplier int
diskpath string
upstream string
memory string
disk string
diskpath string
upstream string
logLevel string
logFormat string
@@ -69,9 +67,7 @@ var rootCmd = &cobra.Command{
sc := steamcache.New(
address,
memory,
memorymultiplier,
disk,
diskmultiplier,
diskpath,
upstream,
)
@@ -99,9 +95,7 @@ func init() {
rootCmd.Flags().IntVarP(&threads, "threads", "t", runtime.GOMAXPROCS(-1), "Number of worker threads to use for processing requests")
rootCmd.Flags().StringVarP(&memory, "memory", "m", "0", "The size of the memory cache")
rootCmd.Flags().IntVarP(&memorymultiplier, "memory-gc", "M", 10, "The gc value for the memory cache")
rootCmd.Flags().StringVarP(&disk, "disk", "d", "0", "The size of the disk cache")
rootCmd.Flags().IntVarP(&diskmultiplier, "disk-gc", "D", 100, "The gc value for the disk cache")
rootCmd.Flags().StringVarP(&diskpath, "disk-path", "p", "", "The path to the disk cache")
rootCmd.Flags().StringVarP(&upstream, "upstream", "u", "", "The upstream server to proxy requests overrides the host header from the client but forwards the original host header to the upstream server")

View File

@@ -68,7 +68,7 @@ type SteamCache struct {
wg sync.WaitGroup
}
func New(address string, memorySize string, memoryMultiplier int, diskSize string, diskMultiplier int, diskPath, upstream string) *SteamCache {
func New(address string, memorySize string, diskSize string, diskPath, upstream string) *SteamCache {
memorysize, err := units.FromHumanSize(memorySize)
if err != nil {
panic(err)
@@ -87,14 +87,14 @@ func New(address string, memorySize string, memoryMultiplier int, diskSize strin
var mgc *gc.GCFS
if memorysize > 0 {
m = memory.New(memorysize)
mgc = gc.New(m, memoryMultiplier, gc.LRUGC)
mgc = gc.New(m, gc.LRUGC)
}
var d *disk.DiskFS
var dgc *gc.GCFS
if disksize > 0 {
d = disk.New(diskPath, disksize)
dgc = gc.New(d, diskMultiplier, gc.LRUGC)
dgc = gc.New(d, gc.LRUGC)
}
// configure the cache to match the specified mode (memory only, disk only, or memory and disk) based on the provided sizes
@@ -220,16 +220,16 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.String(), "/depot/") {
// trim the query parameters from the URL path
// this is necessary because the cache key should not include query parameters
r.URL.Path = strings.Split(r.URL.Path, "?")[0]
path := strings.Split(r.URL.String(), "?")[0]
tstart := time.Now()
defer func() { responseTime.Observe(time.Since(tstart).Seconds()) }()
cacheKey := strings.ReplaceAll(r.URL.String()[1:], "\\", "/") // replace all backslashes with forward slashes shouldn't be necessary but just in case
cacheKey := strings.ReplaceAll(path[1:], "\\", "/") // replace all backslashes with forward slashes shouldn't be necessary but just in case
if cacheKey == "" {
requestsTotal.WithLabelValues(r.Method, "400").Inc()
logger.Logger.Warn().Str("url", r.URL.String()).Msg("Invalid URL")
logger.Logger.Warn().Str("url", path).Msg("Invalid URL")
http.Error(w, "Invalid URL", http.StatusBadRequest)
return
}
@@ -258,7 +258,7 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req *http.Request
if sc.upstream != "" { // if an upstream server is configured, proxy the request to the upstream server
ur, err := url.JoinPath(sc.upstream, r.URL.String())
ur, err := url.JoinPath(sc.upstream, path)
if err != nil {
requestsTotal.WithLabelValues(r.Method, "500").Inc()
logger.Logger.Error().Err(err).Str("upstream", sc.upstream).Msg("Failed to join URL path")
@@ -282,7 +282,7 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host = "http://" + host
}
ur, err := url.JoinPath(host, r.URL.String())
ur, err := url.JoinPath(host, path)
if err != nil {
requestsTotal.WithLabelValues(r.Method, "500").Inc()
logger.Logger.Error().Err(err).Str("host", host).Msg("Failed to join URL path")
@@ -328,16 +328,17 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
size := resp.ContentLength
writer, err := sc.vfs.Create(cacheKey, size)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
// this is sortof not needed as we should always be able to get a writer from the cache as long as the gc is able to reclaim enough space aka the file is not bigger than the disk can handle
ww := w.(io.Writer) // default writer to write to the response writer
writer, _ := sc.vfs.Create(cacheKey, size) // create a writer to write to the cache
if writer != nil { // if the writer is not nil, it means the cache is writable
defer writer.Close() // close the writer when done
ww = io.MultiWriter(w, writer) // write to both the response writer and the cache writer
}
defer writer.Close()
w.Header().Add("X-LanCache-Status", "MISS")
io.Copy(io.MultiWriter(w, writer), resp.Body)
io.Copy(ww, resp.Body)
logger.Logger.Info().
Str("key", cacheKey).

View File

@@ -13,7 +13,7 @@ func TestCaching(t *testing.T) {
os.WriteFile(filepath.Join(td, "key2"), []byte("value2"), 0644)
sc := New("localhost:8080", "1G", 10, "1G", 100, td, "")
sc := New("localhost:8080", "1G", "1G", td, "")
w, err := sc.vfs.Create("key", 5)
if err != nil {
@@ -84,7 +84,7 @@ func TestCaching(t *testing.T) {
}
func TestCacheMissAndHit(t *testing.T) {
sc := New("localhost:8080", "0", 0, "1G", 100, t.TempDir(), "")
sc := New("localhost:8080", "0", "1G", t.TempDir(), "")
key := "testkey"
value := []byte("testvalue")

View File

@@ -13,59 +13,48 @@ import (
"time"
)
// LRUGC deletes files in LRU order until enough space is reclaimed.
func LRUGC(vfss vfs.VFS, size uint) {
attempts := 0
deletions := 0
var reclaimed uint
var (
// ErrInsufficientSpace is returned when there are no files to delete in the VFS.
ErrInsufficientSpace = fmt.Errorf("no files to delete")
)
for reclaimed < size {
if attempts > 10 {
logger.Logger.Debug().
Int("attempts", attempts).
Msg("GC: Too many attempts to reclaim space, giving up")
return
}
attempts++
// LRUGC deletes files in LRU order until enough space is reclaimed.
func LRUGC(vfss vfs.VFS, size uint) error {
logger.Logger.Debug().Uint("target", size).Msg("Attempting to reclaim space using LRU GC")
var reclaimed uint // reclaimed space in bytes
for {
switch fs := vfss.(type) {
case *disk.DiskFS:
fi := fs.LRU.Back()
if fi == nil {
break
return ErrInsufficientSpace // No files to delete
}
sz := uint(fi.Size())
err := fs.Delete(fi.Name())
if err != nil {
continue
continue // If delete fails, try the next file
}
reclaimed += sz
deletions++
case *memory.MemoryFS:
fi := fs.LRU.Back()
if fi == nil {
break
return ErrInsufficientSpace // No files to delete
}
sz := uint(fi.Size())
err := fs.Delete(fi.Name())
if err != nil {
continue
continue // If delete fails, try the next file
}
reclaimed += sz
deletions++
default:
// Fallback to old method if not supported
stats := vfss.StatAll()
if len(stats) == 0 {
break
}
fi := stats[0] // Assume sorted or pick first
sz := uint(fi.Size())
err := vfss.Delete(fi.Name())
if err != nil {
continue
}
reclaimed += sz
deletions++
panic("unreachable: unsupported VFS type for LRU GC") // panic if the VFS is not disk or memory
}
if reclaimed >= size {
logger.Logger.Debug().Uint("target", size).Uint("achieved", reclaimed).Msg("Reclaimed enough space using LRU GC")
return nil // stop if enough space is reclaimed
}
}
}
@@ -80,36 +69,39 @@ var _ vfs.VFS = (*GCFS)(nil)
// GCFS is a virtual file system that calls a GC handler when the disk is full. The GC handler is responsible for freeing up space on the disk. The GCFS is a wrapper around another VFS.
type GCFS struct {
vfs.VFS
multiplier int
// protected by mu
gcHanderFunc GCHandlerFunc
}
// GCHandlerFunc is a function that is called when the disk is full and the GCFS needs to free up space. It is passed the VFS and the size of the file that needs to be written. Its up to the implementation to free up space. How much space is freed is also up to the implementation.
type GCHandlerFunc func(vfs vfs.VFS, size uint)
type GCHandlerFunc func(vfs vfs.VFS, size uint) error
func New(vfs vfs.VFS, multiplier int, gcHandlerFunc GCHandlerFunc) *GCFS {
if multiplier <= 0 {
multiplier = 1 // if the multiplier is less than or equal to 0 set it to 1 will be slow but the user can set it to a higher value if they want
}
func New(vfs vfs.VFS, gcHandlerFunc GCHandlerFunc) *GCFS {
return &GCFS{
VFS: vfs,
multiplier: multiplier,
gcHanderFunc: gcHandlerFunc,
}
}
// Create overrides the Create method of the VFS interface. It tries to create the key, if it fails due to disk full error, it calls the GC handler and tries again. If it still fails it returns the error.
func (g *GCFS) Create(key string, size int64) (io.WriteCloser, error) {
w, err := g.VFS.Create(key, size) // try to create the key
// if it fails due to disk full error, call the GC handler and try again in a loop that will continue until it succeeds or the error is not disk full
if err == vfserror.ErrDiskFull && g.gcHanderFunc != nil { // if the error is disk full and there is a GC handler
g.gcHanderFunc(g.VFS, uint(size*int64(g.multiplier))) // call the GC handler
w, err := g.VFS.Create(key, size) // try to create the key
for err == vfserror.ErrDiskFull && g.gcHanderFunc != nil { // if the error is disk full and there is a GC handler
errr := g.gcHanderFunc(g.VFS, uint(size)) // call the GC handler
if errr == ErrInsufficientSpace {
return nil, errr // if the GC handler returns no files to delete, return the error
}
w, err = g.VFS.Create(key, size)
}
if err != nil {
if err == vfserror.ErrDiskFull {
logger.Logger.Error().Str("key", key).Int64("size", size).Msg("Failed to create file due to disk full, even after GC")
} else {
logger.Logger.Error().Str("key", key).Int64("size", size).Err(err).Msg("Failed to create file")
}
}
return w, err
}

View File

@@ -5,13 +5,12 @@ import (
"errors"
"fmt"
"s1d3sw1ped/SteamCache2/vfs/memory"
"s1d3sw1ped/SteamCache2/vfs/vfserror"
"testing"
)
func TestGCOnFull(t *testing.T) {
m := memory.New(10)
gc := New(m, 2, LRUGC)
gc := New(m, LRUGC)
for i := 0; i < 5; i++ {
w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
@@ -43,7 +42,7 @@ func TestGCOnFull(t *testing.T) {
func TestNoGCNeeded(t *testing.T) {
m := memory.New(20)
gc := New(m, 2, LRUGC)
gc := New(m, LRUGC)
for i := 0; i < 5; i++ {
w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
@@ -61,13 +60,13 @@ func TestNoGCNeeded(t *testing.T) {
func TestGCInsufficientSpace(t *testing.T) {
m := memory.New(5)
gc := New(m, 1, LRUGC)
gc := New(m, LRUGC)
w, err := gc.Create("key0", 10)
if err == nil {
w.Close()
t.Error("Expected ErrDiskFull")
} else if !errors.Is(err, vfserror.ErrDiskFull) {
} else if !errors.Is(err, ErrInsufficientSpace) {
t.Errorf("Unexpected error: %v", err)
}
}