Compare commits
1 Commit

| Author | SHA1 | Date |
|---|---|---|
|  | 9b2affe95a |  |

vfs/disk/disk.go  279
@@ -13,6 +13,7 @@ import (
     "sort"
     "strings"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/docker/go-units"
@@ -167,56 +168,15 @@ func New(root string, capacity int64) *DiskFS {
     return d
 }

-// init loads existing files from disk and migrates legacy depot files to sharded structure
+// init loads existing files from disk
 func (d *DiskFS) init() {
     tstart := time.Now()

-    var depotFiles []string // Track depot files that need migration
-
-    err := filepath.Walk(d.root, func(npath string, info os.FileInfo, err error) error {
-        if err != nil {
-            return err
-        }
-
-        if info.IsDir() {
-            return nil
-        }
-
-        d.mu.Lock()
-        // Extract key from sharded path: remove root and convert sharding back
-        // Handle both "./disk" and "disk" root paths
-        rootPath := d.root
-        rootPath = strings.TrimPrefix(rootPath, "./")
-        relPath := strings.ReplaceAll(npath[len(rootPath)+1:], "\\", "/")
-
-        // Extract the original key from the sharded path
-        k := d.extractKeyFromPath(relPath)
-
-        fi := vfs.NewFileInfoFromOS(info, k)
-        d.info[k] = fi
-        d.LRU.Add(k, fi)
-        // Initialize access time with file modification time
-        fi.UpdateAccessBatched(d.timeUpdater)
-        d.size += info.Size()
-
-        // Track depot files for potential migration
-        if strings.HasPrefix(relPath, "depot/") {
-            depotFiles = append(depotFiles, relPath)
-        }
-
-        d.mu.Unlock()
-
-        return nil
-    })
-    if err != nil {
-        logger.Logger.Error().Err(err).Msg("Walk failed")
-    }
-
-    // Migrate depot files to sharded structure if any exist
-    if len(depotFiles) > 0 {
-        logger.Logger.Info().Int("count", len(depotFiles)).Msg("Found legacy depot files, starting migration")
-        d.migrateDepotFiles(depotFiles)
-    }
-
+    // Use concurrent directory scanning for blazing fast initialization
+    fileInfos := d.scanDirectoryConcurrently()
+
+    // Batch process all files to minimize lock contention
+    d.batchProcessFiles(fileInfos)
+
     logger.Logger.Info().
         Str("name", d.Name()).
@@ -228,68 +188,201 @@ func (d *DiskFS) init() {
         Msg("init")
 }

-// migrateDepotFiles moves legacy depot files to the sharded steam structure
-func (d *DiskFS) migrateDepotFiles(depotFiles []string) {
-    migratedCount := 0
-    errorCount := 0
-
-    for _, relPath := range depotFiles {
-        // Extract the steam key from the depot path
-        steamKey := d.extractKeyFromPath(relPath)
-        if !strings.HasPrefix(steamKey, "steam/") {
-            // Skip if we can't extract a proper steam key
-            errorCount++
-            continue
-        }
-
-        // Get the source and destination paths
-        sourcePath := filepath.Join(d.root, relPath)
-        shardedPath := d.shardPath(steamKey)
-        destPath := filepath.Join(d.root, shardedPath)
-
-        // Create destination directory
-        destDir := filepath.Dir(destPath)
-        if err := os.MkdirAll(destDir, 0755); err != nil {
-            logger.Logger.Error().Err(err).Str("path", destDir).Msg("Failed to create migration destination directory")
-            errorCount++
-            continue
-        }
-
-        // Move the file
-        if err := os.Rename(sourcePath, destPath); err != nil {
-            logger.Logger.Error().Err(err).Str("from", sourcePath).Str("to", destPath).Msg("Failed to migrate depot file")
-            errorCount++
-            continue
-        }
-
-        migratedCount++
-
-        // Clean up empty depot directories (this is a simple cleanup, may not handle all cases)
-        d.cleanupEmptyDepotDirs(filepath.Dir(sourcePath))
-    }
-
-    logger.Logger.Info().
-        Int("migrated", migratedCount).
-        Int("errors", errorCount).
-        Msg("Depot file migration completed")
-}
-
+// fileInfo represents a file found during directory scanning
+type fileInfo struct {
+    path    string
+    relPath string
+    key     string
+    size    int64
+    modTime time.Time
+    isDepot bool
+}
+
+// scanDirectoryConcurrently performs fast concurrent directory scanning
+func (d *DiskFS) scanDirectoryConcurrently() []fileInfo {
+    // Channel for collecting file information
+    fileChan := make(chan fileInfo, 1000)
+
+    // Progress tracking
+    var totalFiles int64
+    var processedFiles int64
+    progressTicker := time.NewTicker(500 * time.Millisecond)
+    defer progressTicker.Stop()
+
+    // Wait group for workers
+    var wg sync.WaitGroup
+
+    // Start directory scanner
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        defer close(fileChan)
+        d.scanDirectoryRecursive(d.root, fileChan, &totalFiles)
+    }()
+
+    // Collect results with progress reporting
+    var fileInfos []fileInfo
+
+    // Use a separate goroutine to collect results
+    done := make(chan struct{})
+    go func() {
+        defer close(done)
+        for {
+            select {
+            case fi, ok := <-fileChan:
+                if !ok {
+                    return
+                }
+                fileInfos = append(fileInfos, fi)
+                processedFiles++
+            case <-progressTicker.C:
+                if totalFiles > 0 {
+                    logger.Logger.Debug().
+                        Int64("processed", processedFiles).
+                        Int64("total", totalFiles).
+                        Float64("progress", float64(processedFiles)/float64(totalFiles)*100).
+                        Msg("Directory scan progress")
+                }
+            }
+        }
+    }()
+
+    // Wait for scanning to complete
+    wg.Wait()
+    <-done
+
+    logger.Logger.Info().
+        Int64("files_scanned", processedFiles).
+        Msg("Directory scan completed")
+
+    return fileInfos
+}
+
-// cleanupEmptyDepotDirs removes empty depot directories after migration
-func (d *DiskFS) cleanupEmptyDepotDirs(dirPath string) {
-    for dirPath != d.root && strings.HasPrefix(dirPath, filepath.Join(d.root, "depot")) {
-        entries, err := os.ReadDir(dirPath)
-        if err != nil || len(entries) > 0 {
-            break
-        }
-
-        // Directory is empty, remove it
-        if err := os.Remove(dirPath); err != nil {
-            logger.Logger.Error().Err(err).Str("dir", dirPath).Msg("Failed to remove empty depot directory")
-            break
-        }
-
-        // Move up to parent directory
-        dirPath = filepath.Dir(dirPath)
-    }
-}
+// scanDirectoryRecursive performs recursive directory scanning with early termination
+func (d *DiskFS) scanDirectoryRecursive(dirPath string, fileChan chan<- fileInfo, totalFiles *int64) {
+    // Use ReadDir for faster directory listing (no stat calls)
+    entries, err := os.ReadDir(dirPath)
+    if err != nil {
+        return
+    }
+
+    // Count files first for progress tracking
+    fileCount := 0
+    for _, entry := range entries {
+        if !entry.IsDir() {
+            fileCount++
+        }
+    }
+    atomic.AddInt64(totalFiles, int64(fileCount))
+
+    // Process entries concurrently with limited workers
+    semaphore := make(chan struct{}, 8) // Limit concurrent processing
+    var wg sync.WaitGroup
+
+    for _, entry := range entries {
+        entryPath := filepath.Join(dirPath, entry.Name())
+
+        if entry.IsDir() {
+            // Recursively scan subdirectories
+            wg.Add(1)
+            go func(path string) {
+                defer wg.Done()
+                semaphore <- struct{}{}        // Acquire semaphore
+                defer func() { <-semaphore }() // Release semaphore
+                d.scanDirectoryRecursive(path, fileChan, totalFiles)
+            }(entryPath)
+        } else {
+            // Process file with lazy loading
+            wg.Add(1)
+            go func(path string, name string, entry os.DirEntry) {
+                defer wg.Done()
+                semaphore <- struct{}{}        // Acquire semaphore
+                defer func() { <-semaphore }() // Release semaphore
+
+                // Extract relative path and key first (no stat call)
+                rootPath := d.root
+                rootPath = strings.TrimPrefix(rootPath, "./")
+                relPath := strings.ReplaceAll(path[len(rootPath)+1:], "\\", "/")
+                key := d.extractKeyFromPath(relPath)
+
+                // Get file info only when needed (lazy loading)
+                info, err := entry.Info()
+                if err != nil {
+                    return
+                }
+
+                // Send file info
+                fileChan <- fileInfo{
+                    path:    path,
+                    relPath: relPath,
+                    key:     key,
+                    size:    info.Size(),
+                    modTime: info.ModTime(),
+                    isDepot: false, // No longer tracking depot files
+                }
+            }(entryPath, entry.Name(), entry)
+        }
+    }
+
+    wg.Wait()
+}
+
+// batchProcessFiles processes all files in batches to minimize lock contention
+func (d *DiskFS) batchProcessFiles(fileInfos []fileInfo) {
+    const batchSize = 1000 // Process files in batches
+
+    // Sort files by key for consistent ordering
+    sort.Slice(fileInfos, func(i, j int) bool {
+        return fileInfos[i].key < fileInfos[j].key
+    })
+
+    // Process in batches with progress reporting
+    totalBatches := (len(fileInfos) + batchSize - 1) / batchSize
+    for i := 0; i < len(fileInfos); i += batchSize {
+        end := i + batchSize
+        if end > len(fileInfos) {
+            end = len(fileInfos)
+        }
+
+        batch := fileInfos[i:end]
+        d.processBatch(batch)
+
+        // Log progress every 10 batches
+        if (i/batchSize+1)%10 == 0 || i+batchSize >= len(fileInfos) {
+            logger.Logger.Debug().
+                Int("batch", i/batchSize+1).
+                Int("total_batches", totalBatches).
+                Int("files_processed", end).
+                Int("total_files", len(fileInfos)).
+                Msg("Batch processing progress")
+        }
+    }
+}
+
+// processBatch processes a batch of files with a single lock acquisition
+func (d *DiskFS) processBatch(batch []fileInfo) {
+    d.mu.Lock()
+    defer d.mu.Unlock()
+
+    for _, fi := range batch {
+        // Create FileInfo from batch data
+        fileInfo := &vfs.FileInfo{
+            Key:         fi.key,
+            Size:        fi.size,
+            CTime:       fi.modTime,
+            ATime:       fi.modTime,
+            AccessCount: 1,
+        }
+
+        // Add to maps
+        d.info[fi.key] = fileInfo
+        d.LRU.Add(fi.key, fileInfo)
+
+        // Initialize access time
+        fileInfo.UpdateAccessBatched(d.timeUpdater)
+
+        // Update total size
+        d.size += fi.size
+    }
+}
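As a side note on the pattern this commit introduces — `os.ReadDir`-based recursive scanning, a buffered-channel semaphore capping concurrent workers, and results streamed over a channel to a single collector — here is a minimal standalone sketch. It is not code from this repository: the names `walk` and `result`, the worker limit of 8, and the choice to descend into subdirectories synchronously (the commit also parallelizes directory recursion) are assumptions made for the example.

```go
// Standalone sketch of a bounded-concurrency directory scan (hypothetical names).
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// result carries the per-file data the scanner streams back.
type result struct {
	path string
	size int64
}

// walk lists a directory with os.ReadDir (no stat per entry), descends into
// subdirectories synchronously, and stats files in goroutines whose
// concurrency is capped by the buffered-channel semaphore sem.
func walk(dir string, out chan<- result, sem chan struct{}, wg *sync.WaitGroup) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return
	}
	for _, e := range entries {
		p := filepath.Join(dir, e.Name())
		if e.IsDir() {
			walk(p, out, sem, wg)
			continue
		}
		wg.Add(1)
		go func(e os.DirEntry, p string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a worker slot
			defer func() { <-sem }() // release it
			info, err := e.Info()    // lazy stat, only for regular files
			if err != nil {
				return
			}
			out <- result{path: p, size: info.Size()}
		}(e, p)
	}
}

func main() {
	out := make(chan result, 1000)
	sem := make(chan struct{}, 8) // limit concurrent stat work, as in the commit
	var wg sync.WaitGroup

	go func() {
		walk(".", out, sem, &wg)
		wg.Wait()
		close(out)
	}()

	var files, total int64
	for r := range out {
		files++
		total += r.size
	}
	fmt.Printf("scanned %d files, %d bytes\n", files, total)
}
```

Holding a semaphore slot only around the stat-and-send step keeps the sketch simple and avoids acquiring a slot while blocked on a nested wait.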