Refactor disk initialization and file processing in DiskFS

- Replaced the legacy depot file migration logic with concurrent directory scanning for faster initialization.
- Introduced batch processing of files to minimize lock contention during initialization (see the sketch after this list).
- Simplified init by delegating file discovery and registration to dedicated helpers.
- Enhanced logging to report directory scan progress and completion.
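
The two techniques this commit combines are easiest to see side by side: a recursive scan whose concurrency is bounded by a semaphore channel, feeding a consumer that registers files in fixed-size batches so the lock is taken once per batch rather than once per file. Below is a minimal, self-contained sketch of that pattern; the names (entry, scan, flush) and the single global semaphore are illustrative simplifications, not the DiskFS code itself.

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
        "sync"
    )

    // entry is a minimal stand-in for the commit's fileInfo struct.
    type entry struct {
        path string
        size int64
    }

    // scan walks root, reading at most `workers` directories at a time,
    // and streams results into out. Error handling is elided for brevity.
    func scan(root string, workers int, out chan<- entry) {
        sem := make(chan struct{}, workers)
        var wg sync.WaitGroup
        var walk func(dir string)
        walk = func(dir string) {
            defer wg.Done()
            sem <- struct{}{} // acquire a scan slot
            entries, err := os.ReadDir(dir)
            <-sem // release before recursing, so deep trees cannot deadlock
            if err != nil {
                return
            }
            for _, e := range entries {
                p := filepath.Join(dir, e.Name())
                if e.IsDir() {
                    wg.Add(1)
                    go walk(p)
                } else if info, err := e.Info(); err == nil {
                    out <- entry{path: p, size: info.Size()}
                }
            }
        }
        wg.Add(1)
        go walk(root)
        wg.Wait()
        close(out)
    }

    func main() {
        out := make(chan entry, 1000) // buffered so scanners rarely block on the consumer
        go scan(".", 8, out)

        var mu sync.Mutex
        index := make(map[string]int64)
        batch := make([]entry, 0, 1000)
        flush := func() {
            mu.Lock() // one lock acquisition per batch, not per file
            for _, e := range batch {
                index[e.path] = e.size
            }
            mu.Unlock()
            batch = batch[:0]
        }
        for e := range out {
            if batch = append(batch, e); len(batch) == cap(batch) {
                flush()
            }
        }
        flush() // register the final partial batch
        fmt.Println("indexed", len(index), "files")
    }

One difference worth noting: the sketch bounds ReadDir concurrency globally, whereas the committed scanDirectoryRecursive creates a fresh 8-slot semaphore per directory level, so its effective parallelism grows with tree depth.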
2025-09-22 00:51:51 -05:00
parent bd123bc63a
commit 9b2affe95a


@@ -13,6 +13,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/docker/go-units"
@@ -167,56 +168,15 @@ func New(root string, capacity int64) *DiskFS {
return d
}
// init loads existing files from disk
func (d *DiskFS) init() {
tstart := time.Now()
// Use concurrent directory scanning for fast initialization
fileInfos := d.scanDirectoryConcurrently()
// Batch process all files to minimize lock contention
d.batchProcessFiles(fileInfos)
logger.Logger.Info().
Str("name", d.Name()).
@@ -228,68 +188,201 @@ func (d *DiskFS) init() {
Msg("init")
}
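// The helpers below implement the two phases of init: scanDirectoryConcurrently
// discovers files on disk, and batchProcessFiles registers them in batches.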
// fileInfo represents a file found during directory scanning
type fileInfo struct {
path string
relPath string
key string
size int64
modTime time.Time
isDepot bool // always false now; legacy depot migration support was removed
}
// scanDirectoryConcurrently performs fast concurrent directory scanning
func (d *DiskFS) scanDirectoryConcurrently() []fileInfo {
// Channel for collecting file information
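// (buffered so directory workers can keep producing while the collector drains)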
fileChan := make(chan fileInfo, 1000)
// Progress tracking
var totalFiles int64
var processedFiles int64
progressTicker := time.NewTicker(500 * time.Millisecond)
defer progressTicker.Stop()
// Wait group for workers
var wg sync.WaitGroup
// Start directory scanner
wg.Add(1)
go func() {
defer wg.Done()
defer close(fileChan)
d.scanDirectoryRecursive(d.root, fileChan, &totalFiles)
}()
// Collect results with progress reporting
var fileInfos []fileInfo
// Use a separate goroutine to collect results
done := make(chan struct{})
go func() {
defer close(done)
for {
select {
case fi, ok := <-fileChan:
if !ok {
return
}
fileInfos = append(fileInfos, fi)
processedFiles++
case <-progressTicker.C:
// totalFiles is written by scanner goroutines, so read it atomically
total := atomic.LoadInt64(&totalFiles)
if total > 0 {
logger.Logger.Debug().
Int64("processed", processedFiles).
Int64("total", total).
Float64("progress", float64(processedFiles)/float64(total)*100).
Msg("Directory scan progress")
}
}
}
}()
// Wait for scanning to complete
wg.Wait()
<-done
logger.Logger.Info().
Int64("files_scanned", processedFiles).
Msg("Directory scan completed")
return fileInfos
}
// scanDirectoryRecursive performs recursive directory scanning with early termination
func (d *DiskFS) scanDirectoryRecursive(dirPath string, fileChan chan<- fileInfo, totalFiles *int64) {
// Use ReadDir for faster directory listing (no stat calls)
entries, err := os.ReadDir(dirPath)
if err != nil {
return
}
// Count files first for progress tracking
fileCount := 0
for _, entry := range entries {
if !entry.IsDir() {
fileCount++
}
}
atomic.AddInt64(totalFiles, int64(fileCount))
// Process entries concurrently; note the semaphore bounds concurrency
// per directory level (each recursive call creates its own), not globally
semaphore := make(chan struct{}, 8)
var wg sync.WaitGroup
for _, entry := range entries {
entryPath := filepath.Join(dirPath, entry.Name())
if entry.IsDir() {
// Recursively scan subdirectories
wg.Add(1)
go func(path string) {
defer wg.Done()
semaphore <- struct{}{} // Acquire semaphore
defer func() { <-semaphore }() // Release semaphore
d.scanDirectoryRecursive(path, fileChan, totalFiles)
}(entryPath)
} else {
// Process file with lazy loading
wg.Add(1)
go func(path string, entry os.DirEntry) {
defer wg.Done()
semaphore <- struct{}{} // Acquire semaphore
defer func() { <-semaphore }() // Release semaphore
// Extract relative path and key first (no stat call)
rootPath := d.root
rootPath = strings.TrimPrefix(rootPath, "./")
relPath := strings.ReplaceAll(path[len(rootPath)+1:], "\\", "/")
key := d.extractKeyFromPath(relPath)
// Get file info only when needed (lazy loading)
info, err := entry.Info()
if err != nil {
return
}
// Send file info
fileChan <- fileInfo{
path: path,
relPath: relPath,
key: key,
size: info.Size(),
modTime: info.ModTime(),
isDepot: false, // No longer tracking depot files
}
}(entryPath, entry)
}
}
wg.Wait()
}
// batchProcessFiles processes all files in batches to minimize lock contention
func (d *DiskFS) batchProcessFiles(fileInfos []fileInfo) {
const batchSize = 1000 // Process files in batches
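// Larger batches amortize the lock better but hold d.mu longer per acquisition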
// Sort files by key for consistent ordering
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].key < fileInfos[j].key
})
// Process in batches with progress reporting
totalBatches := (len(fileInfos) + batchSize - 1) / batchSize
for i := 0; i < len(fileInfos); i += batchSize {
end := i + batchSize
if end > len(fileInfos) {
end = len(fileInfos)
}
batch := fileInfos[i:end]
d.processBatch(batch)
// Log progress every 10 batches
if (i/batchSize+1)%10 == 0 || i+batchSize >= len(fileInfos) {
logger.Logger.Debug().
Int("batch", i/batchSize+1).
Int("total_batches", totalBatches).
Int("files_processed", end).
Int("total_files", len(fileInfos)).
Msg("Batch processing progress")
}
}
}
// processBatch processes a batch of files with a single lock acquisition
func (d *DiskFS) processBatch(batch []fileInfo) {
d.mu.Lock()
defer d.mu.Unlock()
for _, fi := range batch {
// Create FileInfo from batch data (named info to avoid shadowing the fileInfo type)
info := &vfs.FileInfo{
Key: fi.key,
Size: fi.size,
CTime: fi.modTime,
ATime: fi.modTime,
AccessCount: 1,
}
// Add to maps
d.info[fi.key] = info
d.LRU.Add(fi.key, info)
// Initialize access time
info.UpdateAccessBatched(d.timeUpdater)
// Update total size
d.size += fi.size
}
}