Enhance logging and context handling in job management. Introduce logger initialization with configurable parameters in the manager and runner commands. Switch job context handling from tar.gz to plain tar archives, and implement ETag generation for improved caching. Refactor API endpoints to support the new context file structure and improve error handling in job submissions. Add support for unhide-objects and auto-execution options in job creation requests.

2025-11-24 21:48:05 -06:00
parent a029714e08
commit 4ac05d50a1
23 changed files with 4133 additions and 1311 deletions
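The headline change is the move from context.tar.gz to plain context.tar. For orientation, here is a minimal sketch of what the write side of an uncompressed context archive might look like; this is an assumption for illustration (the actual packing code is in the suppressed diff below, and packContext is an invented name):

package main

import (
	"archive/tar"
	"io"
	"os"
	"path/filepath"
)

// packContext is a hypothetical sketch of the write side of the new
// uncompressed context format: a tar.Writer over the file directly,
// with no gzip.Writer in between.
func packContext(srcDir, tarPath string) error {
	out, err := os.Create(tarPath)
	if err != nil {
		return err
	}
	defer out.Close()

	tw := tar.NewWriter(out)
	defer tw.Close()

	return filepath.Walk(srcDir, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		rel, err := filepath.Rel(srcDir, path)
		if err != nil || rel == "." {
			return err // skip the root directory entry itself
		}
		hdr, err := tar.FileInfoHeader(info, "")
		if err != nil {
			return err
		}
		hdr.Name = filepath.ToSlash(rel)
		if info.IsDir() {
			hdr.Name += "/"
		}
		if err := tw.WriteHeader(hdr); err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = io.Copy(tw, f)
		return err
	})
}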

File diff suppressed because it is too large


@@ -4,8 +4,8 @@ import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"database/sql"
_ "embed"
"encoding/json"
"errors"
"fmt"
@@ -17,6 +17,7 @@ import (
"path/filepath"
"strings"
"jiggablend/pkg/scripts"
"jiggablend/pkg/types"
)
@@ -169,22 +170,26 @@ func (s *Server) handleGetJobMetadata(w http.ResponseWriter, r *http.Request) {
// extractMetadataFromContext extracts metadata from the blend file in a context archive
// Returns the extracted metadata or an error
func (s *Server) extractMetadataFromContext(jobID int64) (*types.BlendMetadata, error) {
contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar.gz")
contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar")
// Check if context exists
if _, err := os.Stat(contextPath); err != nil {
return nil, fmt.Errorf("context archive not found: %w", err)
}
// Create temporary directory for extraction
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("fuego-metadata-%d-*", jobID))
// Create temporary directory for extraction under storage base path
tmpDir, err := s.storage.TempDir(fmt.Sprintf("jiggablend-metadata-%d-*", jobID))
if err != nil {
return nil, fmt.Errorf("failed to create temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)
defer func() {
if err := os.RemoveAll(tmpDir); err != nil {
log.Printf("Warning: Failed to clean up temp directory %s: %v", tmpDir, err)
}
}()
// Extract context archive
if err := s.extractTarGz(contextPath, tmpDir); err != nil {
if err := s.extractTar(contextPath, tmpDir); err != nil {
return nil, fmt.Errorf("failed to extract context: %w", err)
}
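s.storage.TempDir is new in this commit and its body is not shown here. A plausible sketch, assuming it wraps os.MkdirTemp under a temp/ subdirectory of the storage base path (consistent with the BasePath()-based cleanup added later in this commit):

// Plausible sketch of the TempDir helper; the actual implementation is
// in a diff not shown here. The temp/ location matches what
// cleanupOldTempDirectories scans below.
func (s *Storage) TempDir(pattern string) (string, error) {
	base := filepath.Join(s.BasePath(), "temp")
	if err := os.MkdirAll(base, 0755); err != nil {
		return "", fmt.Errorf("failed to create temp base directory: %w", err)
	}
	return os.MkdirTemp(base, pattern)
}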
@@ -228,188 +233,20 @@ func (s *Server) extractMetadataFromContext(jobID int64) (*types.BlendMetadata,
return nil, fmt.Errorf("no .blend file found in context")
}
// Create Python script to extract metadata
// Use embedded Python script
scriptPath := filepath.Join(tmpDir, "extract_metadata.py")
scriptContent := `import bpy
import json
import sys
# Make all file paths relative to the blend file location FIRST
# This must be done immediately after file load, before any other operations
# to prevent Blender from trying to access external files with absolute paths
try:
bpy.ops.file.make_paths_relative()
print("Made all file paths relative to blend file")
except Exception as e:
print(f"Warning: Could not make paths relative: {e}")
# Check for missing addons that the blend file requires
# Blender marks missing addons with "_missing" suffix in preferences
missing_files_info = {
"checked": False,
"has_missing": False,
"missing_files": [],
"missing_addons": []
}
try:
missing = []
for mod in bpy.context.preferences.addons:
if mod.module.endswith("_missing"):
missing.append(mod.module.rsplit("_", 1)[0])
missing_files_info["checked"] = True
if missing:
missing_files_info["has_missing"] = True
missing_files_info["missing_addons"] = missing
print("Missing add-ons required by this .blend:")
for name in missing:
print(" -", name)
else:
print("No missing add-ons detected file is headless-safe")
except Exception as e:
print(f"Warning: Could not check for missing addons: {e}")
missing_files_info["error"] = str(e)
# Get scene
scene = bpy.context.scene
# Extract frame range from scene settings
frame_start = scene.frame_start
frame_end = scene.frame_end
# Also check for actual animation range (keyframes)
# Find the earliest and latest keyframes across all objects
animation_start = None
animation_end = None
for obj in scene.objects:
if obj.animation_data and obj.animation_data.action:
action = obj.animation_data.action
if action.fcurves:
for fcurve in action.fcurves:
if fcurve.keyframe_points:
for keyframe in fcurve.keyframe_points:
frame = int(keyframe.co[0])
if animation_start is None or frame < animation_start:
animation_start = frame
if animation_end is None or frame > animation_end:
animation_end = frame
# Use animation range if available, otherwise use scene frame range
# If scene range seems wrong (start == end), prefer animation range
if animation_start is not None and animation_end is not None:
if frame_start == frame_end or (animation_start < frame_start or animation_end > frame_end):
# Use animation range if scene range is invalid or animation extends beyond it
frame_start = animation_start
frame_end = animation_end
# Extract render settings
render = scene.render
resolution_x = render.resolution_x
resolution_y = render.resolution_y
engine = scene.render.engine.upper()
# Determine output format from file format
output_format = render.image_settings.file_format
# Extract engine-specific settings
engine_settings = {}
if engine == 'CYCLES':
cycles = scene.cycles
engine_settings = {
"samples": getattr(cycles, 'samples', 128),
"use_denoising": getattr(cycles, 'use_denoising', False),
"denoising_radius": getattr(cycles, 'denoising_radius', 0),
"denoising_strength": getattr(cycles, 'denoising_strength', 0.0),
"device": getattr(cycles, 'device', 'CPU'),
"use_adaptive_sampling": getattr(cycles, 'use_adaptive_sampling', False),
"adaptive_threshold": getattr(cycles, 'adaptive_threshold', 0.01) if getattr(cycles, 'use_adaptive_sampling', False) else 0.01,
"use_fast_gi": getattr(cycles, 'use_fast_gi', False),
"light_tree": getattr(cycles, 'use_light_tree', False),
"use_light_linking": getattr(cycles, 'use_light_linking', False),
"caustics_reflective": getattr(cycles, 'caustics_reflective', False),
"caustics_refractive": getattr(cycles, 'caustics_refractive', False),
"blur_glossy": getattr(cycles, 'blur_glossy', 0.0),
"max_bounces": getattr(cycles, 'max_bounces', 12),
"diffuse_bounces": getattr(cycles, 'diffuse_bounces', 4),
"glossy_bounces": getattr(cycles, 'glossy_bounces', 4),
"transmission_bounces": getattr(cycles, 'transmission_bounces', 12),
"volume_bounces": getattr(cycles, 'volume_bounces', 0),
"transparent_max_bounces": getattr(cycles, 'transparent_max_bounces', 8),
"film_transparent": getattr(cycles, 'film_transparent', False),
"use_layer_samples": getattr(cycles, 'use_layer_samples', False),
}
elif engine == 'EEVEE' or engine == 'EEVEE_NEXT':
eevee = scene.eevee
engine_settings = {
"taa_render_samples": getattr(eevee, 'taa_render_samples', 64),
"use_bloom": getattr(eevee, 'use_bloom', False),
"bloom_threshold": getattr(eevee, 'bloom_threshold', 0.8),
"bloom_intensity": getattr(eevee, 'bloom_intensity', 0.05),
"bloom_radius": getattr(eevee, 'bloom_radius', 6.5),
"use_ssr": getattr(eevee, 'use_ssr', True),
"use_ssr_refraction": getattr(eevee, 'use_ssr_refraction', False),
"ssr_quality": getattr(eevee, 'ssr_quality', 'MEDIUM'),
"use_ssao": getattr(eevee, 'use_ssao', True),
"ssao_quality": getattr(eevee, 'ssao_quality', 'MEDIUM'),
"ssao_distance": getattr(eevee, 'ssao_distance', 0.2),
"ssao_factor": getattr(eevee, 'ssao_factor', 1.0),
"use_soft_shadows": getattr(eevee, 'use_soft_shadows', True),
"use_shadow_high_bitdepth": getattr(eevee, 'use_shadow_high_bitdepth', True),
"use_volumetric": getattr(eevee, 'use_volumetric', False),
"volumetric_tile_size": getattr(eevee, 'volumetric_tile_size', '8'),
"volumetric_samples": getattr(eevee, 'volumetric_samples', 64),
"volumetric_start": getattr(eevee, 'volumetric_start', 0.0),
"volumetric_end": getattr(eevee, 'volumetric_end', 100.0),
"use_volumetric_lights": getattr(eevee, 'use_volumetric_lights', True),
"use_volumetric_shadows": getattr(eevee, 'use_volumetric_shadows', True),
"use_gtao": getattr(eevee, 'use_gtao', False),
"gtao_quality": getattr(eevee, 'gtao_quality', 'MEDIUM'),
"use_overscan": getattr(eevee, 'use_overscan', False),
}
else:
# For other engines, extract basic samples if available
engine_settings = {
"samples": getattr(scene, 'samples', 128) if hasattr(scene, 'samples') else 128
}
# Extract scene info
camera_count = len([obj for obj in scene.objects if obj.type == 'CAMERA'])
object_count = len(scene.objects)
material_count = len(bpy.data.materials)
# Build metadata dictionary
metadata = {
"frame_start": frame_start,
"frame_end": frame_end,
"render_settings": {
"resolution_x": resolution_x,
"resolution_y": resolution_y,
"output_format": output_format,
"engine": engine.lower(),
"engine_settings": engine_settings
},
"scene_info": {
"camera_count": camera_count,
"object_count": object_count,
"material_count": material_count
},
"missing_files_info": missing_files_info
}
# Output as JSON
print(json.dumps(metadata))
sys.stdout.flush()
`
if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
if err := os.WriteFile(scriptPath, []byte(scripts.ExtractMetadata), 0644); err != nil {
return nil, fmt.Errorf("failed to create extraction script: %w", err)
}
// Make blend file path relative to tmpDir to avoid path resolution issues
blendFileRel, err := filepath.Rel(tmpDir, blendFile)
if err != nil {
return nil, fmt.Errorf("failed to get relative path for blend file: %w", err)
}
// Execute Blender with Python script
cmd := exec.Command("blender", "-b", blendFile, "--python", scriptPath)
cmd := exec.Command("blender", "-b", blendFileRel, "--python", "extract_metadata.py")
cmd.Dir = tmpDir
// Capture stdout and stderr
@@ -443,14 +280,16 @@ sys.stdout.flush()
}
}()
// Stream stderr (discard for now, but could log if needed)
// Capture stderr for error reporting
var stderrBuffer bytes.Buffer
stderrDone := make(chan bool)
go func() {
defer close(stderrDone)
scanner := bufio.NewScanner(stderrPipe)
for scanner.Scan() {
// Could log stderr if needed
_ = scanner.Text()
line := scanner.Text()
stderrBuffer.WriteString(line)
stderrBuffer.WriteString("\n")
}
}()
@@ -462,6 +301,18 @@ sys.stdout.flush()
<-stderrDone
if err != nil {
stderrOutput := strings.TrimSpace(stderrBuffer.String())
stdoutOutput := strings.TrimSpace(stdoutBuffer.String())
log.Printf("Blender metadata extraction failed for job %d:", jobID)
if stderrOutput != "" {
log.Printf("Blender stderr: %s", stderrOutput)
}
if stdoutOutput != "" {
log.Printf("Blender stdout (last 500 chars): %s", truncateString(stdoutOutput, 500))
}
if stderrOutput != "" {
return nil, fmt.Errorf("blender metadata extraction failed: %w (stderr: %s)", err, truncateString(stderrOutput, 200))
}
return nil, fmt.Errorf("blender metadata extraction failed: %w", err)
}
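truncateString is referenced above but defined elsewhere in the commit; a minimal sketch of such a helper:

// Minimal sketch of the truncateString helper used in the error paths
// above (the actual definition lives elsewhere in this commit).
func truncateString(s string, max int) string {
	if len(s) <= max {
		return s
	}
	return s[:max] + "..."
}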
@@ -484,21 +335,25 @@ sys.stdout.flush()
return &metadata, nil
}
// extractTarGz extracts a tar.gz archive to a destination directory
func (s *Server) extractTarGz(tarGzPath, destDir string) error {
file, err := os.Open(tarGzPath)
// extractTar extracts a tar archive to a destination directory
func (s *Server) extractTar(tarPath, destDir string) error {
log.Printf("Extracting tar archive: %s -> %s", tarPath, destDir)
// Ensure destination directory exists
if err := os.MkdirAll(destDir, 0755); err != nil {
return fmt.Errorf("failed to create destination directory: %w", err)
}
file, err := os.Open(tarPath)
if err != nil {
return fmt.Errorf("failed to open archive: %w", err)
}
defer file.Close()
gzr, err := gzip.NewReader(file)
if err != nil {
return fmt.Errorf("failed to create gzip reader: %w", err)
}
defer gzr.Close()
tr := tar.NewReader(file)
tr := tar.NewReader(gzr)
fileCount := 0
dirCount := 0
for {
header, err := tr.Next()
@@ -511,9 +366,13 @@ func (s *Server) extractTarGz(tarGzPath, destDir string) error {
// Sanitize path to prevent directory traversal
target := filepath.Join(destDir, header.Name)
// Ensure target is within destDir
if !strings.HasPrefix(filepath.Clean(target), filepath.Clean(destDir)+string(os.PathSeparator)) {
return fmt.Errorf("invalid file path in archive: %s", header.Name)
cleanTarget := filepath.Clean(target)
cleanDestDir := filepath.Clean(destDir)
if !strings.HasPrefix(cleanTarget, cleanDestDir+string(os.PathSeparator)) && cleanTarget != cleanDestDir {
log.Printf("ERROR: Invalid file path in TAR - target: %s, destDir: %s", cleanTarget, cleanDestDir)
return fmt.Errorf("invalid file path in archive: %s (target: %s, destDir: %s)", header.Name, cleanTarget, cleanDestDir)
}
// Create parent directories
@@ -527,14 +386,18 @@ func (s *Server) extractTarGz(tarGzPath, destDir string) error {
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
if _, err := io.Copy(outFile, tr); err != nil {
_, err = io.Copy(outFile, tr)
if err != nil {
outFile.Close()
return fmt.Errorf("failed to write file: %w", err)
}
outFile.Close()
fileCount++
} else if header.Typeflag == tar.TypeDir {
dirCount++
}
}
log.Printf("Extraction complete: %d files, %d directories extracted to %s", fileCount, dirCount, destDir)
return nil
}


@@ -9,6 +9,7 @@ import (
"log"
"math/rand"
"net/http"
"net/url"
"path/filepath"
"sort"
"strconv"
@@ -17,6 +18,7 @@ import (
"jiggablend/pkg/types"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
)
@@ -287,13 +289,27 @@ func (s *Server) handleUpdateTaskStep(w http.ResponseWriter, r *http.Request) {
}
}
// Get job ID for broadcasting
var jobID int64
err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&jobID)
if err == nil {
// Broadcast step update to frontend
s.broadcastTaskUpdate(jobID, taskID, "step_update", map[string]interface{}{
"step_id": stepID,
"step_name": req.StepName,
"status": req.Status,
"duration_ms": req.DurationMs,
"error_message": req.ErrorMessage,
})
}
s.respondJSON(w, http.StatusOK, map[string]interface{}{
"step_id": stepID,
"message": "Step updated successfully",
})
}
// handleDownloadJobContext allows runners to download the job context tar.gz
// handleDownloadJobContext allows runners to download the job context tar
func (s *Server) handleDownloadJobContext(w http.ResponseWriter, r *http.Request) {
jobID, err := parseID(r, "jobId")
if err != nil {
@@ -302,7 +318,7 @@ func (s *Server) handleDownloadJobContext(w http.ResponseWriter, r *http.Request
}
// Construct the context file path
contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar.gz")
contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar")
// Check if context file exists
if !s.storage.FileExists(contextPath) {
@@ -319,9 +335,9 @@ func (s *Server) handleDownloadJobContext(w http.ResponseWriter, r *http.Request
}
defer file.Close()
// Set appropriate headers for tar.gz file
w.Header().Set("Content-Type", "application/gzip")
w.Header().Set("Content-Disposition", "attachment; filename=context.tar.gz")
// Set appropriate headers for tar file
w.Header().Set("Content-Type", "application/x-tar")
w.Header().Set("Content-Disposition", "attachment; filename=context.tar")
// Stream the file to the response
io.Copy(w, file)
@@ -356,16 +372,26 @@ func (s *Server) handleUploadFileFromRunner(w http.ResponseWriter, r *http.Reque
}
// Record in database
_, err = s.db.Exec(
var fileID int64
err = s.db.QueryRow(
`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
VALUES (?, ?, ?, ?, ?)`,
VALUES (?, ?, ?, ?, ?)
RETURNING id`,
jobID, types.JobFileTypeOutput, filePath, header.Filename, header.Size,
)
).Scan(&fileID)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record file: %v", err))
return
}
// Broadcast file addition
s.broadcastJobUpdate(jobID, "file_added", map[string]interface{}{
"file_id": fileID,
"file_type": types.JobFileTypeOutput,
"file_name": header.Filename,
"file_size": header.Size,
})
s.respondJSON(w, http.StatusCreated, map[string]interface{}{
"file_path": filePath,
"file_name": header.Filename,
@@ -510,6 +536,79 @@ func (s *Server) handleGetJobMetadataForRunner(w http.ResponseWriter, r *http.Re
s.respondJSON(w, http.StatusOK, metadata)
}
// handleDownloadFileForRunner allows runners to download a file by fileName
func (s *Server) handleDownloadFileForRunner(w http.ResponseWriter, r *http.Request) {
jobID, err := parseID(r, "jobId")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Get fileName from URL path (may need URL decoding)
fileName := chi.URLParam(r, "fileName")
if fileName == "" {
s.respondError(w, http.StatusBadRequest, "fileName is required")
return
}
// URL decode the fileName in case it contains encoded characters
decodedFileName, err := url.QueryUnescape(fileName)
if err != nil {
// If decoding fails, use original fileName
decodedFileName = fileName
}
// Get file info from database
var filePath string
err = s.db.QueryRow(
`SELECT file_path FROM job_files WHERE job_id = ? AND file_name = ?`,
jobID, decodedFileName,
).Scan(&filePath)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "File not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err))
return
}
// Open file
file, err := s.storage.GetFile(filePath)
if err != nil {
s.respondError(w, http.StatusNotFound, "File not found on disk")
return
}
defer file.Close()
// Determine content type based on file extension
contentType := "application/octet-stream"
fileNameLower := strings.ToLower(decodedFileName)
switch {
case strings.HasSuffix(fileNameLower, ".png"):
contentType = "image/png"
case strings.HasSuffix(fileNameLower, ".jpg") || strings.HasSuffix(fileNameLower, ".jpeg"):
contentType = "image/jpeg"
case strings.HasSuffix(fileNameLower, ".gif"):
contentType = "image/gif"
case strings.HasSuffix(fileNameLower, ".webp"):
contentType = "image/webp"
case strings.HasSuffix(fileNameLower, ".exr") || strings.HasSuffix(fileNameLower, ".EXR"):
contentType = "image/x-exr"
case strings.HasSuffix(fileNameLower, ".mp4"):
contentType = "video/mp4"
case strings.HasSuffix(fileNameLower, ".webm"):
contentType = "video/webm"
}
// Set headers
w.Header().Set("Content-Type", contentType)
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", decodedFileName))
// Stream file
io.Copy(w, file)
}
// WebSocket message types
type WSMessage struct {
Type string `json:"type"`
@@ -785,6 +884,13 @@ func (s *Server) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskUp
taskUpdate.TaskID,
).Scan(&jobID)
if err == nil {
// Broadcast task update
s.broadcastTaskUpdate(jobID, taskUpdate.TaskID, "task_update", map[string]interface{}{
"status": status,
"output_path": taskUpdate.OutputPath,
"completed_at": now,
"error": taskUpdate.Error,
})
s.updateJobStatusFromTasks(jobID)
}
}
@@ -840,6 +946,7 @@ func (s *Server) getCurrentFrameFromLogs(jobID int64) (int, bool) {
for rows.Next() {
var taskID int64
if err := rows.Scan(&taskID); err != nil {
log.Printf("Failed to scan task ID in getCurrentFrameFromLogs: %v", err)
continue
}
@@ -895,6 +1002,14 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
allowParallelRunners.Valid && !allowParallelRunners.Bool &&
frameStart.Valid && frameEnd.Valid
// Get current job status to detect changes
var currentStatus string
err = s.db.QueryRow(`SELECT status FROM jobs WHERE id = ?`, jobID).Scan(&currentStatus)
if err != nil {
log.Printf("Failed to get current job status for job %d: %v", jobID, err)
return
}
// Count total tasks and completed tasks
var totalTasks, completedTasks int
err = s.db.QueryRow(
@@ -914,8 +1029,6 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
return
}
log.Printf("updateJobStatusFromTasks: job %d - total: %d, completed: %d", jobID, totalTasks, completedTasks)
// Calculate progress
var progress float64
if totalTasks == 0 {
@@ -985,9 +1098,6 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
} else {
progress = renderProgress
}
log.Printf("updateJobStatusFromTasks: job %d - frame-based progress: current_frame=%d, render_progress=%.1f%%, non_render_progress=%.1f%%, total_progress=%.1f%%",
jobID, currentFrame, renderProgress, nonRenderProgress, progress)
} else {
// Standard task-based progress
progress = float64(completedTasks) / float64(totalTasks) * 100.0
@@ -1013,8 +1123,6 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
return
}
log.Printf("updateJobStatusFromTasks: job %d - pending/running: %d", jobID, pendingOrRunningTasks)
if pendingOrRunningTasks == 0 && totalTasks > 0 {
// All tasks are either completed or failed/cancelled
// Check if any tasks failed
@@ -1039,7 +1147,16 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
if err != nil {
log.Printf("Failed to update job %d status to %s: %v", jobID, jobStatus, err)
} else {
log.Printf("Updated job %d status to %s (progress: %.1f%%, completed tasks: %d/%d)", jobID, jobStatus, progress, completedTasks, totalTasks)
// Only log if status actually changed
if currentStatus != jobStatus {
log.Printf("Updated job %d status from %s to %s (progress: %.1f%%, completed tasks: %d/%d)", jobID, currentStatus, jobStatus, progress, completedTasks, totalTasks)
}
// Broadcast job update via WebSocket
s.broadcastJobUpdate(jobID, "job_update", map[string]interface{}{
"status": jobStatus,
"progress": progress,
"completed_at": now,
})
}
if outputFormatStr == "EXR_264_MP4" || outputFormatStr == "EXR_AV1_MP4" {
@@ -1054,14 +1171,22 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
// Create a video generation task instead of calling generateMP4Video directly
// This prevents race conditions when multiple runners complete frames simultaneously
videoTaskTimeout := 86400 // 24 hours for video generation
_, err := s.db.Exec(
var videoTaskID int64
err := s.db.QueryRow(
`INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
VALUES (?, ?, ?, ?, ?, ?, ?)
RETURNING id`,
jobID, 0, 0, types.TaskTypeVideoGeneration, types.TaskStatusPending, videoTaskTimeout, 1,
)
).Scan(&videoTaskID)
if err != nil {
log.Printf("Failed to create video generation task for job %d: %v", jobID, err)
} else {
// Broadcast that a new task was added
log.Printf("Broadcasting task_added for job %d: video generation task %d", jobID, videoTaskID)
s.broadcastTaskUpdate(jobID, videoTaskID, "task_added", map[string]interface{}{
"task_id": videoTaskID,
"task_type": types.TaskTypeVideoGeneration,
})
// Update job status to ensure it's marked as running (has pending video task)
s.updateJobStatusFromTasks(jobID)
// Try to distribute the task immediately
@@ -1099,7 +1224,10 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) {
if err != nil {
log.Printf("Failed to update job %d status to %s: %v", jobID, jobStatus, err)
} else {
log.Printf("Updated job %d status to %s (progress: %.1f%%, completed: %d/%d, pending: %d, running: %d)", jobID, jobStatus, progress, completedTasks, totalTasks, pendingOrRunningTasks-runningTasks, runningTasks)
// Only log if status actually changed
if currentStatus != jobStatus {
log.Printf("Updated job %d status from %s to %s (progress: %.1f%%, completed: %d/%d, pending: %d, running: %d)", jobID, currentStatus, jobStatus, progress, completedTasks, totalTasks, pendingOrRunningTasks-runningTasks, runningTasks)
}
}
}
}
@@ -1224,7 +1352,6 @@ func (s *Server) distributeTasksToRunners() {
t.AllowParallelRunners = true
}
pendingTasks = append(pendingTasks, t)
log.Printf("Found pending task %d (type: %s, job: %d '%s', status: %s)", t.TaskID, t.TaskType, t.JobID, t.JobName, t.JobStatus)
}
if len(pendingTasks) == 0 {
@@ -1308,11 +1435,6 @@ func (s *Server) distributeTasksToRunners() {
}
log.Printf("Distributing %d pending tasks (%v) to %d connected runners: %v", len(pendingTasks), taskTypes, len(connectedRunners), connectedRunners)
// Log each pending task for debugging
for _, task := range pendingTasks {
log.Printf(" - Task %d (type: %s, job: %d '%s', status: %s)", task.TaskID, task.TaskType, task.JobID, task.JobName, task.JobStatus)
}
// Distribute tasks to runners
// Sort tasks to prioritize metadata tasks
sort.Slice(pendingTasks, func(i, j int) bool {
@@ -1572,6 +1694,13 @@ func (s *Server) distributeTasksToRunners() {
continue
}
// Broadcast task assignment
s.broadcastTaskUpdate(task.JobID, task.TaskID, "task_update", map[string]interface{}{
"status": types.TaskStatusRunning,
"runner_id": selectedRunnerID,
"started_at": now,
})
// Task was successfully assigned, send via WebSocket
log.Printf("Assigned task %d (type: %s, job: %d) to runner %d", task.TaskID, task.TaskType, task.JobID, selectedRunnerID)
@@ -1642,6 +1771,8 @@ func (s *Server) assignTaskToRunner(runnerID int64, taskID int64) error {
var filePath string
if err := rows.Scan(&filePath); err == nil {
task.InputFiles = append(task.InputFiles, filePath)
} else {
log.Printf("Failed to scan input file path for task %d: %v", taskID, err)
}
}
} else {


@@ -1,12 +1,17 @@
package api
import (
"compress/gzip"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@@ -38,6 +43,15 @@ type Server struct {
// Mutexes for each frontend connection to serialize writes
frontendConnsWriteMu map[string]*sync.Mutex // key: "jobId:taskId"
frontendConnsWriteMuMu sync.RWMutex
// Job list WebSocket connections (key: userID)
jobListConns map[int64]*websocket.Conn
jobListConnsMu sync.RWMutex
// Single job WebSocket connections (key: "userId:jobId")
jobConns map[string]*websocket.Conn
jobConnsMu sync.RWMutex
// Mutexes for job WebSocket connections
jobConnsWriteMu map[string]*sync.Mutex
jobConnsWriteMuMu sync.RWMutex
// Throttling for progress updates (per job)
progressUpdateTimes map[int64]time.Time // key: jobID
progressUpdateTimesMu sync.RWMutex
@@ -66,6 +80,9 @@ func NewServer(db *database.DB, auth *authpkg.Auth, storage *storage.Storage) (*
runnerConns: make(map[int64]*websocket.Conn),
frontendConns: make(map[string]*websocket.Conn),
frontendConnsWriteMu: make(map[string]*sync.Mutex),
jobListConns: make(map[int64]*websocket.Conn),
jobConns: make(map[string]*websocket.Conn),
jobConnsWriteMu: make(map[string]*sync.Mutex),
progressUpdateTimes: make(map[int64]time.Time),
}
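broadcastJobUpdate and broadcastTaskUpdate are called throughout this commit, but their bodies are not part of the visible hunks. A rough sketch of how a job broadcast over the new jobConns map might look, using the per-connection write mutexes to serialize writes (the field names come from the struct above; everything else here is an assumption):

// Rough sketch of a broadcast over jobConns; the real implementation is
// in a diff not shown here. Keys are "userId:jobId" per the comment above.
func (s *Server) broadcastJobUpdate(jobID int64, msgType string, data map[string]interface{}) {
	payload := map[string]interface{}{"type": msgType, "job_id": jobID, "data": data}
	suffix := fmt.Sprintf(":%d", jobID)

	s.jobConnsMu.RLock()
	defer s.jobConnsMu.RUnlock()
	for key, conn := range s.jobConns {
		if !strings.HasSuffix(key, suffix) {
			continue
		}
		s.jobConnsWriteMuMu.RLock()
		mu := s.jobConnsWriteMu[key]
		s.jobConnsWriteMuMu.RUnlock()
		if mu == nil {
			continue
		}
		mu.Lock()
		if err := conn.WriteJSON(payload); err != nil {
			log.Printf("Failed to broadcast job update to %s: %v", key, err)
		}
		mu.Unlock()
	}
}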
@@ -83,16 +100,62 @@ func (s *Server) setupMiddleware() {
// Note: Timeout middleware is NOT applied globally to avoid conflicts with WebSocket connections
// WebSocket connections are long-lived and should not have HTTP timeouts
// Add gzip compression for JSON responses
s.router.Use(gzipMiddleware)
s.router.Use(cors.Handler(cors.Options{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "Range"},
ExposedHeaders: []string{"Link", "Content-Range", "Accept-Ranges", "Content-Length"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "Range", "If-None-Match"},
ExposedHeaders: []string{"Link", "Content-Range", "Accept-Ranges", "Content-Length", "ETag"},
AllowCredentials: true,
MaxAge: 300,
}))
}
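The If-None-Match and ETag header additions above support the ETag generation the commit message mentions; the generation code itself is in a suppressed diff. One common shape for it, sketched here purely as an assumption, hashes the response body and short-circuits with 304 Not Modified:

// Assumed sketch of content-hash ETag handling; the actual generation
// code is not visible in this diff. Requires crypto/sha256 in imports.
func respondJSONWithETag(w http.ResponseWriter, r *http.Request, status int, v interface{}) {
	body, err := json.Marshal(v)
	if err != nil {
		http.Error(w, "encoding error", http.StatusInternalServerError)
		return
	}
	etag := fmt.Sprintf(`"%x"`, sha256.Sum256(body))
	w.Header().Set("ETag", etag)
	if r.Header.Get("If-None-Match") == etag {
		w.WriteHeader(http.StatusNotModified)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	w.Write(body)
}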
// gzipMiddleware compresses responses with gzip if client supports it
func gzipMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Skip compression for WebSocket upgrades
if strings.ToLower(r.Header.Get("Upgrade")) == "websocket" {
next.ServeHTTP(w, r)
return
}
// Check if client accepts gzip
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
next.ServeHTTP(w, r)
return
}
// Create gzip writer
gz := gzip.NewWriter(w)
defer gz.Close()
w.Header().Set("Content-Encoding", "gzip")
w.Header().Set("Vary", "Accept-Encoding")
// Wrap response writer
gzw := &gzipResponseWriter{Writer: gz, ResponseWriter: w}
next.ServeHTTP(gzw, r)
})
}
// gzipResponseWriter wraps http.ResponseWriter to add gzip compression
type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
}
func (w *gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}
func (w *gzipResponseWriter) WriteHeader(statusCode int) {
// Drop any pre-set Content-Length: it describes the uncompressed body,
// and the gzipped response will use chunked transfer encoding instead
w.ResponseWriter.Header().Del("Content-Length")
w.ResponseWriter.WriteHeader(statusCode)
}
// setupRoutes configures routes
func (s *Server) setupRoutes() {
// Public routes
@@ -118,16 +181,21 @@ func (s *Server) setupRoutes() {
r.Post("/", s.handleCreateJob)
r.Post("/upload", s.handleUploadFileForJobCreation) // Upload before job creation
r.Get("/", s.handleListJobs)
r.Get("/summary", s.handleListJobsSummary)
r.Post("/batch", s.handleBatchGetJobs)
r.Get("/{id}", s.handleGetJob)
r.Delete("/{id}", s.handleCancelJob)
r.Post("/{id}/delete", s.handleDeleteJob)
r.Post("/{id}/upload", s.handleUploadJobFile)
r.Get("/{id}/files", s.handleListJobFiles)
r.Get("/{id}/files/count", s.handleGetJobFilesCount)
r.Get("/{id}/context", s.handleListContextArchive)
r.Get("/{id}/files/{fileId}/download", s.handleDownloadJobFile)
r.Get("/{id}/video", s.handleStreamVideo)
r.Get("/{id}/metadata", s.handleGetJobMetadata)
r.Get("/{id}/tasks", s.handleListJobTasks)
r.Get("/{id}/tasks/summary", s.handleListJobTasksSummary)
r.Post("/{id}/tasks/batch", s.handleBatchGetTasks)
r.Get("/{id}/tasks/{taskId}/logs", s.handleGetTaskLogs)
// WebSocket route - no timeout middleware (long-lived connection)
r.With(func(next http.Handler) http.Handler {
@@ -138,6 +206,19 @@ func (s *Server) setupRoutes() {
}).Get("/{id}/tasks/{taskId}/logs/ws", s.handleStreamTaskLogsWebSocket)
r.Get("/{id}/tasks/{taskId}/steps", s.handleGetTaskSteps)
r.Post("/{id}/tasks/{taskId}/retry", s.handleRetryTask)
// WebSocket routes for real-time updates
r.With(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Remove timeout middleware for WebSocket
next.ServeHTTP(w, r)
})
}).Get("/ws", s.handleJobsWebSocket)
r.With(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Remove timeout middleware for WebSocket
next.ServeHTTP(w, r)
})
}).Get("/{id}/ws", s.handleJobWebSocket)
})
// Admin routes
@@ -181,7 +262,8 @@ func (s *Server) setupRoutes() {
})
r.Post("/tasks/{id}/progress", s.handleUpdateTaskProgress)
r.Post("/tasks/{id}/steps", s.handleUpdateTaskStep)
r.Get("/jobs/{jobId}/context.tar.gz", s.handleDownloadJobContext)
r.Get("/jobs/{jobId}/context.tar", s.handleDownloadJobContext)
r.Get("/files/{jobId}/{fileName}", s.handleDownloadFileForRunner)
r.Post("/files/{jobId}/upload", s.handleUploadFileFromRunner)
r.Get("/jobs/{jobId}/status", s.handleGetJobStatusForRunner)
r.Get("/jobs/{jobId}/files", s.handleGetJobFilesForRunner)
@@ -311,12 +393,14 @@ func (s *Server) handleLogout(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleGetMe(w http.ResponseWriter, r *http.Request) {
cookie, err := r.Cookie("session_id")
if err != nil {
log.Printf("Authentication failed: missing session cookie in /auth/me")
s.respondError(w, http.StatusUnauthorized, "Not authenticated")
return
}
session, ok := s.auth.GetSession(cookie.Value)
if !ok {
log.Printf("Authentication failed: invalid session cookie in /auth/me")
s.respondError(w, http.StatusUnauthorized, "Invalid session")
return
}
@@ -410,6 +494,7 @@ func (s *Server) handleLocalLogin(w http.ResponseWriter, r *http.Request) {
session, err := s.auth.LocalLogin(req.Username, req.Password)
if err != nil {
log.Printf("Authentication failed: invalid credentials for username '%s'", req.Username)
s.respondError(w, http.StatusUnauthorized, "Invalid credentials")
return
}
@@ -512,6 +597,7 @@ func parseID(r *http.Request, param string) (int64, error) {
func (s *Server) StartBackgroundTasks() {
go s.recoverStuckTasks()
go s.cleanupOldRenderJobs()
go s.cleanupOldTempDirectories()
}
// recoverStuckTasks periodically checks for dead runners and stuck tasks
@@ -621,6 +707,7 @@ func (s *Server) recoverTaskTimeouts() {
err := rows.Scan(&taskID, &runnerID, &retryCount, &maxRetries, &timeoutSeconds, &startedAt)
if err != nil {
log.Printf("Failed to scan task row in recoverTaskTimeouts: %v", err)
continue
}
@@ -659,3 +746,72 @@ func (s *Server) recoverTaskTimeouts() {
}
}
}
// cleanupOldTempDirectories periodically cleans up old temporary directories
func (s *Server) cleanupOldTempDirectories() {
// Run cleanup every hour
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
// Run once immediately on startup
s.cleanupOldTempDirectoriesOnce()
for range ticker.C {
s.cleanupOldTempDirectoriesOnce()
}
}
// cleanupOldTempDirectoriesOnce removes temp directories older than 1 hour
func (s *Server) cleanupOldTempDirectoriesOnce() {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic in cleanupOldTempDirectories: %v", r)
}
}()
tempPath := filepath.Join(s.storage.BasePath(), "temp")
// Check if temp directory exists
if _, err := os.Stat(tempPath); os.IsNotExist(err) {
return
}
// Read all entries in temp directory
entries, err := os.ReadDir(tempPath)
if err != nil {
log.Printf("Failed to read temp directory: %v", err)
return
}
now := time.Now()
cleanedCount := 0
for _, entry := range entries {
if !entry.IsDir() {
continue
}
entryPath := filepath.Join(tempPath, entry.Name())
// Get directory info to check modification time
info, err := entry.Info()
if err != nil {
continue
}
// Remove directories older than 1 hour
age := now.Sub(info.ModTime())
if age > 1*time.Hour {
if err := os.RemoveAll(entryPath); err != nil {
log.Printf("Warning: Failed to clean up old temp directory %s: %v", entryPath, err)
} else {
cleanedCount++
log.Printf("Cleaned up old temp directory: %s (age: %v)", entryPath, age)
}
}
}
if cleanedCount > 0 {
log.Printf("Cleaned up %d old temp directories", cleanedCount)
}
}