Merge pull request 'fix: gc was being stupid allowing another thread to take the space it made before it could not anymore' (#5) from fix/gc-breaking-downloads into main
All checks were successful
Release Tag / release (push) Successful in 13s
All checks were successful
Release Tag / release (push) Successful in 13s
Reviewed-on: s1d3sw1ped/SteamCache2#5
This commit is contained in:
9
.vscode/launch.json
vendored
9
.vscode/launch.json
vendored
@@ -17,7 +17,8 @@
|
|||||||
"10G",
|
"10G",
|
||||||
"--disk-path",
|
"--disk-path",
|
||||||
"tmp/disk",
|
"tmp/disk",
|
||||||
"--verbose",
|
"--log-level",
|
||||||
|
"debug",
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -31,7 +32,8 @@
|
|||||||
"10G",
|
"10G",
|
||||||
"--disk-path",
|
"--disk-path",
|
||||||
"tmp/disk",
|
"tmp/disk",
|
||||||
"--verbose",
|
"--log-level",
|
||||||
|
"debug",
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -43,7 +45,8 @@
|
|||||||
"args": [
|
"args": [
|
||||||
"--memory",
|
"--memory",
|
||||||
"1G",
|
"1G",
|
||||||
"--verbose",
|
"--log-level",
|
||||||
|
"debug",
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|||||||
14
cmd/root.go
14
cmd/root.go
@@ -15,12 +15,10 @@ import (
|
|||||||
var (
|
var (
|
||||||
threads int
|
threads int
|
||||||
|
|
||||||
memory string
|
memory string
|
||||||
memorymultiplier int
|
disk string
|
||||||
disk string
|
diskpath string
|
||||||
diskmultiplier int
|
upstream string
|
||||||
diskpath string
|
|
||||||
upstream string
|
|
||||||
|
|
||||||
logLevel string
|
logLevel string
|
||||||
logFormat string
|
logFormat string
|
||||||
@@ -69,9 +67,7 @@ var rootCmd = &cobra.Command{
|
|||||||
sc := steamcache.New(
|
sc := steamcache.New(
|
||||||
address,
|
address,
|
||||||
memory,
|
memory,
|
||||||
memorymultiplier,
|
|
||||||
disk,
|
disk,
|
||||||
diskmultiplier,
|
|
||||||
diskpath,
|
diskpath,
|
||||||
upstream,
|
upstream,
|
||||||
)
|
)
|
||||||
@@ -99,9 +95,7 @@ func init() {
|
|||||||
rootCmd.Flags().IntVarP(&threads, "threads", "t", runtime.GOMAXPROCS(-1), "Number of worker threads to use for processing requests")
|
rootCmd.Flags().IntVarP(&threads, "threads", "t", runtime.GOMAXPROCS(-1), "Number of worker threads to use for processing requests")
|
||||||
|
|
||||||
rootCmd.Flags().StringVarP(&memory, "memory", "m", "0", "The size of the memory cache")
|
rootCmd.Flags().StringVarP(&memory, "memory", "m", "0", "The size of the memory cache")
|
||||||
rootCmd.Flags().IntVarP(&memorymultiplier, "memory-gc", "M", 10, "The gc value for the memory cache")
|
|
||||||
rootCmd.Flags().StringVarP(&disk, "disk", "d", "0", "The size of the disk cache")
|
rootCmd.Flags().StringVarP(&disk, "disk", "d", "0", "The size of the disk cache")
|
||||||
rootCmd.Flags().IntVarP(&diskmultiplier, "disk-gc", "D", 100, "The gc value for the disk cache")
|
|
||||||
rootCmd.Flags().StringVarP(&diskpath, "disk-path", "p", "", "The path to the disk cache")
|
rootCmd.Flags().StringVarP(&diskpath, "disk-path", "p", "", "The path to the disk cache")
|
||||||
|
|
||||||
rootCmd.Flags().StringVarP(&upstream, "upstream", "u", "", "The upstream server to proxy requests overrides the host header from the client but forwards the original host header to the upstream server")
|
rootCmd.Flags().StringVarP(&upstream, "upstream", "u", "", "The upstream server to proxy requests overrides the host header from the client but forwards the original host header to the upstream server")
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ type SteamCache struct {
|
|||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(address string, memorySize string, memoryMultiplier int, diskSize string, diskMultiplier int, diskPath, upstream string) *SteamCache {
|
func New(address string, memorySize string, diskSize string, diskPath, upstream string) *SteamCache {
|
||||||
memorysize, err := units.FromHumanSize(memorySize)
|
memorysize, err := units.FromHumanSize(memorySize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@@ -87,14 +87,14 @@ func New(address string, memorySize string, memoryMultiplier int, diskSize strin
|
|||||||
var mgc *gc.GCFS
|
var mgc *gc.GCFS
|
||||||
if memorysize > 0 {
|
if memorysize > 0 {
|
||||||
m = memory.New(memorysize)
|
m = memory.New(memorysize)
|
||||||
mgc = gc.New(m, memoryMultiplier, gc.LRUGC)
|
mgc = gc.New(m, gc.LRUGC)
|
||||||
}
|
}
|
||||||
|
|
||||||
var d *disk.DiskFS
|
var d *disk.DiskFS
|
||||||
var dgc *gc.GCFS
|
var dgc *gc.GCFS
|
||||||
if disksize > 0 {
|
if disksize > 0 {
|
||||||
d = disk.New(diskPath, disksize)
|
d = disk.New(diskPath, disksize)
|
||||||
dgc = gc.New(d, diskMultiplier, gc.LRUGC)
|
dgc = gc.New(d, gc.LRUGC)
|
||||||
}
|
}
|
||||||
|
|
||||||
// configure the cache to match the specified mode (memory only, disk only, or memory and disk) based on the provided sizes
|
// configure the cache to match the specified mode (memory only, disk only, or memory and disk) based on the provided sizes
|
||||||
@@ -220,16 +220,16 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
if strings.HasPrefix(r.URL.String(), "/depot/") {
|
if strings.HasPrefix(r.URL.String(), "/depot/") {
|
||||||
// trim the query parameters from the URL path
|
// trim the query parameters from the URL path
|
||||||
// this is necessary because the cache key should not include query parameters
|
// this is necessary because the cache key should not include query parameters
|
||||||
r.URL.Path = strings.Split(r.URL.Path, "?")[0]
|
path := strings.Split(r.URL.String(), "?")[0]
|
||||||
|
|
||||||
tstart := time.Now()
|
tstart := time.Now()
|
||||||
defer func() { responseTime.Observe(time.Since(tstart).Seconds()) }()
|
defer func() { responseTime.Observe(time.Since(tstart).Seconds()) }()
|
||||||
|
|
||||||
cacheKey := strings.ReplaceAll(r.URL.String()[1:], "\\", "/") // replace all backslashes with forward slashes shouldn't be necessary but just in case
|
cacheKey := strings.ReplaceAll(path[1:], "\\", "/") // replace all backslashes with forward slashes shouldn't be necessary but just in case
|
||||||
|
|
||||||
if cacheKey == "" {
|
if cacheKey == "" {
|
||||||
requestsTotal.WithLabelValues(r.Method, "400").Inc()
|
requestsTotal.WithLabelValues(r.Method, "400").Inc()
|
||||||
logger.Logger.Warn().Str("url", r.URL.String()).Msg("Invalid URL")
|
logger.Logger.Warn().Str("url", path).Msg("Invalid URL")
|
||||||
http.Error(w, "Invalid URL", http.StatusBadRequest)
|
http.Error(w, "Invalid URL", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -258,7 +258,7 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
var req *http.Request
|
var req *http.Request
|
||||||
if sc.upstream != "" { // if an upstream server is configured, proxy the request to the upstream server
|
if sc.upstream != "" { // if an upstream server is configured, proxy the request to the upstream server
|
||||||
ur, err := url.JoinPath(sc.upstream, r.URL.String())
|
ur, err := url.JoinPath(sc.upstream, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
requestsTotal.WithLabelValues(r.Method, "500").Inc()
|
requestsTotal.WithLabelValues(r.Method, "500").Inc()
|
||||||
logger.Logger.Error().Err(err).Str("upstream", sc.upstream).Msg("Failed to join URL path")
|
logger.Logger.Error().Err(err).Str("upstream", sc.upstream).Msg("Failed to join URL path")
|
||||||
@@ -282,7 +282,7 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
host = "http://" + host
|
host = "http://" + host
|
||||||
}
|
}
|
||||||
|
|
||||||
ur, err := url.JoinPath(host, r.URL.String())
|
ur, err := url.JoinPath(host, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
requestsTotal.WithLabelValues(r.Method, "500").Inc()
|
requestsTotal.WithLabelValues(r.Method, "500").Inc()
|
||||||
logger.Logger.Error().Err(err).Str("host", host).Msg("Failed to join URL path")
|
logger.Logger.Error().Err(err).Str("host", host).Msg("Failed to join URL path")
|
||||||
@@ -328,16 +328,17 @@ func (sc *SteamCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
size := resp.ContentLength
|
size := resp.ContentLength
|
||||||
|
|
||||||
writer, err := sc.vfs.Create(cacheKey, size)
|
// this is sortof not needed as we should always be able to get a writer from the cache as long as the gc is able to reclaim enough space aka the file is not bigger than the disk can handle
|
||||||
if err != nil {
|
ww := w.(io.Writer) // default writer to write to the response writer
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
writer, _ := sc.vfs.Create(cacheKey, size) // create a writer to write to the cache
|
||||||
return
|
if writer != nil { // if the writer is not nil, it means the cache is writable
|
||||||
|
defer writer.Close() // close the writer when done
|
||||||
|
ww = io.MultiWriter(w, writer) // write to both the response writer and the cache writer
|
||||||
}
|
}
|
||||||
defer writer.Close()
|
|
||||||
|
|
||||||
w.Header().Add("X-LanCache-Status", "MISS")
|
w.Header().Add("X-LanCache-Status", "MISS")
|
||||||
|
|
||||||
io.Copy(io.MultiWriter(w, writer), resp.Body)
|
io.Copy(ww, resp.Body)
|
||||||
|
|
||||||
logger.Logger.Info().
|
logger.Logger.Info().
|
||||||
Str("key", cacheKey).
|
Str("key", cacheKey).
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ func TestCaching(t *testing.T) {
|
|||||||
|
|
||||||
os.WriteFile(filepath.Join(td, "key2"), []byte("value2"), 0644)
|
os.WriteFile(filepath.Join(td, "key2"), []byte("value2"), 0644)
|
||||||
|
|
||||||
sc := New("localhost:8080", "1G", 10, "1G", 100, td, "")
|
sc := New("localhost:8080", "1G", "1G", td, "")
|
||||||
|
|
||||||
w, err := sc.vfs.Create("key", 5)
|
w, err := sc.vfs.Create("key", 5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -84,7 +84,7 @@ func TestCaching(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCacheMissAndHit(t *testing.T) {
|
func TestCacheMissAndHit(t *testing.T) {
|
||||||
sc := New("localhost:8080", "0", 0, "1G", 100, t.TempDir(), "")
|
sc := New("localhost:8080", "0", "1G", t.TempDir(), "")
|
||||||
|
|
||||||
key := "testkey"
|
key := "testkey"
|
||||||
value := []byte("testvalue")
|
value := []byte("testvalue")
|
||||||
|
|||||||
82
vfs/gc/gc.go
82
vfs/gc/gc.go
@@ -13,59 +13,48 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LRUGC deletes files in LRU order until enough space is reclaimed.
|
var (
|
||||||
func LRUGC(vfss vfs.VFS, size uint) {
|
// ErrInsufficientSpace is returned when there are no files to delete in the VFS.
|
||||||
attempts := 0
|
ErrInsufficientSpace = fmt.Errorf("no files to delete")
|
||||||
deletions := 0
|
)
|
||||||
var reclaimed uint
|
|
||||||
|
|
||||||
for reclaimed < size {
|
// LRUGC deletes files in LRU order until enough space is reclaimed.
|
||||||
if attempts > 10 {
|
func LRUGC(vfss vfs.VFS, size uint) error {
|
||||||
logger.Logger.Debug().
|
logger.Logger.Debug().Uint("target", size).Msg("Attempting to reclaim space using LRU GC")
|
||||||
Int("attempts", attempts).
|
|
||||||
Msg("GC: Too many attempts to reclaim space, giving up")
|
var reclaimed uint // reclaimed space in bytes
|
||||||
return
|
|
||||||
}
|
for {
|
||||||
attempts++
|
|
||||||
switch fs := vfss.(type) {
|
switch fs := vfss.(type) {
|
||||||
case *disk.DiskFS:
|
case *disk.DiskFS:
|
||||||
fi := fs.LRU.Back()
|
fi := fs.LRU.Back()
|
||||||
if fi == nil {
|
if fi == nil {
|
||||||
break
|
return ErrInsufficientSpace // No files to delete
|
||||||
}
|
}
|
||||||
sz := uint(fi.Size())
|
sz := uint(fi.Size())
|
||||||
err := fs.Delete(fi.Name())
|
err := fs.Delete(fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue // If delete fails, try the next file
|
||||||
}
|
}
|
||||||
reclaimed += sz
|
reclaimed += sz
|
||||||
deletions++
|
|
||||||
case *memory.MemoryFS:
|
case *memory.MemoryFS:
|
||||||
fi := fs.LRU.Back()
|
fi := fs.LRU.Back()
|
||||||
if fi == nil {
|
if fi == nil {
|
||||||
break
|
return ErrInsufficientSpace // No files to delete
|
||||||
}
|
}
|
||||||
sz := uint(fi.Size())
|
sz := uint(fi.Size())
|
||||||
err := fs.Delete(fi.Name())
|
err := fs.Delete(fi.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue // If delete fails, try the next file
|
||||||
}
|
}
|
||||||
reclaimed += sz
|
reclaimed += sz
|
||||||
deletions++
|
|
||||||
default:
|
default:
|
||||||
// Fallback to old method if not supported
|
panic("unreachable: unsupported VFS type for LRU GC") // panic if the VFS is not disk or memory
|
||||||
stats := vfss.StatAll()
|
}
|
||||||
if len(stats) == 0 {
|
|
||||||
break
|
if reclaimed >= size {
|
||||||
}
|
logger.Logger.Debug().Uint("target", size).Uint("achieved", reclaimed).Msg("Reclaimed enough space using LRU GC")
|
||||||
fi := stats[0] // Assume sorted or pick first
|
return nil // stop if enough space is reclaimed
|
||||||
sz := uint(fi.Size())
|
|
||||||
err := vfss.Delete(fi.Name())
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
reclaimed += sz
|
|
||||||
deletions++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -80,36 +69,39 @@ var _ vfs.VFS = (*GCFS)(nil)
|
|||||||
// GCFS is a virtual file system that calls a GC handler when the disk is full. The GC handler is responsible for freeing up space on the disk. The GCFS is a wrapper around another VFS.
|
// GCFS is a virtual file system that calls a GC handler when the disk is full. The GC handler is responsible for freeing up space on the disk. The GCFS is a wrapper around another VFS.
|
||||||
type GCFS struct {
|
type GCFS struct {
|
||||||
vfs.VFS
|
vfs.VFS
|
||||||
multiplier int
|
|
||||||
|
|
||||||
// protected by mu
|
|
||||||
gcHanderFunc GCHandlerFunc
|
gcHanderFunc GCHandlerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// GCHandlerFunc is a function that is called when the disk is full and the GCFS needs to free up space. It is passed the VFS and the size of the file that needs to be written. Its up to the implementation to free up space. How much space is freed is also up to the implementation.
|
// GCHandlerFunc is a function that is called when the disk is full and the GCFS needs to free up space. It is passed the VFS and the size of the file that needs to be written. Its up to the implementation to free up space. How much space is freed is also up to the implementation.
|
||||||
type GCHandlerFunc func(vfs vfs.VFS, size uint)
|
type GCHandlerFunc func(vfs vfs.VFS, size uint) error
|
||||||
|
|
||||||
func New(vfs vfs.VFS, multiplier int, gcHandlerFunc GCHandlerFunc) *GCFS {
|
func New(vfs vfs.VFS, gcHandlerFunc GCHandlerFunc) *GCFS {
|
||||||
if multiplier <= 0 {
|
|
||||||
multiplier = 1 // if the multiplier is less than or equal to 0 set it to 1 will be slow but the user can set it to a higher value if they want
|
|
||||||
}
|
|
||||||
return &GCFS{
|
return &GCFS{
|
||||||
VFS: vfs,
|
VFS: vfs,
|
||||||
multiplier: multiplier,
|
|
||||||
gcHanderFunc: gcHandlerFunc,
|
gcHanderFunc: gcHandlerFunc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create overrides the Create method of the VFS interface. It tries to create the key, if it fails due to disk full error, it calls the GC handler and tries again. If it still fails it returns the error.
|
// Create overrides the Create method of the VFS interface. It tries to create the key, if it fails due to disk full error, it calls the GC handler and tries again. If it still fails it returns the error.
|
||||||
func (g *GCFS) Create(key string, size int64) (io.WriteCloser, error) {
|
func (g *GCFS) Create(key string, size int64) (io.WriteCloser, error) {
|
||||||
w, err := g.VFS.Create(key, size) // try to create the key
|
w, err := g.VFS.Create(key, size) // try to create the key
|
||||||
|
for err == vfserror.ErrDiskFull && g.gcHanderFunc != nil { // if the error is disk full and there is a GC handler
|
||||||
// if it fails due to disk full error, call the GC handler and try again in a loop that will continue until it succeeds or the error is not disk full
|
errr := g.gcHanderFunc(g.VFS, uint(size)) // call the GC handler
|
||||||
if err == vfserror.ErrDiskFull && g.gcHanderFunc != nil { // if the error is disk full and there is a GC handler
|
if errr == ErrInsufficientSpace {
|
||||||
g.gcHanderFunc(g.VFS, uint(size*int64(g.multiplier))) // call the GC handler
|
return nil, errr // if the GC handler returns no files to delete, return the error
|
||||||
|
}
|
||||||
w, err = g.VFS.Create(key, size)
|
w, err = g.VFS.Create(key, size)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == vfserror.ErrDiskFull {
|
||||||
|
logger.Logger.Error().Str("key", key).Int64("size", size).Msg("Failed to create file due to disk full, even after GC")
|
||||||
|
} else {
|
||||||
|
logger.Logger.Error().Str("key", key).Int64("size", size).Err(err).Msg("Failed to create file")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return w, err
|
return w, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,13 +5,12 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"s1d3sw1ped/SteamCache2/vfs/memory"
|
"s1d3sw1ped/SteamCache2/vfs/memory"
|
||||||
"s1d3sw1ped/SteamCache2/vfs/vfserror"
|
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGCOnFull(t *testing.T) {
|
func TestGCOnFull(t *testing.T) {
|
||||||
m := memory.New(10)
|
m := memory.New(10)
|
||||||
gc := New(m, 2, LRUGC)
|
gc := New(m, LRUGC)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
|
w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
|
||||||
@@ -43,7 +42,7 @@ func TestGCOnFull(t *testing.T) {
|
|||||||
|
|
||||||
func TestNoGCNeeded(t *testing.T) {
|
func TestNoGCNeeded(t *testing.T) {
|
||||||
m := memory.New(20)
|
m := memory.New(20)
|
||||||
gc := New(m, 2, LRUGC)
|
gc := New(m, LRUGC)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
|
w, err := gc.Create(fmt.Sprintf("key%d", i), 2)
|
||||||
@@ -61,13 +60,13 @@ func TestNoGCNeeded(t *testing.T) {
|
|||||||
|
|
||||||
func TestGCInsufficientSpace(t *testing.T) {
|
func TestGCInsufficientSpace(t *testing.T) {
|
||||||
m := memory.New(5)
|
m := memory.New(5)
|
||||||
gc := New(m, 1, LRUGC)
|
gc := New(m, LRUGC)
|
||||||
|
|
||||||
w, err := gc.Create("key0", 10)
|
w, err := gc.Create("key0", 10)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
w.Close()
|
w.Close()
|
||||||
t.Error("Expected ErrDiskFull")
|
t.Error("Expected ErrDiskFull")
|
||||||
} else if !errors.Is(err, vfserror.ErrDiskFull) {
|
} else if !errors.Is(err, ErrInsufficientSpace) {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user