From 9b2affe95a93d915ebe9fe6f92bc3a8ed7feaa92 Mon Sep 17 00:00:00 2001 From: Justin Harms Date: Mon, 22 Sep 2025 00:51:51 -0500 Subject: [PATCH] Refactor disk initialization and file processing in DiskFS - Replaced legacy depot file migration logic with concurrent directory scanning for improved performance. - Introduced batch processing of files to minimize lock contention during initialization. - Simplified the init function by removing unnecessary complexity and focusing on efficient file handling. - Enhanced logging to provide better insights into directory scan progress and completion. --- vfs/disk/disk.go | 297 +++++++++++++++++++++++++++++++---------------- 1 file changed, 195 insertions(+), 102 deletions(-) diff --git a/vfs/disk/disk.go b/vfs/disk/disk.go index 04dc4e3..409a244 100644 --- a/vfs/disk/disk.go +++ b/vfs/disk/disk.go @@ -13,6 +13,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/docker/go-units" @@ -167,56 +168,15 @@ func New(root string, capacity int64) *DiskFS { return d } -// init loads existing files from disk and migrates legacy depot files to sharded structure +// init loads existing files from disk func (d *DiskFS) init() { tstart := time.Now() - var depotFiles []string // Track depot files that need migration + // Use concurrent directory scanning for blazing fast initialization + fileInfos := d.scanDirectoryConcurrently() - err := filepath.Walk(d.root, func(npath string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - if info.IsDir() { - return nil - } - - d.mu.Lock() - // Extract key from sharded path: remove root and convert sharding back - // Handle both "./disk" and "disk" root paths - rootPath := d.root - rootPath = strings.TrimPrefix(rootPath, "./") - relPath := strings.ReplaceAll(npath[len(rootPath)+1:], "\\", "/") - - // Extract the original key from the sharded path - k := d.extractKeyFromPath(relPath) - - fi := vfs.NewFileInfoFromOS(info, k) - d.info[k] = fi - d.LRU.Add(k, fi) - // Initialize access time with file modification time - fi.UpdateAccessBatched(d.timeUpdater) - d.size += info.Size() - - // Track depot files for potential migration - if strings.HasPrefix(relPath, "depot/") { - depotFiles = append(depotFiles, relPath) - } - - d.mu.Unlock() - - return nil - }) - if err != nil { - logger.Logger.Error().Err(err).Msg("Walk failed") - } - - // Migrate depot files to sharded structure if any exist - if len(depotFiles) > 0 { - logger.Logger.Info().Int("count", len(depotFiles)).Msg("Found legacy depot files, starting migration") - d.migrateDepotFiles(depotFiles) - } + // Batch process all files to minimize lock contention + d.batchProcessFiles(fileInfos) logger.Logger.Info(). Str("name", d.Name()). @@ -228,68 +188,201 @@ func (d *DiskFS) init() { Msg("init") } -// migrateDepotFiles moves legacy depot files to the sharded steam structure -func (d *DiskFS) migrateDepotFiles(depotFiles []string) { - migratedCount := 0 - errorCount := 0 - - for _, relPath := range depotFiles { - // Extract the steam key from the depot path - steamKey := d.extractKeyFromPath(relPath) - if !strings.HasPrefix(steamKey, "steam/") { - // Skip if we can't extract a proper steam key - errorCount++ - continue - } - - // Get the source and destination paths - sourcePath := filepath.Join(d.root, relPath) - shardedPath := d.shardPath(steamKey) - destPath := filepath.Join(d.root, shardedPath) - - // Create destination directory - destDir := filepath.Dir(destPath) - if err := os.MkdirAll(destDir, 0755); err != nil { - logger.Logger.Error().Err(err).Str("path", destDir).Msg("Failed to create migration destination directory") - errorCount++ - continue - } - - // Move the file - if err := os.Rename(sourcePath, destPath); err != nil { - logger.Logger.Error().Err(err).Str("from", sourcePath).Str("to", destPath).Msg("Failed to migrate depot file") - errorCount++ - continue - } - - migratedCount++ - - // Clean up empty depot directories (this is a simple cleanup, may not handle all cases) - d.cleanupEmptyDepotDirs(filepath.Dir(sourcePath)) - } - - logger.Logger.Info(). - Int("migrated", migratedCount). - Int("errors", errorCount). - Msg("Depot file migration completed") +// fileInfo represents a file found during directory scanning +type fileInfo struct { + path string + relPath string + key string + size int64 + modTime time.Time + isDepot bool } -// cleanupEmptyDepotDirs removes empty depot directories after migration -func (d *DiskFS) cleanupEmptyDepotDirs(dirPath string) { - for dirPath != d.root && strings.HasPrefix(dirPath, filepath.Join(d.root, "depot")) { - entries, err := os.ReadDir(dirPath) - if err != nil || len(entries) > 0 { - break +// scanDirectoryConcurrently performs fast concurrent directory scanning +func (d *DiskFS) scanDirectoryConcurrently() []fileInfo { + // Channel for collecting file information + fileChan := make(chan fileInfo, 1000) + + // Progress tracking + var totalFiles int64 + var processedFiles int64 + progressTicker := time.NewTicker(500 * time.Millisecond) + defer progressTicker.Stop() + + // Wait group for workers + var wg sync.WaitGroup + + // Start directory scanner + wg.Add(1) + go func() { + defer wg.Done() + defer close(fileChan) + d.scanDirectoryRecursive(d.root, fileChan, &totalFiles) + }() + + // Collect results with progress reporting + var fileInfos []fileInfo + + // Use a separate goroutine to collect results + done := make(chan struct{}) + go func() { + defer close(done) + for { + select { + case fi, ok := <-fileChan: + if !ok { + return + } + fileInfos = append(fileInfos, fi) + processedFiles++ + case <-progressTicker.C: + if totalFiles > 0 { + logger.Logger.Debug(). + Int64("processed", processedFiles). + Int64("total", totalFiles). + Float64("progress", float64(processedFiles)/float64(totalFiles)*100). + Msg("Directory scan progress") + } + } + } + }() + + // Wait for scanning to complete + wg.Wait() + <-done + + logger.Logger.Info(). + Int64("files_scanned", processedFiles). + Msg("Directory scan completed") + + return fileInfos +} + +// scanDirectoryRecursive performs recursive directory scanning with early termination +func (d *DiskFS) scanDirectoryRecursive(dirPath string, fileChan chan<- fileInfo, totalFiles *int64) { + // Use ReadDir for faster directory listing (no stat calls) + entries, err := os.ReadDir(dirPath) + if err != nil { + return + } + + // Count files first for progress tracking + fileCount := 0 + for _, entry := range entries { + if !entry.IsDir() { + fileCount++ + } + } + atomic.AddInt64(totalFiles, int64(fileCount)) + + // Process entries concurrently with limited workers + semaphore := make(chan struct{}, 8) // Limit concurrent processing + var wg sync.WaitGroup + + for _, entry := range entries { + entryPath := filepath.Join(dirPath, entry.Name()) + + if entry.IsDir() { + // Recursively scan subdirectories + wg.Add(1) + go func(path string) { + defer wg.Done() + semaphore <- struct{}{} // Acquire semaphore + defer func() { <-semaphore }() // Release semaphore + d.scanDirectoryRecursive(path, fileChan, totalFiles) + }(entryPath) + } else { + // Process file with lazy loading + wg.Add(1) + go func(path string, name string, entry os.DirEntry) { + defer wg.Done() + semaphore <- struct{}{} // Acquire semaphore + defer func() { <-semaphore }() // Release semaphore + + // Extract relative path and key first (no stat call) + rootPath := d.root + rootPath = strings.TrimPrefix(rootPath, "./") + relPath := strings.ReplaceAll(path[len(rootPath)+1:], "\\", "/") + key := d.extractKeyFromPath(relPath) + + // Get file info only when needed (lazy loading) + info, err := entry.Info() + if err != nil { + return + } + + // Send file info + fileChan <- fileInfo{ + path: path, + relPath: relPath, + key: key, + size: info.Size(), + modTime: info.ModTime(), + isDepot: false, // No longer tracking depot files + } + }(entryPath, entry.Name(), entry) + } + } + + wg.Wait() +} + +// batchProcessFiles processes all files in batches to minimize lock contention +func (d *DiskFS) batchProcessFiles(fileInfos []fileInfo) { + const batchSize = 1000 // Process files in batches + + // Sort files by key for consistent ordering + sort.Slice(fileInfos, func(i, j int) bool { + return fileInfos[i].key < fileInfos[j].key + }) + + // Process in batches with progress reporting + totalBatches := (len(fileInfos) + batchSize - 1) / batchSize + for i := 0; i < len(fileInfos); i += batchSize { + end := i + batchSize + if end > len(fileInfos) { + end = len(fileInfos) } - // Directory is empty, remove it - if err := os.Remove(dirPath); err != nil { - logger.Logger.Error().Err(err).Str("dir", dirPath).Msg("Failed to remove empty depot directory") - break + batch := fileInfos[i:end] + d.processBatch(batch) + + // Log progress every 10 batches + if (i/batchSize+1)%10 == 0 || i+batchSize >= len(fileInfos) { + logger.Logger.Debug(). + Int("batch", i/batchSize+1). + Int("total_batches", totalBatches). + Int("files_processed", end). + Int("total_files", len(fileInfos)). + Msg("Batch processing progress") + } + } +} + +// processBatch processes a batch of files with a single lock acquisition +func (d *DiskFS) processBatch(batch []fileInfo) { + d.mu.Lock() + defer d.mu.Unlock() + + for _, fi := range batch { + // Create FileInfo from batch data + fileInfo := &vfs.FileInfo{ + Key: fi.key, + Size: fi.size, + CTime: fi.modTime, + ATime: fi.modTime, + AccessCount: 1, } - // Move up to parent directory - dirPath = filepath.Dir(dirPath) + // Add to maps + d.info[fi.key] = fileInfo + d.LRU.Add(fi.key, fileInfo) + + // Initialize access time + fileInfo.UpdateAccessBatched(d.timeUpdater) + + // Update total size + d.size += fi.size } }