From c9ade39ad9d2b9be9f654d1c6c5c7f7b7dd2364c Mon Sep 17 00:00:00 2001 From: Justin Harms Date: Sat, 22 Nov 2025 06:37:32 -0600 Subject: [PATCH] Implement job metadata extraction and task management features. Add validation for frame range limits, enhance job and task data structures, and introduce new API endpoints for metadata and task retrieval. Update client-side components to handle metadata extraction and display task statuses. Improve error handling in API responses. --- internal/api/jobs.go | 130 ++++++++- internal/api/metadata.go | 139 +++++++++ internal/api/runners.go | 225 +++++++++++++-- internal/api/server.go | 4 + internal/database/schema.go | 4 + internal/runner/client.go | 403 +++++++++++++++++++++++---- pkg/types/types.go | 35 +++ web/src/components/JobDetails.jsx | 50 +++- web/src/components/JobSubmission.jsx | 152 +++++++++- web/src/utils/api.js | 24 +- 10 files changed, 1078 insertions(+), 88 deletions(-) create mode 100644 internal/api/metadata.go diff --git a/internal/api/jobs.go b/internal/api/jobs.go index 308454b..be6f29f 100644 --- a/internal/api/jobs.go +++ b/internal/api/jobs.go @@ -8,6 +8,7 @@ import ( "log" "net/http" "strconv" + "strings" "time" "fuego/pkg/types" @@ -39,6 +40,13 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { return } + // Validate frame range limits (prevent abuse) + const maxFrameRange = 10000 + if req.FrameEnd-req.FrameStart+1 > maxFrameRange { + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Frame range too large. 
Maximum allowed: %d frames", maxFrameRange)) + return + } + if req.OutputFormat == "" { req.OutputFormat = "PNG" } @@ -116,7 +124,7 @@ func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { rows, err := s.db.Query( `SELECT id, user_id, name, status, progress, frame_start, frame_end, output_format, - allow_parallel_runners, timeout_seconds, created_at, started_at, completed_at, error_message + allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE user_id = ? ORDER BY created_at DESC`, userID, ) @@ -130,11 +138,12 @@ func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { for rows.Next() { var job types.Job var startedAt, completedAt sql.NullTime + var blendMetadataJSON sql.NullString err := rows.Scan( &job.ID, &job.UserID, &job.Name, &job.Status, &job.Progress, &job.FrameStart, &job.FrameEnd, &job.OutputFormat, &job.AllowParallelRunners, &job.TimeoutSeconds, - &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, + &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err)) @@ -147,6 +156,12 @@ func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { if completedAt.Valid { job.CompletedAt = &completedAt.Time } + if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { + job.BlendMetadata = &metadata + } + } jobs = append(jobs, job) } @@ -171,15 +186,16 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { var job types.Job var startedAt, completedAt sql.NullTime + var blendMetadataJSON sql.NullString err = s.db.QueryRow( `SELECT id, user_id, name, status, progress, frame_start, frame_end, output_format, - allow_parallel_runners, timeout_seconds, created_at, 
started_at, completed_at, error_message + allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ? AND user_id = ?`, jobID, userID, ).Scan( &job.ID, &job.UserID, &job.Name, &job.Status, &job.Progress, &job.FrameStart, &job.FrameEnd, &job.OutputFormat, &job.AllowParallelRunners, &job.TimeoutSeconds, - &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, + &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, ) if err == sql.ErrNoRows { @@ -197,6 +213,12 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { if completedAt.Valid { job.CompletedAt = &completedAt.Time } + if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { + job.BlendMetadata = &metadata + } + } s.respondJSON(w, http.StatusOK, job) } @@ -307,6 +329,25 @@ func (s *Server) handleUploadJobFile(w http.ResponseWriter, r *http.Request) { fileID, _ := result.LastInsertId() + // If this is a blend file, create a metadata extraction task + if strings.HasSuffix(strings.ToLower(header.Filename), ".blend") { + // Create metadata extraction task + metadataTaskTimeout := 300 // 5 minutes for metadata extraction + taskResult, err := s.db.Exec( + `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + jobID, 0, 0, types.TaskTypeMetadata, types.TaskStatusPending, metadataTaskTimeout, 1, + ) + if err != nil { + log.Printf("Failed to create metadata extraction task: %v", err) + } else { + metadataTaskID, _ := taskResult.LastInsertId() + log.Printf("Created metadata extraction task %d for job %d", metadataTaskID, jobID) + // Try to distribute the task immediately + go s.distributeTasksToRunners() + } + } + s.respondJSON(w, http.StatusCreated, map[string]interface{}{ "id": fileID, 
"file_name": header.Filename, @@ -524,6 +565,87 @@ func (s *Server) handleStreamVideo(w http.ResponseWriter, r *http.Request) { } } +// handleListJobTasks lists all tasks for a job +func (s *Server) handleListJobTasks(w http.ResponseWriter, r *http.Request) { + userID, err := getUserID(r) + if err != nil { + s.respondError(w, http.StatusUnauthorized, err.Error()) + return + } + + jobID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Verify job belongs to user + var jobUserID int64 + err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) + return + } + if jobUserID != userID { + s.respondError(w, http.StatusForbidden, "Access denied") + return + } + + rows, err := s.db.Query( + `SELECT id, job_id, runner_id, frame_start, frame_end, status, task_type, + current_step, retry_count, max_retries, output_path, created_at, started_at, + completed_at, error_message, timeout_seconds + FROM tasks WHERE job_id = ? 
ORDER BY frame_start ASC`, + jobID, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err)) + return + } + defer rows.Close() + + tasks := []types.Task{} + for rows.Next() { + var task types.Task + var runnerID sql.NullInt64 + var startedAt, completedAt sql.NullTime + var timeoutSeconds sql.NullInt64 + + err := rows.Scan( + &task.ID, &task.JobID, &runnerID, &task.FrameStart, &task.FrameEnd, + &task.Status, &task.TaskType, &task.CurrentStep, &task.RetryCount, + &task.MaxRetries, &task.OutputPath, &task.CreatedAt, &startedAt, + &completedAt, &task.ErrorMessage, &timeoutSeconds, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err)) + return + } + + if runnerID.Valid { + task.RunnerID = &runnerID.Int64 + } + if startedAt.Valid { + task.StartedAt = &startedAt.Time + } + if completedAt.Valid { + task.CompletedAt = &completedAt.Time + } + if timeoutSeconds.Valid { + timeout := int(timeoutSeconds.Int64) + task.TimeoutSeconds = &timeout + } + + tasks = append(tasks, task) + } + + s.respondJSON(w, http.StatusOK, tasks) +} + // handleGetTaskLogs retrieves logs for a specific task func (s *Server) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) diff --git a/internal/api/metadata.go b/internal/api/metadata.go new file mode 100644 index 0000000..e7bc12e --- /dev/null +++ b/internal/api/metadata.go @@ -0,0 +1,139 @@ +package api + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + + "fuego/pkg/types" +) + +// handleSubmitMetadata handles metadata submission from runner +func (s *Server) handleSubmitMetadata(w http.ResponseWriter, r *http.Request) { + jobID, err := parseID(r, "jobId") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Get runner ID from context (set by runnerAuthMiddleware) + runnerID, ok := r.Context().Value("runner_id").(int64) 
+ if !ok { + s.respondError(w, http.StatusUnauthorized, "runner_id not found in context") + return + } + + var metadata types.BlendMetadata + if err := json.NewDecoder(r.Body).Decode(&metadata); err != nil { + s.respondError(w, http.StatusBadRequest, "Invalid metadata JSON") + return + } + + // Verify job exists + var jobUserID int64 + err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) + return + } + + // Find the metadata extraction task for this job + var taskID int64 + err = s.db.QueryRow( + `SELECT id FROM tasks WHERE job_id = ? AND task_type = ? AND runner_id = ?`, + jobID, types.TaskTypeMetadata, runnerID, + ).Scan(&taskID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Metadata extraction task not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to find task: %v", err)) + return + } + + // Convert metadata to JSON + metadataJSON, err := json.Marshal(metadata) + if err != nil { + s.respondError(w, http.StatusInternalServerError, "Failed to marshal metadata") + return + } + + // Update job with metadata + _, err = s.db.Exec( + `UPDATE jobs SET blend_metadata = ? 
WHERE id = ?`, + string(metadataJSON), jobID, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update job metadata: %v", err)) + return + } + + // Mark task as completed + _, err = s.db.Exec( + `UPDATE tasks SET status = ?, completed_at = CURRENT_TIMESTAMP WHERE id = ?`, + types.TaskStatusCompleted, taskID, + ) + if err != nil { + log.Printf("Failed to mark metadata task as completed: %v", err) + } + + log.Printf("Metadata extracted for job %d: frame_start=%d, frame_end=%d", jobID, metadata.FrameStart, metadata.FrameEnd) + + s.respondJSON(w, http.StatusOK, map[string]string{"message": "Metadata submitted successfully"}) +} + +// handleGetJobMetadata retrieves metadata for a job +func (s *Server) handleGetJobMetadata(w http.ResponseWriter, r *http.Request) { + userID, err := getUserID(r) + if err != nil { + s.respondError(w, http.StatusUnauthorized, err.Error()) + return + } + + jobID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Verify job belongs to user + var jobUserID int64 + var blendMetadataJSON sql.NullString + err = s.db.QueryRow( + `SELECT user_id, blend_metadata FROM jobs WHERE id = ?`, + jobID, + ).Scan(&jobUserID, &blendMetadataJSON) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err)) + return + } + if jobUserID != userID { + s.respondError(w, http.StatusForbidden, "Access denied") + return + } + + if !blendMetadataJSON.Valid || blendMetadataJSON.String == "" { + s.respondJSON(w, http.StatusOK, nil) + return + } + + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err != nil { + s.respondError(w, http.StatusInternalServerError, "Failed to parse metadata") + return + } + + s.respondJSON(w, http.StatusOK, metadata) +} + 
diff --git a/internal/api/runners.go b/internal/api/runners.go index d0c4d41..68504f9 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -185,6 +185,117 @@ func (s *Server) handleUpdateTaskProgress(w http.ResponseWriter, r *http.Request s.respondJSON(w, http.StatusOK, map[string]string{"message": "Progress updated"}) } +// handleUpdateTaskStep handles step start/complete events from runners +func (s *Server) handleUpdateTaskStep(w http.ResponseWriter, r *http.Request) { + // Get runner ID from context (set by runnerAuthMiddleware) + runnerID, ok := r.Context().Value("runner_id").(int64) + if !ok { + s.respondError(w, http.StatusUnauthorized, "runner_id not found in context") + return + } + + taskID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + var req struct { + StepName string `json:"step_name"` + Status string `json:"status"` // "pending", "running", "completed", "failed", "skipped" + DurationMs *int `json:"duration_ms,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.respondError(w, http.StatusBadRequest, "Invalid request body") + return + } + + // Verify task belongs to runner + var taskRunnerID sql.NullInt64 + err = s.db.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", taskID).Scan(&taskRunnerID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Task not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) + return + } + if !taskRunnerID.Valid || taskRunnerID.Int64 != runnerID { + s.respondError(w, http.StatusForbidden, "Task does not belong to this runner") + return + } + + now := time.Now() + var stepID int64 + + // Check if step already exists + var existingStepID sql.NullInt64 + err = s.db.QueryRow( + `SELECT id FROM task_steps WHERE task_id = ? 
AND step_name = ?`, + taskID, req.StepName, + ).Scan(&existingStepID) + + if err == sql.ErrNoRows || !existingStepID.Valid { + // Create new step + var startedAt *time.Time + var completedAt *time.Time + if req.Status == string(types.StepStatusRunning) || req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + startedAt = &now + } + if req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + completedAt = &now + } + + result, err := s.db.Exec( + `INSERT INTO task_steps (task_id, step_name, status, started_at, completed_at, duration_ms, error_message) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + taskID, req.StepName, req.Status, startedAt, completedAt, req.DurationMs, req.ErrorMessage, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create step: %v", err)) + return + } + stepID, _ = result.LastInsertId() + } else { + // Update existing step + stepID = existingStepID.Int64 + var startedAt *time.Time + var completedAt *time.Time + + // Get existing started_at if status is running/completed/failed + if req.Status == string(types.StepStatusRunning) || req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + var existingStartedAt sql.NullTime + s.db.QueryRow(`SELECT started_at FROM task_steps WHERE id = ?`, stepID).Scan(&existingStartedAt) + if existingStartedAt.Valid { + startedAt = &existingStartedAt.Time + } else { + startedAt = &now + } + } + + if req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + completedAt = &now + } + + _, err = s.db.Exec( + `UPDATE task_steps SET status = ?, started_at = ?, completed_at = ?, duration_ms = ?, error_message = ? 
+ WHERE id = ?`, + req.Status, startedAt, completedAt, req.DurationMs, req.ErrorMessage, stepID, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update step: %v", err)) + return + } + } + + s.respondJSON(w, http.StatusOK, map[string]interface{}{ + "step_id": stepID, + "message": "Step updated successfully", + }) +} + // handleDownloadFileForRunner allows runners to download job files func (s *Server) handleDownloadFileForRunner(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") @@ -396,6 +507,7 @@ type WSTaskAssignment struct { OutputFormat string `json:"output_format"` FrameStart int `json:"frame_start"` FrameEnd int `json:"frame_end"` + TaskType string `json:"task_type"` InputFiles []string `json:"input_files"` } @@ -633,20 +745,41 @@ func (s *Server) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskUp taskUpdate.TaskID, ).Scan(&jobID, &frameStart, &frameEnd) if err == nil { + // Count total tasks excluding failed ones (failed tasks are retried, so we count them) + // We exclude tasks that are in a terminal failed state with max retries exceeded var totalTasks, completedTasks int - s.db.QueryRow(`SELECT COUNT(*) FROM tasks WHERE job_id = ?`, jobID).Scan(&totalTasks) + s.db.QueryRow( + `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status IN (?, ?, ?, ?)`, + jobID, types.TaskStatusPending, types.TaskStatusRunning, types.TaskStatusCompleted, types.TaskStatusFailed, + ).Scan(&totalTasks) s.db.QueryRow( `SELECT COUNT(*) FROM tasks WHERE job_id = ? 
AND status = ?`, jobID, types.TaskStatusCompleted, ).Scan(&completedTasks) - progress := float64(completedTasks) / float64(totalTasks) * 100.0 + // Handle edge cases: division by zero and all tasks cancelled + var progress float64 + if totalTasks == 0 { + // All tasks cancelled or no tasks, set progress to 0 + progress = 0.0 + } else { + progress = float64(completedTasks) / float64(totalTasks) * 100.0 + } var jobStatus string var outputFormat string s.db.QueryRow(`SELECT output_format FROM jobs WHERE id = ?`, jobID).Scan(&outputFormat) - if completedTasks == totalTasks { + // Check if all non-cancelled tasks are completed + var pendingOrRunningTasks int + s.db.QueryRow( + `SELECT COUNT(*) FROM tasks + WHERE job_id = ? AND status IN (?, ?)`, + jobID, types.TaskStatusPending, types.TaskStatusRunning, + ).Scan(&pendingOrRunningTasks) + + if pendingOrRunningTasks == 0 && totalTasks > 0 { + // All tasks are either completed or failed/cancelled jobStatus = string(types.JobStatusCompleted) s.db.Exec( `UPDATE jobs SET status = ?, progress = ?, completed_at = ? 
WHERE id = ?`, @@ -654,7 +787,20 @@ func (s *Server) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskUp ) if outputFormat == "MP4" { - go s.generateMP4Video(jobID) + // Create a video generation task instead of calling generateMP4Video directly + // This prevents race conditions when multiple runners complete frames simultaneously + videoTaskTimeout := 86400 // 24 hours for video generation + _, err := s.db.Exec( + `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + jobID, 0, 0, types.TaskTypeVideoGeneration, types.TaskStatusPending, videoTaskTimeout, 1, + ) + if err != nil { + log.Printf("Failed to create video generation task for job %d: %v", jobID, err) + } else { + // Try to distribute the task immediately + go s.distributeTasksToRunners() + } } } else { jobStatus = string(types.JobStatusRunning) @@ -712,7 +858,7 @@ func (s *Server) broadcastLogToFrontend(taskID int64, logEntry WSLogEntry) { func (s *Server) distributeTasksToRunners() { // Get all pending tasks rows, err := s.db.Query( - `SELECT t.id, t.job_id, t.frame_start, t.frame_end, j.allow_parallel_runners, j.status as job_status + `SELECT t.id, t.job_id, t.frame_start, t.frame_end, t.task_type, j.allow_parallel_runners, j.status as job_status FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.status = ? AND j.status != ? 
@@ -731,6 +877,7 @@ func (s *Server) distributeTasksToRunners() { JobID int64 FrameStart int FrameEnd int + TaskType string AllowParallelRunners bool } @@ -740,11 +887,12 @@ func (s *Server) distributeTasksToRunners() { JobID int64 FrameStart int FrameEnd int + TaskType string AllowParallelRunners bool } var allowParallel int var jobStatus string - err := rows.Scan(&t.TaskID, &t.JobID, &t.FrameStart, &t.FrameEnd, &allowParallel, &jobStatus) + err := rows.Scan(&t.TaskID, &t.JobID, &t.FrameStart, &t.FrameEnd, &t.TaskType, &allowParallel, &jobStatus) if err != nil { continue } @@ -770,13 +918,6 @@ func (s *Server) distributeTasksToRunners() { // Distribute tasks to runners for _, task := range pendingTasks { - // Check if task is already assigned - var assignedRunnerID sql.NullInt64 - err := s.db.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", task.TaskID).Scan(&assignedRunnerID) - if err == nil && assignedRunnerID.Valid { - continue // Already assigned - } - // Find available runner var selectedRunnerID int64 for _, runnerID := range connectedRunners { @@ -812,9 +953,40 @@ func (s *Server) distributeTasksToRunners() { continue // No available runner } - // Assign task to runner + // Atomically assign task to runner using UPDATE with WHERE runner_id IS NULL + // This prevents race conditions when multiple goroutines try to assign the same task + now := time.Now() + result, err := s.db.Exec( + `UPDATE tasks SET runner_id = ?, status = ?, started_at = ? + WHERE id = ? 
AND runner_id IS NULL AND status = ?`, + selectedRunnerID, types.TaskStatusRunning, now, task.TaskID, types.TaskStatusPending, + ) + if err != nil { + log.Printf("Failed to atomically assign task %d: %v", task.TaskID, err) + continue + } + + // Check if the update actually affected a row (task was successfully assigned) + rowsAffected, err := result.RowsAffected() + if err != nil { + log.Printf("Failed to get rows affected for task %d: %v", task.TaskID, err) + continue + } + + if rowsAffected == 0 { + // Task was already assigned by another goroutine, skip + continue + } + + // Task was successfully assigned, send via WebSocket if err := s.assignTaskToRunner(selectedRunnerID, task.TaskID); err != nil { - log.Printf("Failed to assign task %d to runner %d: %v", task.TaskID, selectedRunnerID, err) + log.Printf("Failed to send task %d to runner %d: %v", task.TaskID, selectedRunnerID, err) + // Rollback the assignment if WebSocket send fails + s.db.Exec( + `UPDATE tasks SET runner_id = NULL, status = ?, started_at = NULL + WHERE id = ?`, + types.TaskStatusPending, task.TaskID, + ) } } } @@ -831,20 +1003,20 @@ func (s *Server) assignTaskToRunner(runnerID int64, taskID int64) error { // Get task details var task WSTaskAssignment - var jobName, outputFormat string + var jobName, outputFormat, taskType string err := s.db.QueryRow( - `SELECT t.job_id, t.frame_start, t.frame_end, j.name, j.output_format + `SELECT t.job_id, t.frame_start, t.frame_end, t.task_type, j.name, j.output_format FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.id = ?`, taskID, - ).Scan(&task.JobID, &task.FrameStart, &task.FrameEnd, &jobName, &outputFormat) + ).Scan(&task.JobID, &task.FrameStart, &task.FrameEnd, &taskType, &jobName, &outputFormat) if err != nil { return err } task.TaskID = taskID - task.JobID = task.JobID task.JobName = jobName task.OutputFormat = outputFormat + task.TaskType = taskType // Get input files rows, err := s.db.Query( @@ -861,14 +1033,15 @@ func (s *Server) 
assignTaskToRunner(runnerID int64, taskID int64) error { } } - // Assign task to runner in database - now := time.Now() - _, err = s.db.Exec( - `UPDATE tasks SET runner_id = ?, status = ?, started_at = ? WHERE id = ?`, - runnerID, types.TaskStatusRunning, now, taskID, - ) + // Note: Task is already assigned in database by the atomic update in distributeTasksToRunners + // We just need to verify it's still assigned to this runner + var assignedRunnerID sql.NullInt64 + err = s.db.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", taskID).Scan(&assignedRunnerID) if err != nil { - return err + return fmt.Errorf("task not found: %w", err) + } + if !assignedRunnerID.Valid || assignedRunnerID.Int64 != runnerID { + return fmt.Errorf("task %d is not assigned to runner %d", taskID, runnerID) } // Send task via WebSocket diff --git a/internal/api/server.go b/internal/api/server.go index a3b6613..610d2ae 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -109,6 +109,8 @@ func (s *Server) setupRoutes() { r.Get("/{id}/files", s.handleListJobFiles) r.Get("/{id}/files/{fileId}/download", s.handleDownloadJobFile) r.Get("/{id}/video", s.handleStreamVideo) + r.Get("/{id}/metadata", s.handleGetJobMetadata) + r.Get("/{id}/tasks", s.handleListJobTasks) r.Get("/{id}/tasks/{taskId}/logs", s.handleGetTaskLogs) r.Get("/{id}/tasks/{taskId}/logs/ws", s.handleStreamTaskLogsWebSocket) r.Get("/{id}/tasks/{taskId}/steps", s.handleGetTaskSteps) @@ -153,10 +155,12 @@ func (s *Server) setupRoutes() { return http.HandlerFunc(s.runnerAuthMiddleware(next.ServeHTTP)) }) r.Post("/tasks/{id}/progress", s.handleUpdateTaskProgress) + r.Post("/tasks/{id}/steps", s.handleUpdateTaskStep) r.Get("/files/{jobId}/{fileName}", s.handleDownloadFileForRunner) r.Post("/files/{jobId}/upload", s.handleUploadFileFromRunner) r.Get("/jobs/{jobId}/status", s.handleGetJobStatusForRunner) r.Get("/jobs/{jobId}/files", s.handleGetJobFilesForRunner) + r.Post("/jobs/{jobId}/metadata", s.handleSubmitMetadata) }) 
}) diff --git a/internal/database/schema.go b/internal/database/schema.go index f418936..36d40e8 100644 --- a/internal/database/schema.go +++ b/internal/database/schema.go @@ -175,6 +175,10 @@ func (db *DB) migrate() error { `ALTER TABLE jobs ADD COLUMN allow_parallel_runners BOOLEAN NOT NULL DEFAULT 1`, // Add timeout_seconds to jobs if it doesn't exist `ALTER TABLE jobs ADD COLUMN timeout_seconds INTEGER DEFAULT 86400`, + // Add blend_metadata to jobs if it doesn't exist + `ALTER TABLE jobs ADD COLUMN blend_metadata TEXT`, + // Add task_type to tasks if it doesn't exist + `ALTER TABLE tasks ADD COLUMN task_type TEXT DEFAULT 'render'`, // Add new columns to tasks if they don't exist `ALTER TABLE tasks ADD COLUMN current_step TEXT`, `ALTER TABLE tasks ADD COLUMN retry_count INTEGER DEFAULT 0`, diff --git a/internal/runner/client.go b/internal/runner/client.go index 7c66f19..7bbedbe 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -27,28 +27,31 @@ import ( // Client represents a runner client type Client struct { - managerURL string - name string - hostname string - ipAddress string - httpClient *http.Client - runnerID int64 - runnerSecret string - managerSecret string - wsConn *websocket.Conn - wsConnMu sync.RWMutex - stopChan chan struct{} + managerURL string + name string + hostname string + ipAddress string + httpClient *http.Client + runnerID int64 + runnerSecret string + managerSecret string + wsConn *websocket.Conn + wsConnMu sync.RWMutex + stopChan chan struct{} + stepStartTimes map[string]time.Time // key: "taskID:stepName" + stepTimesMu sync.RWMutex } // NewClient creates a new runner client func NewClient(managerURL, name, hostname, ipAddress string) *Client { return &Client{ - managerURL: managerURL, - name: name, - hostname: hostname, - ipAddress: ipAddress, - httpClient: &http.Client{Timeout: 30 * time.Second}, - stopChan: make(chan struct{}), + managerURL: managerURL, + name: name, + hostname: hostname, + ipAddress: ipAddress, 
+ httpClient: &http.Client{Timeout: 30 * time.Second}, + stopChan: make(chan struct{}), + stepStartTimes: make(map[string]time.Time), } } @@ -259,14 +262,9 @@ func (c *Client) handleTaskAssignment(msg map[string]interface{}) { outputFormat, _ := data["output_format"].(string) frameStart, _ := data["frame_start"].(float64) frameEnd, _ := data["frame_end"].(float64) + taskType, _ := data["task_type"].(string) inputFilesRaw, _ := data["input_files"].([]interface{}) - if len(inputFilesRaw) == 0 { - log.Printf("No input files for task %v", taskID) - c.sendTaskComplete(int64(taskID), "", false, "No input files") - return - } - // Convert to task map format taskMap := map[string]interface{}{ "id": taskID, @@ -275,9 +273,27 @@ func (c *Client) handleTaskAssignment(msg map[string]interface{}) { "frame_end": frameEnd, } - // Process the task + // Process the task based on type go func() { - if err := c.processTask(taskMap, jobName, outputFormat, inputFilesRaw); err != nil { + var err error + if taskType == "metadata" { + if len(inputFilesRaw) == 0 { + log.Printf("No input files for metadata task %v", taskID) + c.sendTaskComplete(int64(taskID), "", false, "No input files") + return + } + err = c.processMetadataTask(taskMap, int64(jobID), inputFilesRaw) + } else if taskType == "video_generation" { + err = c.processVideoGenerationTask(taskMap, int64(jobID)) + } else { + if len(inputFilesRaw) == 0 { + log.Printf("No input files for task %v", taskID) + c.sendTaskComplete(int64(taskID), "", false, "No input files") + return + } + err = c.processTask(taskMap, jobName, outputFormat, inputFilesRaw) + } + if err != nil { log.Printf("Failed to process task %v: %v", taskID, err) c.sendTaskComplete(int64(taskID), "", false, err.Error()) } @@ -334,7 +350,78 @@ func (c *Client) sendLog(taskID int64, logLevel types.LogLevel, message, stepNam // sendStepUpdate sends a step start/complete event to the manager func (c *Client) sendStepUpdate(taskID int64, stepName string, status 
types.StepStatus, errorMsg string) { - // This would ideally be a separate endpoint, but for now we'll use logs + key := fmt.Sprintf("%d:%s", taskID, stepName) + var durationMs *int + + // Track step start time + if status == types.StepStatusRunning { + c.stepTimesMu.Lock() + c.stepStartTimes[key] = time.Now() + c.stepTimesMu.Unlock() + } + + // Calculate duration if step is completing + if status == types.StepStatusCompleted || status == types.StepStatusFailed { + c.stepTimesMu.RLock() + startTime, exists := c.stepStartTimes[key] + c.stepTimesMu.RUnlock() + if exists { + duration := int(time.Since(startTime).Milliseconds()) + durationMs = &duration + c.stepTimesMu.Lock() + delete(c.stepStartTimes, key) + c.stepTimesMu.Unlock() + } + } + + // Send step update via HTTP API + reqBody := map[string]interface{}{ + "step_name": stepName, + "status": string(status), + } + if durationMs != nil { + reqBody["duration_ms"] = *durationMs + } + if errorMsg != "" { + reqBody["error_message"] = errorMsg + } + + body, _ := json.Marshal(reqBody) + path := fmt.Sprintf("/api/runner/tasks/%d/steps?runner_id=%d", taskID, c.runnerID) + resp, err := c.doSignedRequest("POST", path, body) + if err != nil { + log.Printf("Failed to send step update: %v", err) + // Fallback to log-based tracking + msg := fmt.Sprintf("Step %s: %s", stepName, status) + if errorMsg != "" { + msg += " - " + errorMsg + } + logLevel := types.LogLevelInfo + if status == types.StepStatusFailed { + logLevel = types.LogLevelError + } + c.sendLog(taskID, logLevel, msg, stepName) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Printf("Step update failed: %s", string(body)) + // Fallback to log-based tracking + msg := fmt.Sprintf("Step %s: %s", stepName, status) + if errorMsg != "" { + msg += " - " + errorMsg + } + logLevel := types.LogLevelInfo + if status == types.StepStatusFailed { + logLevel = types.LogLevelError + } + c.sendLog(taskID, 
logLevel, msg, stepName) + return + } + + // Also send log for debugging msg := fmt.Sprintf("Step %s: %s", stepName, status) if errorMsg != "" { msg += " - " + errorMsg @@ -455,33 +542,20 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat } c.sendStepUpdate(taskID, "complete", types.StepStatusCompleted, "") - // For MP4 format, check if all frames are done and generate video - if outputFormat == "MP4" { - if err := c.checkAndGenerateMP4(jobID); err != nil { - log.Printf("Failed to generate MP4 for job %d: %v", jobID, err) - // Don't fail the task if video generation fails - frames are already uploaded - } - } - return nil } -// checkAndGenerateMP4 checks if all frames are complete and generates MP4 if so -func (c *Client) checkAndGenerateMP4(jobID int64) error { - // Check job status - job, err := c.getJobStatus(jobID) - if err != nil { - return fmt.Errorf("failed to get job status: %w", err) - } +// processVideoGenerationTask processes a video generation task +func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID int64) error { + taskID := int64(task["id"].(float64)) - if job["status"] != "completed" { - log.Printf("Job %d not yet complete (%v), skipping MP4 generation", jobID, job["status"]) - return nil - } + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting video generation task: job %d", jobID), "") + log.Printf("Processing video generation task %d for job %d", taskID, jobID) // Get all output files for this job files, err := c.getJobFiles(jobID) if err != nil { + c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to get job files: %w", err) } @@ -496,14 +570,24 @@ func (c *Client) checkAndGenerateMP4(jobID int64) error { } if len(pngFiles) == 0 { - return fmt.Errorf("no PNG frame files found for MP4 generation") + err := fmt.Errorf("no PNG frame files found for MP4 generation") + c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, 
err.Error()) + return err } + c.sendStepUpdate(taskID, "get_files", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found %d PNG frames for video generation", len(pngFiles)), "get_files") + log.Printf("Generating MP4 for job %d from %d PNG frames", jobID, len(pngFiles)) + // Step: download_frames + c.sendStepUpdate(taskID, "download_frames", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Downloading PNG frames...", "download_frames") + // Create work directory for video generation workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-video-%d", jobID)) if err := os.MkdirAll(workDir, 0755); err != nil { + c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to create work directory: %w", err) } defer os.RemoveAll(workDir) @@ -521,11 +605,19 @@ func (c *Client) checkAndGenerateMP4(jobID int64) error { } if len(frameFiles) == 0 { - return fmt.Errorf("failed to download any frame files") + err := fmt.Errorf("failed to download any frame files") + c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error()) + return err } // Sort frame files by name to ensure correct order sort.Strings(frameFiles) + c.sendStepUpdate(taskID, "download_frames", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Downloaded %d frames", len(frameFiles)), "download_frames") + + // Step: generate_video + c.sendStepUpdate(taskID, "generate_video", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Generating MP4 video with ffmpeg...", "generate_video") // Generate MP4 using ffmpeg outputMP4 := filepath.Join(workDir, fmt.Sprintf("output_%d.mp4", jobID)) @@ -546,20 +638,42 @@ func (c *Client) checkAndGenerateMP4(jobID int64) error { if err != nil { // Try alternative method with concat demuxer log.Printf("First ffmpeg attempt failed, trying concat method: %s", string(output)) - return 
c.generateMP4WithConcat(frameFiles, outputMP4, workDir) + err = c.generateMP4WithConcat(frameFiles, outputMP4, workDir) + if err != nil { + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error()) + return err + } } // Check if MP4 was created if _, err := os.Stat(outputMP4); os.IsNotExist(err) { - return fmt.Errorf("MP4 file not created: %s", outputMP4) + err := fmt.Errorf("MP4 file not created: %s", outputMP4) + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error()) + return err } + c.sendStepUpdate(taskID, "generate_video", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, "MP4 video generated successfully", "generate_video") + + // Step: upload_video + c.sendStepUpdate(taskID, "upload_video", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Uploading MP4 video...", "upload_video") + // Upload MP4 file mp4Path, err := c.uploadFile(jobID, outputMP4) if err != nil { + c.sendStepUpdate(taskID, "upload_video", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to upload MP4: %w", err) } + c.sendStepUpdate(taskID, "upload_video", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Successfully uploaded MP4: %s", mp4Path), "upload_video") + + // Mark task as complete + if err := c.completeTask(taskID, mp4Path, true, ""); err != nil { + return err + } + log.Printf("Successfully generated and uploaded MP4 for job %d: %s", jobID, mp4Path) return nil } @@ -785,6 +899,197 @@ func (c *Client) uploadFile(jobID int64, filePath string) (string, error) { return result.FilePath, nil } +// processMetadataTask processes a metadata extraction task +func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, inputFiles []interface{}) error { + taskID := int64(task["id"].(float64)) + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting metadata extraction task: job %d", jobID), "") + log.Printf("Processing metadata 
extraction task %d for job %d", taskID, jobID) + + // Create work directory + workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-metadata-%d", taskID)) + if err := os.MkdirAll(workDir, 0755); err != nil { + return fmt.Errorf("failed to create work directory: %w", err) + } + defer os.RemoveAll(workDir) + + // Step: download + c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Downloading blend file...", "download") + blendFile := "" + for _, filePath := range inputFiles { + filePathStr := filePath.(string) + if err := c.downloadFile(filePathStr, workDir); err != nil { + c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) + return fmt.Errorf("failed to download file %s: %w", filePathStr, err) + } + if filepath.Ext(filePathStr) == ".blend" { + blendFile = filepath.Join(workDir, filepath.Base(filePathStr)) + } + } + + if blendFile == "" { + err := fmt.Errorf("no .blend file found in input files") + c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) + return err + } + c.sendStepUpdate(taskID, "download", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, "Blend file downloaded successfully", "download") + + // Step: extract_metadata + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Extracting metadata from blend file...", "extract_metadata") + + // Create Python script to extract metadata + scriptPath := filepath.Join(workDir, "extract_metadata.py") + scriptContent := `import bpy +import json +import sys + +# Get scene +scene = bpy.context.scene + +# Extract frame range +frame_start = scene.frame_start +frame_end = scene.frame_end + +# Extract render settings +render = scene.render +resolution_x = render.resolution_x +resolution_y = render.resolution_y +samples = scene.cycles.samples if scene.cycles else scene.eevee.taa_render_samples +engine = scene.render.engine.lower() + +# 
Determine output format from file format
+output_format = render.image_settings.file_format
+
+# Extract scene info
+camera_count = len([obj for obj in scene.objects if obj.type == 'CAMERA'])
+object_count = len(scene.objects)
+material_count = len(bpy.data.materials)
+
+# Build metadata dictionary
+metadata = {
+    "frame_start": frame_start,
+    "frame_end": frame_end,
+    "render_settings": {
+        "resolution_x": resolution_x,
+        "resolution_y": resolution_y,
+        "samples": samples,
+        "output_format": output_format,
+        "engine": engine
+    },
+    "scene_info": {
+        "camera_count": camera_count,
+        "object_count": object_count,
+        "material_count": material_count
+    }
+}
+
+# Output as JSON
+print(json.dumps(metadata))
+sys.stdout.flush()
+`
+
+	if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
+		c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, err.Error())
+		return fmt.Errorf("failed to create extraction script: %w", err)
+	}
+
+	// Execute Blender with Python script
+	cmd := exec.Command("blender", "-b", blendFile, "--python", scriptPath)
+	cmd.Dir = workDir
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		errMsg := fmt.Sprintf("blender metadata extraction failed: %v\nOutput: %s", err, string(output))
+		c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata")
+		c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg)
+		return fmt.Errorf("%s", errMsg)
+	}
+
+	// Parse output (metadata is printed to stdout)
+	metadataJSON := strings.TrimSpace(string(output))
+	// Extract JSON from output (Blender may print other stuff)
+	jsonStart := strings.Index(metadataJSON, "{")
+	jsonEnd := strings.LastIndex(metadataJSON, "}")
+	if jsonStart == -1 || jsonEnd == -1 || jsonEnd <= jsonStart {
+		errMsg := "Failed to extract JSON from Blender output"
+		c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata")
+		c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg)
+		return fmt.Errorf("%s", errMsg)
+	}
+	metadataJSON = metadataJSON[jsonStart : jsonEnd+1]
+
+	var metadata types.BlendMetadata
+	if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil {
+		errMsg := fmt.Sprintf("Failed to parse metadata JSON: %v", err)
+		c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata")
+		c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg)
+		return fmt.Errorf("%s", errMsg)
+	}
+
+	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Metadata extracted: frames %d-%d, resolution %dx%d",
+		metadata.FrameStart, metadata.FrameEnd, metadata.RenderSettings.ResolutionX, metadata.RenderSettings.ResolutionY), "extract_metadata")
+	c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusCompleted, "")
+
+	// Step: submit_metadata
+	c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusRunning, "")
+	c.sendLog(taskID, types.LogLevelInfo, "Submitting metadata to manager...", "submit_metadata")
+
+	// Submit metadata to manager
+	if err := c.submitMetadata(jobID, metadata); err != nil {
+		errMsg := fmt.Sprintf("Failed to submit metadata: %v", err)
+		c.sendLog(taskID, types.LogLevelError, errMsg, "submit_metadata")
+		c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusFailed, errMsg)
+		return fmt.Errorf("%s", errMsg)
+	}
+
+	c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusCompleted, "")
+	c.sendLog(taskID, types.LogLevelInfo, "Metadata extraction completed successfully", "")
+
+	// Mark task as complete
+	c.sendTaskComplete(taskID, "", true, "")
+	return nil
+}
+
+// submitMetadata submits extracted metadata to the manager
+func (c *Client) submitMetadata(jobID int64, metadata types.BlendMetadata) error {
+	metadataJSON, err := json.Marshal(metadata)
+	if err != nil {
+		return fmt.Errorf("failed to marshal metadata: %w", err)
+	}
+
+	path := fmt.Sprintf("/api/runner/jobs/%d/metadata?runner_id=%d", jobID, c.runnerID)
+	timestamp := time.Now()
+	message := fmt.Sprintf("POST\n%s\n%s\n%d", path, 
string(metadataJSON), timestamp.Unix()) + h := hmac.New(sha256.New, []byte(c.runnerSecret)) + h.Write([]byte(message)) + signature := hex.EncodeToString(h.Sum(nil)) + + url := fmt.Sprintf("%s%s", c.managerURL, path) + req, err := http.NewRequest("POST", url, bytes.NewReader(metadataJSON)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Runner-Signature", signature) + req.Header.Set("X-Runner-Timestamp", fmt.Sprintf("%d", timestamp.Unix())) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to submit metadata: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("metadata submission failed: %s", string(body)) + } + + return nil +} + // completeTask marks a task as complete via WebSocket (or HTTP fallback) func (c *Client) completeTask(taskID int64, outputPath string, success bool, errorMsg string) error { return c.sendTaskComplete(taskID, outputPath, success, errorMsg) diff --git a/pkg/types/types.go b/pkg/types/types.go index dbe69c0..5b4f7f3 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -35,6 +35,7 @@ type Job struct { OutputFormat string `json:"output_format"` // PNG, JPEG, EXR, etc. 
AllowParallelRunners bool `json:"allow_parallel_runners"` // Allow multiple runners to work on this job TimeoutSeconds int `json:"timeout_seconds"` // Job-level timeout (24 hours default) + BlendMetadata *BlendMetadata `json:"blend_metadata,omitempty"` // Extracted metadata from blend file CreatedAt time.Time `json:"created_at"` StartedAt *time.Time `json:"started_at,omitempty"` CompletedAt *time.Time `json:"completed_at,omitempty"` @@ -72,6 +73,15 @@ const ( TaskStatusFailed TaskStatus = "failed" ) +// TaskType represents the type of a task +type TaskType string + +const ( + TaskTypeRender TaskType = "render" + TaskTypeMetadata TaskType = "metadata" + TaskTypeVideoGeneration TaskType = "video_generation" +) + // Task represents a render task assigned to a runner type Task struct { ID int64 `json:"id"` @@ -79,6 +89,7 @@ type Task struct { RunnerID *int64 `json:"runner_id,omitempty"` FrameStart int `json:"frame_start"` FrameEnd int `json:"frame_end"` + TaskType TaskType `json:"task_type"` Status TaskStatus `json:"status"` CurrentStep string `json:"current_step,omitempty"` RetryCount int `json:"retry_count"` @@ -199,3 +210,27 @@ type TaskLogEntry struct { StepName string `json:"step_name,omitempty"` } +// BlendMetadata represents extracted metadata from a blend file +type BlendMetadata struct { + FrameStart int `json:"frame_start"` + FrameEnd int `json:"frame_end"` + RenderSettings RenderSettings `json:"render_settings"` + SceneInfo SceneInfo `json:"scene_info"` +} + +// RenderSettings represents render settings from a blend file +type RenderSettings struct { + ResolutionX int `json:"resolution_x"` + ResolutionY int `json:"resolution_y"` + Samples int `json:"samples"` + OutputFormat string `json:"output_format"` + Engine string `json:"engine"` +} + +// SceneInfo represents scene information from a blend file +type SceneInfo struct { + CameraCount int `json:"camera_count"` + ObjectCount int `json:"object_count"` + MaterialCount int `json:"material_count"` +} + diff 
--git a/web/src/components/JobDetails.jsx b/web/src/components/JobDetails.jsx index ac81405..df7dcab 100644 --- a/web/src/components/JobDetails.jsx +++ b/web/src/components/JobDetails.jsx @@ -5,6 +5,7 @@ import VideoPlayer from './VideoPlayer'; export default function JobDetails({ job, onClose, onUpdate }) { const [jobDetails, setJobDetails] = useState(job); const [files, setFiles] = useState([]); + const [tasks, setTasks] = useState([]); const [loading, setLoading] = useState(true); const [videoUrl, setVideoUrl] = useState(null); const [selectedTaskId, setSelectedTaskId] = useState(null); @@ -42,12 +43,14 @@ export default function JobDetails({ job, onClose, onUpdate }) { const loadDetails = async () => { try { - const [details, fileList] = await Promise.all([ + const [details, fileList, taskList] = await Promise.all([ jobs.get(job.id), jobs.getFiles(job.id), + jobs.getTasks(job.id), ]); setJobDetails(details); setFiles(fileList); + setTasks(taskList); // Check if there's an MP4 output file const mp4File = fileList.find( @@ -151,6 +154,16 @@ export default function JobDetails({ job, onClose, onUpdate }) { } }; + const getTaskStatusColor = (status) => { + const colors = { + pending: 'bg-yellow-100 text-yellow-800', + running: 'bg-blue-100 text-blue-800', + completed: 'bg-green-100 text-green-800', + failed: 'bg-red-100 text-red-800', + }; + return colors[status] || 'bg-gray-100 text-gray-800'; + }; + const outputFiles = files.filter((f) => f.file_type === 'output'); const inputFiles = files.filter((f) => f.file_type === 'input'); @@ -269,9 +282,42 @@ export default function JobDetails({ job, onClose, onUpdate }) {

- Task Execution + Tasks

+ {tasks.length > 0 && ( +
+

Task List

+
+ {tasks.map((task) => ( +
handleTaskClick(task.id)} + className={`flex items-center justify-between p-3 bg-white rounded cursor-pointer hover:bg-gray-100 transition-colors ${ + selectedTaskId === task.id ? 'ring-2 ring-purple-600' : '' + }`} + > +
+ + {task.status} + + + Frame {task.frame_start} + {task.frame_end !== task.frame_start ? `-${task.frame_end}` : ''} + + {task.task_type && task.task_type !== 'render' && ( + ({task.task_type}) + )} +
+
+ {task.runner_id && `Runner ${task.runner_id}`} +
+
+ ))} +
+
+ )} + {taskSteps.length > 0 && (

Steps

diff --git a/web/src/components/JobSubmission.jsx b/web/src/components/JobSubmission.jsx index ca1617c..5f9cb09 100644 --- a/web/src/components/JobSubmission.jsx +++ b/web/src/components/JobSubmission.jsx @@ -1,4 +1,4 @@ -import { useState } from 'react'; +import { useState, useEffect } from 'react'; import { jobs } from '../utils/api'; export default function JobSubmission({ onSuccess }) { @@ -12,6 +12,113 @@ export default function JobSubmission({ onSuccess }) { const [file, setFile] = useState(null); const [submitting, setSubmitting] = useState(false); const [error, setError] = useState(''); + const [metadataStatus, setMetadataStatus] = useState(null); // 'extracting', 'completed', 'error' + const [metadata, setMetadata] = useState(null); + const [currentJobId, setCurrentJobId] = useState(null); + + // Poll for metadata after file upload + useEffect(() => { + if (!currentJobId || metadataStatus !== 'extracting') return; + + let pollCount = 0; + const maxPolls = 30; // 60 seconds max (30 * 2 seconds) + let timeoutId = null; + + const pollMetadata = async () => { + pollCount++; + + // Stop polling after timeout + if (pollCount > maxPolls) { + setMetadataStatus('error'); + // Cancel temp job on timeout + try { + await jobs.cancel(currentJobId); + } catch (err) { + // Ignore errors when canceling + } + return; + } + + try { + const metadata = await jobs.getMetadata(currentJobId); + if (metadata) { + setMetadata(metadata); + setMetadataStatus('completed'); + // Auto-populate form fields + setFormData(prev => ({ + ...prev, + frame_start: metadata.frame_start || prev.frame_start, + frame_end: metadata.frame_end || prev.frame_end, + output_format: metadata.render_settings?.output_format || prev.output_format, + })); + } + } catch (err) { + // Metadata not ready yet, continue polling (only if 404/not found) + if (err.message.includes('404') || err.message.includes('not found')) { + // Continue polling via interval + } else { + setMetadataStatus('error'); + } + } + }; + + 
const interval = setInterval(pollMetadata, 2000); + + // Set timeout to stop polling after 60 seconds + timeoutId = setTimeout(() => { + clearInterval(interval); + if (metadataStatus === 'extracting') { + setMetadataStatus('error'); + // Cancel temp job on timeout + jobs.cancel(currentJobId).catch(() => {}); + } + }, 60000); + + return () => { + clearInterval(interval); + if (timeoutId) clearTimeout(timeoutId); + // Cleanup: cancel temp job if component unmounts during extraction + if (currentJobId && metadataStatus === 'extracting') { + jobs.cancel(currentJobId).catch(() => {}); + } + }; + }, [currentJobId, metadataStatus]); + + const handleFileChange = async (e) => { + const selectedFile = e.target.files[0]; + if (!selectedFile) { + setFile(null); + return; + } + + setFile(selectedFile); + setMetadataStatus(null); + setMetadata(null); + setCurrentJobId(null); + + // If it's a blend file, create a temporary job to extract metadata + if (selectedFile.name.toLowerCase().endsWith('.blend')) { + try { + // Create a temporary job for metadata extraction + const tempJob = await jobs.create({ + name: 'Metadata Extraction', + frame_start: 1, + frame_end: 10, + output_format: 'PNG', + allow_parallel_runners: true, + }); + + setCurrentJobId(tempJob.id); + setMetadataStatus('extracting'); + + // Upload file to trigger metadata extraction + await jobs.uploadFile(tempJob.id, selectedFile); + } catch (err) { + console.error('Failed to start metadata extraction:', err); + setMetadataStatus('error'); + } + } + }; const handleSubmit = async (e) => { e.preventDefault(); @@ -27,7 +134,16 @@ export default function JobSubmission({ onSuccess }) { throw new Error('Invalid frame range'); } - // Create job + // If we have a temporary job for metadata extraction, cancel it + if (currentJobId) { + try { + await jobs.cancel(currentJobId); + } catch (err) { + // Ignore errors when canceling temp job + } + } + + // Create actual job const job = await jobs.create({ name: formData.name, 
frame_start: parseInt(formData.frame_start), @@ -48,6 +164,9 @@ export default function JobSubmission({ onSuccess }) { allow_parallel_runners: true, }); setFile(null); + setMetadata(null); + setMetadataStatus(null); + setCurrentJobId(null); e.target.reset(); if (onSuccess) { @@ -150,10 +269,37 @@ export default function JobSubmission({ onSuccess }) { setFile(e.target.files[0])} + onChange={handleFileChange} required className="w-full px-4 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-purple-600 focus:border-transparent file:mr-4 file:py-2 file:px-4 file:rounded-lg file:border-0 file:text-sm file:font-semibold file:bg-purple-50 file:text-purple-700 hover:file:bg-purple-100" /> + {metadataStatus === 'extracting' && ( +
+ Extracting metadata from blend file... +
+ )} + {metadataStatus === 'completed' && metadata && ( +
+
Metadata extracted successfully!
+
+
Frames: {metadata.frame_start} - {metadata.frame_end}
+
Resolution: {metadata.render_settings?.resolution_x} x {metadata.render_settings?.resolution_y}
+
Engine: {metadata.render_settings?.engine}
+
Samples: {metadata.render_settings?.samples}
+
Form fields have been auto-populated. You can adjust them if needed.
+ {(formData.frame_start < metadata.frame_start || formData.frame_end > metadata.frame_end) && ( +
+ Warning: Your frame range ({formData.frame_start}-{formData.frame_end}) exceeds the blend file's range ({metadata.frame_start}-{metadata.frame_end}). This may cause errors. +
+ )} +
+
+ )} + {metadataStatus === 'error' && ( +
+ Could not extract metadata. Please fill in the form manually. +
+ )}