diff --git a/internal/api/jobs.go b/internal/api/jobs.go index 308454b..be6f29f 100644 --- a/internal/api/jobs.go +++ b/internal/api/jobs.go @@ -8,6 +8,7 @@ import ( "log" "net/http" "strconv" + "strings" "time" "fuego/pkg/types" @@ -39,6 +40,13 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { return } + // Validate frame range limits (prevent abuse) + const maxFrameRange = 10000 + if req.FrameEnd-req.FrameStart+1 > maxFrameRange { + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Frame range too large. Maximum allowed: %d frames", maxFrameRange)) + return + } + if req.OutputFormat == "" { req.OutputFormat = "PNG" } @@ -116,7 +124,7 @@ func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { rows, err := s.db.Query( `SELECT id, user_id, name, status, progress, frame_start, frame_end, output_format, - allow_parallel_runners, timeout_seconds, created_at, started_at, completed_at, error_message + allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE user_id = ? ORDER BY created_at DESC`, userID, ) @@ -130,11 +138,12 @@ func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { for rows.Next() { var job types.Job var startedAt, completedAt sql.NullTime + var blendMetadataJSON sql.NullString err := rows.Scan( &job.ID, &job.UserID, &job.Name, &job.Status, &job.Progress, &job.FrameStart, &job.FrameEnd, &job.OutputFormat, &job.AllowParallelRunners, &job.TimeoutSeconds, - &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, + &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err)) @@ -147,6 +156,12 @@ func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { if completedAt.Valid { job.CompletedAt = &completedAt.Time } + if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { + job.BlendMetadata = &metadata + } + } jobs = append(jobs, job) } @@ -171,15 +186,16 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { var job types.Job var startedAt, completedAt sql.NullTime + var blendMetadataJSON sql.NullString err = s.db.QueryRow( `SELECT id, user_id, name, status, progress, frame_start, frame_end, output_format, - allow_parallel_runners, timeout_seconds, created_at, started_at, completed_at, error_message + allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ? AND user_id = ?`, jobID, userID, ).Scan( &job.ID, &job.UserID, &job.Name, &job.Status, &job.Progress, &job.FrameStart, &job.FrameEnd, &job.OutputFormat, &job.AllowParallelRunners, &job.TimeoutSeconds, - &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, + &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &job.ErrorMessage, ) if err == sql.ErrNoRows { @@ -197,6 +213,12 @@ func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { if completedAt.Valid { job.CompletedAt = &completedAt.Time } + if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { + job.BlendMetadata = &metadata + } + } s.respondJSON(w, http.StatusOK, job) } @@ -307,6 +329,25 @@ func (s *Server) handleUploadJobFile(w http.ResponseWriter, r *http.Request) { fileID, _ := result.LastInsertId() + // If this is a blend file, create a metadata extraction task + if strings.HasSuffix(strings.ToLower(header.Filename), ".blend") { + // Create metadata extraction task + metadataTaskTimeout := 300 // 5 minutes for metadata extraction + taskResult, err := s.db.Exec( + `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + jobID, 0, 0, types.TaskTypeMetadata, types.TaskStatusPending, metadataTaskTimeout, 1, + ) + if err != nil { + log.Printf("Failed to create metadata extraction task: %v", err) + } else { + metadataTaskID, _ := taskResult.LastInsertId() + log.Printf("Created metadata extraction task %d for job %d", metadataTaskID, jobID) + // Try to distribute the task immediately + go s.distributeTasksToRunners() + } + } + s.respondJSON(w, http.StatusCreated, map[string]interface{}{ "id": fileID, "file_name": header.Filename, @@ -524,6 +565,87 @@ func (s *Server) handleStreamVideo(w http.ResponseWriter, r *http.Request) { } } +// handleListJobTasks lists all tasks for a job +func (s *Server) handleListJobTasks(w http.ResponseWriter, r *http.Request) { + userID, err := getUserID(r) + if err != nil { + s.respondError(w, http.StatusUnauthorized, err.Error()) + return + } + + jobID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Verify job belongs to user + var jobUserID int64 + err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) + return + } + if jobUserID != userID { + s.respondError(w, http.StatusForbidden, "Access denied") + return + } + + rows, err := s.db.Query( + `SELECT id, job_id, runner_id, frame_start, frame_end, status, task_type, + current_step, retry_count, max_retries, output_path, created_at, started_at, + completed_at, error_message, timeout_seconds + FROM tasks WHERE job_id = ? ORDER BY frame_start ASC`, + jobID, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err)) + return + } + defer rows.Close() + + tasks := []types.Task{} + for rows.Next() { + var task types.Task + var runnerID sql.NullInt64 + var startedAt, completedAt sql.NullTime + var timeoutSeconds sql.NullInt64 + + err := rows.Scan( + &task.ID, &task.JobID, &runnerID, &task.FrameStart, &task.FrameEnd, + &task.Status, &task.TaskType, &task.CurrentStep, &task.RetryCount, + &task.MaxRetries, &task.OutputPath, &task.CreatedAt, &startedAt, + &completedAt, &task.ErrorMessage, &timeoutSeconds, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err)) + return + } + + if runnerID.Valid { + task.RunnerID = &runnerID.Int64 + } + if startedAt.Valid { + task.StartedAt = &startedAt.Time + } + if completedAt.Valid { + task.CompletedAt = &completedAt.Time + } + if timeoutSeconds.Valid { + timeout := int(timeoutSeconds.Int64) + task.TimeoutSeconds = &timeout + } + + tasks = append(tasks, task) + } + + s.respondJSON(w, http.StatusOK, tasks) +} + // handleGetTaskLogs retrieves logs for a specific task func (s *Server) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) diff --git a/internal/api/metadata.go b/internal/api/metadata.go new file mode 100644 index 0000000..e7bc12e --- /dev/null +++ b/internal/api/metadata.go @@ -0,0 +1,139 @@ +package api + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + + "fuego/pkg/types" +) + +// handleSubmitMetadata handles metadata submission from runner +func (s *Server) handleSubmitMetadata(w http.ResponseWriter, r *http.Request) { + jobID, err := parseID(r, "jobId") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Get runner ID from context (set by runnerAuthMiddleware) + runnerID, ok := r.Context().Value("runner_id").(int64) + if !ok { + s.respondError(w, http.StatusUnauthorized, "runner_id not found in context") + return + } + + var metadata types.BlendMetadata + if err := json.NewDecoder(r.Body).Decode(&metadata); err != nil { + s.respondError(w, http.StatusBadRequest, "Invalid metadata JSON") + return + } + + // Verify job exists + var jobUserID int64 + err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) + return + } + + // Find the metadata extraction task for this job + var taskID int64 + err = s.db.QueryRow( + `SELECT id FROM tasks WHERE job_id = ? AND task_type = ? AND runner_id = ?`, + jobID, types.TaskTypeMetadata, runnerID, + ).Scan(&taskID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Metadata extraction task not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to find task: %v", err)) + return + } + + // Convert metadata to JSON + metadataJSON, err := json.Marshal(metadata) + if err != nil { + s.respondError(w, http.StatusInternalServerError, "Failed to marshal metadata") + return + } + + // Update job with metadata + _, err = s.db.Exec( + `UPDATE jobs SET blend_metadata = ? WHERE id = ?`, + string(metadataJSON), jobID, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update job metadata: %v", err)) + return + } + + // Mark task as completed + _, err = s.db.Exec( + `UPDATE tasks SET status = ?, completed_at = CURRENT_TIMESTAMP WHERE id = ?`, + types.TaskStatusCompleted, taskID, + ) + if err != nil { + log.Printf("Failed to mark metadata task as completed: %v", err) + } + + log.Printf("Metadata extracted for job %d: frame_start=%d, frame_end=%d", jobID, metadata.FrameStart, metadata.FrameEnd) + + s.respondJSON(w, http.StatusOK, map[string]string{"message": "Metadata submitted successfully"}) +} + +// handleGetJobMetadata retrieves metadata for a job +func (s *Server) handleGetJobMetadata(w http.ResponseWriter, r *http.Request) { + userID, err := getUserID(r) + if err != nil { + s.respondError(w, http.StatusUnauthorized, err.Error()) + return + } + + jobID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Verify job belongs to user + var jobUserID int64 + var blendMetadataJSON sql.NullString + err = s.db.QueryRow( + `SELECT user_id, blend_metadata FROM jobs WHERE id = ?`, + jobID, + ).Scan(&jobUserID, &blendMetadataJSON) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err)) + return + } + if jobUserID != userID { + s.respondError(w, http.StatusForbidden, "Access denied") + return + } + + if !blendMetadataJSON.Valid || blendMetadataJSON.String == "" { + s.respondJSON(w, http.StatusOK, nil) + return + } + + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err != nil { + s.respondError(w, http.StatusInternalServerError, "Failed to parse metadata") + return + } + + s.respondJSON(w, http.StatusOK, metadata) +} + diff --git a/internal/api/runners.go b/internal/api/runners.go index d0c4d41..68504f9 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -185,6 +185,117 @@ func (s *Server) handleUpdateTaskProgress(w http.ResponseWriter, r *http.Request s.respondJSON(w, http.StatusOK, map[string]string{"message": "Progress updated"}) } +// handleUpdateTaskStep handles step start/complete events from runners +func (s *Server) handleUpdateTaskStep(w http.ResponseWriter, r *http.Request) { + // Get runner ID from context (set by runnerAuthMiddleware) + runnerID, ok := r.Context().Value("runner_id").(int64) + if !ok { + s.respondError(w, http.StatusUnauthorized, "runner_id not found in context") + return + } + + taskID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + var req struct { + StepName string `json:"step_name"` + Status string `json:"status"` // "pending", "running", "completed", "failed", "skipped" + DurationMs *int `json:"duration_ms,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.respondError(w, http.StatusBadRequest, "Invalid request body") + return + } + + // Verify task belongs to runner + var taskRunnerID sql.NullInt64 + err = s.db.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", taskID).Scan(&taskRunnerID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Task not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) + return + } + if !taskRunnerID.Valid || taskRunnerID.Int64 != runnerID { + s.respondError(w, http.StatusForbidden, "Task does not belong to this runner") + return + } + + now := time.Now() + var stepID int64 + + // Check if step already exists + var existingStepID sql.NullInt64 + err = s.db.QueryRow( + `SELECT id FROM task_steps WHERE task_id = ? AND step_name = ?`, + taskID, req.StepName, + ).Scan(&existingStepID) + + if err == sql.ErrNoRows || !existingStepID.Valid { + // Create new step + var startedAt *time.Time + var completedAt *time.Time + if req.Status == string(types.StepStatusRunning) || req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + startedAt = &now + } + if req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + completedAt = &now + } + + result, err := s.db.Exec( + `INSERT INTO task_steps (task_id, step_name, status, started_at, completed_at, duration_ms, error_message) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + taskID, req.StepName, req.Status, startedAt, completedAt, req.DurationMs, req.ErrorMessage, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create step: %v", err)) + return + } + stepID, _ = result.LastInsertId() + } else { + // Update existing step + stepID = existingStepID.Int64 + var startedAt *time.Time + var completedAt *time.Time + + // Get existing started_at if status is running/completed/failed + if req.Status == string(types.StepStatusRunning) || req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + var existingStartedAt sql.NullTime + s.db.QueryRow(`SELECT started_at FROM task_steps WHERE id = ?`, stepID).Scan(&existingStartedAt) + if existingStartedAt.Valid { + startedAt = &existingStartedAt.Time + } else { + startedAt = &now + } + } + + if req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { + completedAt = &now + } + + _, err = s.db.Exec( + `UPDATE task_steps SET status = ?, started_at = ?, completed_at = ?, duration_ms = ?, error_message = ? + WHERE id = ?`, + req.Status, startedAt, completedAt, req.DurationMs, req.ErrorMessage, stepID, + ) + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update step: %v", err)) + return + } + } + + s.respondJSON(w, http.StatusOK, map[string]interface{}{ + "step_id": stepID, + "message": "Step updated successfully", + }) +} + // handleDownloadFileForRunner allows runners to download job files func (s *Server) handleDownloadFileForRunner(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") @@ -396,6 +507,7 @@ type WSTaskAssignment struct { OutputFormat string `json:"output_format"` FrameStart int `json:"frame_start"` FrameEnd int `json:"frame_end"` + TaskType string `json:"task_type"` InputFiles []string `json:"input_files"` } @@ -633,20 +745,41 @@ func (s *Server) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskUp taskUpdate.TaskID, ).Scan(&jobID, &frameStart, &frameEnd) if err == nil { + // Count total tasks excluding failed ones (failed tasks are retried, so we count them) + // We exclude tasks that are in a terminal failed state with max retries exceeded var totalTasks, completedTasks int - s.db.QueryRow(`SELECT COUNT(*) FROM tasks WHERE job_id = ?`, jobID).Scan(&totalTasks) + s.db.QueryRow( + `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status IN (?, ?, ?, ?)`, + jobID, types.TaskStatusPending, types.TaskStatusRunning, types.TaskStatusCompleted, types.TaskStatusFailed, + ).Scan(&totalTasks) s.db.QueryRow( `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status = ?`, jobID, types.TaskStatusCompleted, ).Scan(&completedTasks) - progress := float64(completedTasks) / float64(totalTasks) * 100.0 + // Handle edge cases: division by zero and all tasks cancelled + var progress float64 + if totalTasks == 0 { + // All tasks cancelled or no tasks, set progress to 0 + progress = 0.0 + } else { + progress = float64(completedTasks) / float64(totalTasks) * 100.0 + } var jobStatus string var outputFormat string s.db.QueryRow(`SELECT output_format FROM jobs WHERE id = ?`, jobID).Scan(&outputFormat) - if completedTasks == totalTasks { + // Check if all non-cancelled tasks are completed + var pendingOrRunningTasks int + s.db.QueryRow( + `SELECT COUNT(*) FROM tasks + WHERE job_id = ? AND status IN (?, ?)`, + jobID, types.TaskStatusPending, types.TaskStatusRunning, + ).Scan(&pendingOrRunningTasks) + + if pendingOrRunningTasks == 0 && totalTasks > 0 { + // All tasks are either completed or failed/cancelled jobStatus = string(types.JobStatusCompleted) s.db.Exec( `UPDATE jobs SET status = ?, progress = ?, completed_at = ? WHERE id = ?`, @@ -654,7 +787,20 @@ func (s *Server) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskUp ) if outputFormat == "MP4" { - go s.generateMP4Video(jobID) + // Create a video generation task instead of calling generateMP4Video directly + // This prevents race conditions when multiple runners complete frames simultaneously + videoTaskTimeout := 86400 // 24 hours for video generation + _, err := s.db.Exec( + `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + jobID, 0, 0, types.TaskTypeVideoGeneration, types.TaskStatusPending, videoTaskTimeout, 1, + ) + if err != nil { + log.Printf("Failed to create video generation task for job %d: %v", jobID, err) + } else { + // Try to distribute the task immediately + go s.distributeTasksToRunners() + } } } else { jobStatus = string(types.JobStatusRunning) @@ -712,7 +858,7 @@ func (s *Server) broadcastLogToFrontend(taskID int64, logEntry WSLogEntry) { func (s *Server) distributeTasksToRunners() { // Get all pending tasks rows, err := s.db.Query( - `SELECT t.id, t.job_id, t.frame_start, t.frame_end, j.allow_parallel_runners, j.status as job_status + `SELECT t.id, t.job_id, t.frame_start, t.frame_end, t.task_type, j.allow_parallel_runners, j.status as job_status FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.status = ? AND j.status != ? @@ -731,6 +877,7 @@ func (s *Server) distributeTasksToRunners() { JobID int64 FrameStart int FrameEnd int + TaskType string AllowParallelRunners bool } @@ -740,11 +887,12 @@ func (s *Server) distributeTasksToRunners() { JobID int64 FrameStart int FrameEnd int + TaskType string AllowParallelRunners bool } var allowParallel int var jobStatus string - err := rows.Scan(&t.TaskID, &t.JobID, &t.FrameStart, &t.FrameEnd, &allowParallel, &jobStatus) + err := rows.Scan(&t.TaskID, &t.JobID, &t.FrameStart, &t.FrameEnd, &t.TaskType, &allowParallel, &jobStatus) if err != nil { continue } @@ -770,13 +918,6 @@ func (s *Server) distributeTasksToRunners() { // Distribute tasks to runners for _, task := range pendingTasks { - // Check if task is already assigned - var assignedRunnerID sql.NullInt64 - err := s.db.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", task.TaskID).Scan(&assignedRunnerID) - if err == nil && assignedRunnerID.Valid { - continue // Already assigned - } - // Find available runner var selectedRunnerID int64 for _, runnerID := range connectedRunners { @@ -812,9 +953,40 @@ func (s *Server) distributeTasksToRunners() { continue // No available runner } - // Assign task to runner + // Atomically assign task to runner using UPDATE with WHERE runner_id IS NULL + // This prevents race conditions when multiple goroutines try to assign the same task + now := time.Now() + result, err := s.db.Exec( + `UPDATE tasks SET runner_id = ?, status = ?, started_at = ? + WHERE id = ? AND runner_id IS NULL AND status = ?`, + selectedRunnerID, types.TaskStatusRunning, now, task.TaskID, types.TaskStatusPending, + ) + if err != nil { + log.Printf("Failed to atomically assign task %d: %v", task.TaskID, err) + continue + } + + // Check if the update actually affected a row (task was successfully assigned) + rowsAffected, err := result.RowsAffected() + if err != nil { + log.Printf("Failed to get rows affected for task %d: %v", task.TaskID, err) + continue + } + + if rowsAffected == 0 { + // Task was already assigned by another goroutine, skip + continue + } + + // Task was successfully assigned, send via WebSocket if err := s.assignTaskToRunner(selectedRunnerID, task.TaskID); err != nil { - log.Printf("Failed to assign task %d to runner %d: %v", task.TaskID, selectedRunnerID, err) + log.Printf("Failed to send task %d to runner %d: %v", task.TaskID, selectedRunnerID, err) + // Rollback the assignment if WebSocket send fails + s.db.Exec( + `UPDATE tasks SET runner_id = NULL, status = ?, started_at = NULL + WHERE id = ?`, + types.TaskStatusPending, task.TaskID, + ) } } } @@ -831,20 +1003,20 @@ func (s *Server) assignTaskToRunner(runnerID int64, taskID int64) error { // Get task details var task WSTaskAssignment - var jobName, outputFormat string + var jobName, outputFormat, taskType string err := s.db.QueryRow( - `SELECT t.job_id, t.frame_start, t.frame_end, j.name, j.output_format + `SELECT t.job_id, t.frame_start, t.frame_end, t.task_type, j.name, j.output_format FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.id = ?`, taskID, - ).Scan(&task.JobID, &task.FrameStart, &task.FrameEnd, &jobName, &outputFormat) + ).Scan(&task.JobID, &task.FrameStart, &task.FrameEnd, &taskType, &jobName, &outputFormat) if err != nil { return err } task.TaskID = taskID - task.JobID = task.JobID task.JobName = jobName task.OutputFormat = outputFormat + task.TaskType = taskType // Get input files rows, err := s.db.Query( @@ -861,14 +1033,15 @@ func (s *Server) assignTaskToRunner(runnerID int64, taskID int64) error { } } - // Assign task to runner in database - now := time.Now() - _, err = s.db.Exec( - `UPDATE tasks SET runner_id = ?, status = ?, started_at = ? WHERE id = ?`, - runnerID, types.TaskStatusRunning, now, taskID, - ) + // Note: Task is already assigned in database by the atomic update in distributeTasksToRunners + // We just need to verify it's still assigned to this runner + var assignedRunnerID sql.NullInt64 + err = s.db.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", taskID).Scan(&assignedRunnerID) if err != nil { - return err + return fmt.Errorf("task not found: %w", err) + } + if !assignedRunnerID.Valid || assignedRunnerID.Int64 != runnerID { + return fmt.Errorf("task %d is not assigned to runner %d", taskID, runnerID) } // Send task via WebSocket diff --git a/internal/api/server.go b/internal/api/server.go index a3b6613..610d2ae 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -109,6 +109,8 @@ func (s *Server) setupRoutes() { r.Get("/{id}/files", s.handleListJobFiles) r.Get("/{id}/files/{fileId}/download", s.handleDownloadJobFile) r.Get("/{id}/video", s.handleStreamVideo) + r.Get("/{id}/metadata", s.handleGetJobMetadata) + r.Get("/{id}/tasks", s.handleListJobTasks) r.Get("/{id}/tasks/{taskId}/logs", s.handleGetTaskLogs) r.Get("/{id}/tasks/{taskId}/logs/ws", s.handleStreamTaskLogsWebSocket) r.Get("/{id}/tasks/{taskId}/steps", s.handleGetTaskSteps) @@ -153,10 +155,12 @@ func (s *Server) setupRoutes() { return http.HandlerFunc(s.runnerAuthMiddleware(next.ServeHTTP)) }) r.Post("/tasks/{id}/progress", s.handleUpdateTaskProgress) + r.Post("/tasks/{id}/steps", s.handleUpdateTaskStep) r.Get("/files/{jobId}/{fileName}", s.handleDownloadFileForRunner) r.Post("/files/{jobId}/upload", s.handleUploadFileFromRunner) r.Get("/jobs/{jobId}/status", s.handleGetJobStatusForRunner) r.Get("/jobs/{jobId}/files", s.handleGetJobFilesForRunner) + r.Post("/jobs/{jobId}/metadata", s.handleSubmitMetadata) }) }) diff --git a/internal/database/schema.go b/internal/database/schema.go index f418936..36d40e8 100644 --- a/internal/database/schema.go +++ b/internal/database/schema.go @@ -175,6 +175,10 @@ func (db *DB) migrate() error { `ALTER TABLE jobs ADD COLUMN allow_parallel_runners BOOLEAN NOT NULL DEFAULT 1`, // Add timeout_seconds to jobs if it doesn't exist `ALTER TABLE jobs ADD COLUMN timeout_seconds INTEGER DEFAULT 86400`, + // Add blend_metadata to jobs if it doesn't exist + `ALTER TABLE jobs ADD COLUMN blend_metadata TEXT`, + // Add task_type to tasks if it doesn't exist + `ALTER TABLE tasks ADD COLUMN task_type TEXT DEFAULT 'render'`, // Add new columns to tasks if they don't exist `ALTER TABLE tasks ADD COLUMN current_step TEXT`, `ALTER TABLE tasks ADD COLUMN retry_count INTEGER DEFAULT 0`, diff --git a/internal/runner/client.go b/internal/runner/client.go index 7c66f19..7bbedbe 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -27,28 +27,31 @@ import ( // Client represents a runner client type Client struct { - managerURL string - name string - hostname string - ipAddress string - httpClient *http.Client - runnerID int64 - runnerSecret string - managerSecret string - wsConn *websocket.Conn - wsConnMu sync.RWMutex - stopChan chan struct{} + managerURL string + name string + hostname string + ipAddress string + httpClient *http.Client + runnerID int64 + runnerSecret string + managerSecret string + wsConn *websocket.Conn + wsConnMu sync.RWMutex + stopChan chan struct{} + stepStartTimes map[string]time.Time // key: "taskID:stepName" + stepTimesMu sync.RWMutex } // NewClient creates a new runner client func NewClient(managerURL, name, hostname, ipAddress string) *Client { return &Client{ - managerURL: managerURL, - name: name, - hostname: hostname, - ipAddress: ipAddress, - httpClient: &http.Client{Timeout: 30 * time.Second}, - stopChan: make(chan struct{}), + managerURL: managerURL, + name: name, + hostname: hostname, + ipAddress: ipAddress, + httpClient: &http.Client{Timeout: 30 * time.Second}, + stopChan: make(chan struct{}), + stepStartTimes: make(map[string]time.Time), } } @@ -259,14 +262,9 @@ func (c *Client) handleTaskAssignment(msg map[string]interface{}) { outputFormat, _ := data["output_format"].(string) frameStart, _ := data["frame_start"].(float64) frameEnd, _ := data["frame_end"].(float64) + taskType, _ := data["task_type"].(string) inputFilesRaw, _ := data["input_files"].([]interface{}) - if len(inputFilesRaw) == 0 { - log.Printf("No input files for task %v", taskID) - c.sendTaskComplete(int64(taskID), "", false, "No input files") - return - } - // Convert to task map format taskMap := map[string]interface{}{ "id": taskID, @@ -275,9 +273,27 @@ func (c *Client) handleTaskAssignment(msg map[string]interface{}) { "frame_end": frameEnd, } - // Process the task + // Process the task based on type go func() { - if err := c.processTask(taskMap, jobName, outputFormat, inputFilesRaw); err != nil { + var err error + if taskType == "metadata" { + if len(inputFilesRaw) == 0 { + log.Printf("No input files for metadata task %v", taskID) + c.sendTaskComplete(int64(taskID), "", false, "No input files") + return + } + err = c.processMetadataTask(taskMap, int64(jobID), inputFilesRaw) + } else if taskType == "video_generation" { + err = c.processVideoGenerationTask(taskMap, int64(jobID)) + } else { + if len(inputFilesRaw) == 0 { + log.Printf("No input files for task %v", taskID) + c.sendTaskComplete(int64(taskID), "", false, "No input files") + return + } + err = c.processTask(taskMap, jobName, outputFormat, inputFilesRaw) + } + if err != nil { log.Printf("Failed to process task %v: %v", taskID, err) c.sendTaskComplete(int64(taskID), "", false, err.Error()) } @@ -334,7 +350,78 @@ func (c *Client) sendLog(taskID int64, logLevel types.LogLevel, message, stepNam // sendStepUpdate sends a step start/complete event to the manager func (c *Client) sendStepUpdate(taskID int64, stepName string, status types.StepStatus, errorMsg string) { - // This would ideally be a separate endpoint, but for now we'll use logs + key := fmt.Sprintf("%d:%s", taskID, stepName) + var durationMs *int + + // Track step start time + if status == types.StepStatusRunning { + c.stepTimesMu.Lock() + c.stepStartTimes[key] = time.Now() + c.stepTimesMu.Unlock() + } + + // Calculate duration if step is completing + if status == types.StepStatusCompleted || status == types.StepStatusFailed { + c.stepTimesMu.RLock() + startTime, exists := c.stepStartTimes[key] + c.stepTimesMu.RUnlock() + if exists { + duration := int(time.Since(startTime).Milliseconds()) + durationMs = &duration + c.stepTimesMu.Lock() + delete(c.stepStartTimes, key) + c.stepTimesMu.Unlock() + } + } + + // Send step update via HTTP API + reqBody := map[string]interface{}{ + "step_name": stepName, + "status": string(status), + } + if durationMs != nil { + reqBody["duration_ms"] = *durationMs + } + if errorMsg != "" { + reqBody["error_message"] = errorMsg + } + + body, _ := json.Marshal(reqBody) + path := fmt.Sprintf("/api/runner/tasks/%d/steps?runner_id=%d", taskID, c.runnerID) + resp, err := c.doSignedRequest("POST", path, body) + if err != nil { + log.Printf("Failed to send step update: %v", err) + // Fallback to log-based tracking + msg := fmt.Sprintf("Step %s: %s", stepName, status) + if errorMsg != "" { + msg += " - " + errorMsg + } + logLevel := types.LogLevelInfo + if status == types.StepStatusFailed { + logLevel = types.LogLevelError + } + c.sendLog(taskID, logLevel, msg, stepName) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Printf("Step update failed: %s", string(body)) + // Fallback to log-based tracking + msg := fmt.Sprintf("Step %s: %s", stepName, status) + if errorMsg != "" { + msg += " - " + errorMsg + } + logLevel := types.LogLevelInfo + if status == types.StepStatusFailed { + logLevel = types.LogLevelError + } + c.sendLog(taskID, logLevel, msg, stepName) + return + } + + // Also send log for debugging msg := fmt.Sprintf("Step %s: %s", stepName, status) if errorMsg != "" { msg += " - " + errorMsg @@ -455,33 +542,20 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat } c.sendStepUpdate(taskID, "complete", types.StepStatusCompleted, "") - // For MP4 format, check if all frames are done and generate video - if outputFormat == "MP4" { - if err := c.checkAndGenerateMP4(jobID); err != nil { - log.Printf("Failed to generate MP4 for job %d: %v", jobID, err) - // Don't fail the task if video generation fails - frames are already uploaded - } - } - return nil } -// checkAndGenerateMP4 checks if all frames are complete and generates MP4 if so -func (c *Client) checkAndGenerateMP4(jobID int64) error { - // Check job status - job, err := c.getJobStatus(jobID) - if err != nil { - return fmt.Errorf("failed to get job status: %w", err) - } +// processVideoGenerationTask processes a video generation task +func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID int64) error { + taskID := int64(task["id"].(float64)) - if job["status"] != "completed" { - log.Printf("Job %d not yet complete (%v), skipping MP4 generation", jobID, job["status"]) - return nil - } + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting video generation task: job %d", jobID), "") + log.Printf("Processing video generation task %d for job %d", taskID, jobID) // Get all output files for this job files, err := c.getJobFiles(jobID) if err != nil { + c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to get job files: %w", err) } @@ -496,14 +570,24 @@ func (c *Client) checkAndGenerateMP4(jobID int64) error { } if len(pngFiles) == 0 { - return fmt.Errorf("no PNG frame files found for MP4 generation") + err := fmt.Errorf("no PNG frame files found for MP4 generation") + c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error()) + return err } + c.sendStepUpdate(taskID, "get_files", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found %d PNG frames for video generation", len(pngFiles)), "get_files") + log.Printf("Generating MP4 for job %d from %d PNG frames", jobID, len(pngFiles)) + // Step: download_frames + c.sendStepUpdate(taskID, "download_frames", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Downloading PNG frames...", "download_frames") + // Create work directory for video generation workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-video-%d", jobID)) if err := os.MkdirAll(workDir, 0755); err != nil { + c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to create work directory: %w", err) } defer os.RemoveAll(workDir) @@ -521,11 +605,19 @@ func (c *Client) checkAndGenerateMP4(jobID int64) error { } if len(frameFiles) == 0 { - return fmt.Errorf("failed to download any frame files") + err := fmt.Errorf("failed to download any frame files") + c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error()) + return err } // Sort frame files by name to ensure correct order sort.Strings(frameFiles) + c.sendStepUpdate(taskID, "download_frames", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Downloaded %d frames", len(frameFiles)), "download_frames") + + // Step: generate_video + c.sendStepUpdate(taskID, "generate_video", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Generating MP4 video with ffmpeg...", "generate_video") // Generate MP4 using ffmpeg outputMP4 := filepath.Join(workDir, fmt.Sprintf("output_%d.mp4", jobID)) @@ -546,20 +638,42 @@ func (c *Client) checkAndGenerateMP4(jobID int64) error { if err != nil { // Try alternative method with concat demuxer log.Printf("First ffmpeg attempt failed, trying concat method: %s", string(output)) - return c.generateMP4WithConcat(frameFiles, outputMP4, workDir) + err = c.generateMP4WithConcat(frameFiles, outputMP4, workDir) + if err != nil { + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error()) + return err + } } // Check if MP4 was created if _, err := os.Stat(outputMP4); os.IsNotExist(err) { - return fmt.Errorf("MP4 file not created: %s", outputMP4) + err := fmt.Errorf("MP4 file not created: %s", outputMP4) + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error()) + return err } + c.sendStepUpdate(taskID, "generate_video", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, "MP4 video generated successfully", "generate_video") + + // Step: upload_video + c.sendStepUpdate(taskID, "upload_video", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Uploading MP4 video...", "upload_video") + // Upload MP4 file mp4Path, err := c.uploadFile(jobID, outputMP4) if err != nil { + c.sendStepUpdate(taskID, "upload_video", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to upload MP4: %w", err) } + c.sendStepUpdate(taskID, "upload_video", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Successfully uploaded MP4: %s", mp4Path), "upload_video") + + // Mark task as complete + if err := c.completeTask(taskID, mp4Path, true, ""); err != nil { + return err + } + log.Printf("Successfully generated and uploaded MP4 for job %d: %s", jobID, mp4Path) return nil } @@ -785,6 +899,197 @@ func (c *Client) uploadFile(jobID int64, filePath string) (string, error) { return result.FilePath, nil } +// processMetadataTask processes a metadata extraction task +func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, inputFiles []interface{}) error { + taskID := int64(task["id"].(float64)) + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting metadata extraction task: job %d", jobID), "") + log.Printf("Processing metadata extraction task %d for job %d", taskID, jobID) + + // Create work directory + workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-metadata-%d", taskID)) + if err := os.MkdirAll(workDir, 0755); err != nil { + return fmt.Errorf("failed to create work directory: %w", err) + } + defer os.RemoveAll(workDir) + + // Step: download + c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Downloading blend file...", "download") + blendFile := "" + for _, filePath := range inputFiles { + filePathStr := filePath.(string) + if err := c.downloadFile(filePathStr, workDir); err != nil { + c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) + return fmt.Errorf("failed to download file %s: %w", filePathStr, err) + } + if filepath.Ext(filePathStr) == ".blend" { + blendFile = filepath.Join(workDir, filepath.Base(filePathStr)) + } + } + + if blendFile == "" { + err := fmt.Errorf("no .blend file found in input files") + c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) + return err + } + c.sendStepUpdate(taskID, "download", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, "Blend file downloaded successfully", "download") + + // Step: extract_metadata + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Extracting metadata from blend file...", "extract_metadata") + + // Create Python script to extract metadata + scriptPath := filepath.Join(workDir, "extract_metadata.py") + scriptContent := `import bpy +import json +import sys + +# Get scene +scene = bpy.context.scene + +# Extract frame range +frame_start = scene.frame_start +frame_end = scene.frame_end + +# Extract render settings +render = scene.render +resolution_x = render.resolution_x +resolution_y = render.resolution_y +samples = scene.cycles.samples if scene.cycles else scene.eevee.taa_render_samples +engine = scene.render.engine.lower() + +# Determine output format from file format +output_format = render.image_settings.file_format + +# Extract scene info +camera_count = len([obj for obj in scene.objects if obj.type == 'CAMERA']) +object_count = len(scene.objects) +material_count = len(bpy.data.materials) + +# Build metadata dictionary +metadata = { + "frame_start": frame_start, + "frame_end": frame_end, + "render_settings": { + "resolution_x": resolution_x, + "resolution_y": resolution_y, + "samples": samples, + "output_format": output_format, + "engine": engine + }, + "scene_info": { + "camera_count": camera_count, + "object_count": object_count, + "material_count": material_count + } +} + +# Output as JSON +print(json.dumps(metadata)) +sys.stdout.flush() +` + + if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil { + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, err.Error()) + return fmt.Errorf("failed to create extraction script: %w", err) + } + + // Execute Blender with Python script + cmd := exec.Command("blender", "-b", blendFile, "--python", scriptPath) + cmd.Dir = workDir + output, err := cmd.CombinedOutput() + if err != nil { + errMsg := fmt.Sprintf("blender metadata extraction failed: %w\nOutput: %s", err, string(output)) + c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata") + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg) + return fmt.Errorf(errMsg) + } + + // Parse output (metadata is printed to stdout) + metadataJSON := strings.TrimSpace(string(output)) + // Extract JSON from output (Blender may print other stuff) + jsonStart := strings.Index(metadataJSON, "{") + jsonEnd := strings.LastIndex(metadataJSON, "}") + if jsonStart == -1 || jsonEnd == -1 || jsonEnd <= jsonStart { + errMsg := "Failed to extract JSON from Blender output" + c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata") + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg) + return fmt.Errorf(errMsg) + } + metadataJSON = metadataJSON[jsonStart : jsonEnd+1] + + var metadata types.BlendMetadata + if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil { + errMsg := fmt.Sprintf("Failed to parse metadata JSON: %w", err) + c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata") + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg) + return fmt.Errorf(errMsg) + } + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Metadata extracted: frames %d-%d, resolution %dx%d", + metadata.FrameStart, metadata.FrameEnd, metadata.RenderSettings.ResolutionX, metadata.RenderSettings.ResolutionY), "extract_metadata") + c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusCompleted, "") + + // Step: submit_metadata + c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusRunning, "") + c.sendLog(taskID, types.LogLevelInfo, "Submitting metadata to manager...", "submit_metadata") + + // Submit metadata to manager + if err := c.submitMetadata(jobID, metadata); err != nil { + errMsg := fmt.Sprintf("Failed to submit metadata: %w", err) + c.sendLog(taskID, types.LogLevelError, errMsg, "submit_metadata") + c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusFailed, errMsg) + return fmt.Errorf(errMsg) + } + + c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusCompleted, "") + c.sendLog(taskID, types.LogLevelInfo, "Metadata extraction completed successfully", "") + + // Mark task as complete + c.sendTaskComplete(taskID, "", true, "") + return nil +} + +// submitMetadata submits extracted metadata to the manager +func (c *Client) submitMetadata(jobID int64, metadata types.BlendMetadata) error { + metadataJSON, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + path := fmt.Sprintf("/api/runner/jobs/%d/metadata?runner_id=%d", jobID, c.runnerID) + timestamp := time.Now() + message := fmt.Sprintf("POST\n%s\n%s\n%d", path, string(metadataJSON), timestamp.Unix()) + h := hmac.New(sha256.New, []byte(c.runnerSecret)) + h.Write([]byte(message)) + signature := hex.EncodeToString(h.Sum(nil)) + + url := fmt.Sprintf("%s%s", c.managerURL, path) + req, err := http.NewRequest("POST", url, bytes.NewReader(metadataJSON)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Runner-Signature", signature) + req.Header.Set("X-Runner-Timestamp", fmt.Sprintf("%d", timestamp.Unix())) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to submit metadata: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("metadata submission failed: %s", string(body)) + } + + return nil +} + // completeTask marks a task as complete via WebSocket (or HTTP fallback) func (c *Client) completeTask(taskID int64, outputPath string, success bool, errorMsg string) error { return c.sendTaskComplete(taskID, outputPath, success, errorMsg) diff --git a/pkg/types/types.go b/pkg/types/types.go index dbe69c0..5b4f7f3 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -35,6 +35,7 @@ type Job struct { OutputFormat string `json:"output_format"` // PNG, JPEG, EXR, etc. AllowParallelRunners bool `json:"allow_parallel_runners"` // Allow multiple runners to work on this job TimeoutSeconds int `json:"timeout_seconds"` // Job-level timeout (24 hours default) + BlendMetadata *BlendMetadata `json:"blend_metadata,omitempty"` // Extracted metadata from blend file CreatedAt time.Time `json:"created_at"` StartedAt *time.Time `json:"started_at,omitempty"` CompletedAt *time.Time `json:"completed_at,omitempty"` @@ -72,6 +73,15 @@ const ( TaskStatusFailed TaskStatus = "failed" ) +// TaskType represents the type of a task +type TaskType string + +const ( + TaskTypeRender TaskType = "render" + TaskTypeMetadata TaskType = "metadata" + TaskTypeVideoGeneration TaskType = "video_generation" +) + // Task represents a render task assigned to a runner type Task struct { ID int64 `json:"id"` @@ -79,6 +89,7 @@ type Task struct { RunnerID *int64 `json:"runner_id,omitempty"` FrameStart int `json:"frame_start"` FrameEnd int `json:"frame_end"` + TaskType TaskType `json:"task_type"` Status TaskStatus `json:"status"` CurrentStep string `json:"current_step,omitempty"` RetryCount int `json:"retry_count"` @@ -199,3 +210,27 @@ type TaskLogEntry struct { StepName string `json:"step_name,omitempty"` } +// BlendMetadata represents extracted metadata from a blend file +type BlendMetadata struct { + FrameStart int `json:"frame_start"` + FrameEnd int `json:"frame_end"` + RenderSettings RenderSettings `json:"render_settings"` + SceneInfo SceneInfo `json:"scene_info"` +} + +// RenderSettings represents render settings from a blend file +type RenderSettings struct { + ResolutionX int `json:"resolution_x"` + ResolutionY int `json:"resolution_y"` + Samples int `json:"samples"` + OutputFormat string `json:"output_format"` + Engine string `json:"engine"` +} + +// SceneInfo represents scene information from a blend file +type SceneInfo struct { + CameraCount int `json:"camera_count"` + ObjectCount int `json:"object_count"` + MaterialCount int `json:"material_count"` +} + diff --git a/web/src/components/JobDetails.jsx b/web/src/components/JobDetails.jsx index ac81405..df7dcab 100644 --- a/web/src/components/JobDetails.jsx +++ b/web/src/components/JobDetails.jsx @@ -5,6 +5,7 @@ import VideoPlayer from './VideoPlayer'; export default function JobDetails({ job, onClose, onUpdate }) { const [jobDetails, setJobDetails] = useState(job); const [files, setFiles] = useState([]); + const [tasks, setTasks] = useState([]); const [loading, setLoading] = useState(true); const [videoUrl, setVideoUrl] = useState(null); const [selectedTaskId, setSelectedTaskId] = useState(null); @@ -42,12 +43,14 @@ export default function JobDetails({ job, onClose, onUpdate }) { const loadDetails = async () => { try { - const [details, fileList] = await Promise.all([ + const [details, fileList, taskList] = await Promise.all([ jobs.get(job.id), jobs.getFiles(job.id), + jobs.getTasks(job.id), ]); setJobDetails(details); setFiles(fileList); + setTasks(taskList); // Check if there's an MP4 output file const mp4File = fileList.find( @@ -151,6 +154,16 @@ export default function JobDetails({ job, onClose, onUpdate }) { } }; + const getTaskStatusColor = (status) => { + const colors = { + pending: 'bg-yellow-100 text-yellow-800', + running: 'bg-blue-100 text-blue-800', + completed: 'bg-green-100 text-green-800', + failed: 'bg-red-100 text-red-800', + }; + return colors[status] || 'bg-gray-100 text-gray-800'; + }; + const outputFiles = files.filter((f) => f.file_type === 'output'); const inputFiles = files.filter((f) => f.file_type === 'input'); @@ -269,9 +282,42 @@ export default function JobDetails({ job, onClose, onUpdate }) {

- Task Execution + Tasks

+ {tasks.length > 0 && ( +
+

Task List

+
+ {tasks.map((task) => ( +
handleTaskClick(task.id)} + className={`flex items-center justify-between p-3 bg-white rounded cursor-pointer hover:bg-gray-100 transition-colors ${ + selectedTaskId === task.id ? 'ring-2 ring-purple-600' : '' + }`} + > +
+ + {task.status} + + + Frame {task.frame_start} + {task.frame_end !== task.frame_start ? `-${task.frame_end}` : ''} + + {task.task_type && task.task_type !== 'render' && ( + ({task.task_type}) + )} +
+
+ {task.runner_id && `Runner ${task.runner_id}`} +
+
+ ))} +
+
+ )} + {taskSteps.length > 0 && (

Steps

diff --git a/web/src/components/JobSubmission.jsx b/web/src/components/JobSubmission.jsx index ca1617c..5f9cb09 100644 --- a/web/src/components/JobSubmission.jsx +++ b/web/src/components/JobSubmission.jsx @@ -1,4 +1,4 @@ -import { useState } from 'react'; +import { useState, useEffect } from 'react'; import { jobs } from '../utils/api'; export default function JobSubmission({ onSuccess }) { @@ -12,6 +12,113 @@ export default function JobSubmission({ onSuccess }) { const [file, setFile] = useState(null); const [submitting, setSubmitting] = useState(false); const [error, setError] = useState(''); + const [metadataStatus, setMetadataStatus] = useState(null); // 'extracting', 'completed', 'error' + const [metadata, setMetadata] = useState(null); + const [currentJobId, setCurrentJobId] = useState(null); + + // Poll for metadata after file upload + useEffect(() => { + if (!currentJobId || metadataStatus !== 'extracting') return; + + let pollCount = 0; + const maxPolls = 30; // 60 seconds max (30 * 2 seconds) + let timeoutId = null; + + const pollMetadata = async () => { + pollCount++; + + // Stop polling after timeout + if (pollCount > maxPolls) { + setMetadataStatus('error'); + // Cancel temp job on timeout + try { + await jobs.cancel(currentJobId); + } catch (err) { + // Ignore errors when canceling + } + return; + } + + try { + const metadata = await jobs.getMetadata(currentJobId); + if (metadata) { + setMetadata(metadata); + setMetadataStatus('completed'); + // Auto-populate form fields + setFormData(prev => ({ + ...prev, + frame_start: metadata.frame_start || prev.frame_start, + frame_end: metadata.frame_end || prev.frame_end, + output_format: metadata.render_settings?.output_format || prev.output_format, + })); + } + } catch (err) { + // Metadata not ready yet, continue polling (only if 404/not found) + if (err.message.includes('404') || err.message.includes('not found')) { + // Continue polling via interval + } else { + setMetadataStatus('error'); + } + } + }; + + const interval = setInterval(pollMetadata, 2000); + + // Set timeout to stop polling after 60 seconds + timeoutId = setTimeout(() => { + clearInterval(interval); + if (metadataStatus === 'extracting') { + setMetadataStatus('error'); + // Cancel temp job on timeout + jobs.cancel(currentJobId).catch(() => {}); + } + }, 60000); + + return () => { + clearInterval(interval); + if (timeoutId) clearTimeout(timeoutId); + // Cleanup: cancel temp job if component unmounts during extraction + if (currentJobId && metadataStatus === 'extracting') { + jobs.cancel(currentJobId).catch(() => {}); + } + }; + }, [currentJobId, metadataStatus]); + + const handleFileChange = async (e) => { + const selectedFile = e.target.files[0]; + if (!selectedFile) { + setFile(null); + return; + } + + setFile(selectedFile); + setMetadataStatus(null); + setMetadata(null); + setCurrentJobId(null); + + // If it's a blend file, create a temporary job to extract metadata + if (selectedFile.name.toLowerCase().endsWith('.blend')) { + try { + // Create a temporary job for metadata extraction + const tempJob = await jobs.create({ + name: 'Metadata Extraction', + frame_start: 1, + frame_end: 10, + output_format: 'PNG', + allow_parallel_runners: true, + }); + + setCurrentJobId(tempJob.id); + setMetadataStatus('extracting'); + + // Upload file to trigger metadata extraction + await jobs.uploadFile(tempJob.id, selectedFile); + } catch (err) { + console.error('Failed to start metadata extraction:', err); + setMetadataStatus('error'); + } + } + }; const handleSubmit = async (e) => { e.preventDefault(); @@ -27,7 +134,16 @@ export default function JobSubmission({ onSuccess }) { throw new Error('Invalid frame range'); } - // Create job + // If we have a temporary job for metadata extraction, cancel it + if (currentJobId) { + try { + await jobs.cancel(currentJobId); + } catch (err) { + // Ignore errors when canceling temp job + } + } + + // Create actual job const job = await jobs.create({ name: formData.name, frame_start: parseInt(formData.frame_start), @@ -48,6 +164,9 @@ export default function JobSubmission({ onSuccess }) { allow_parallel_runners: true, }); setFile(null); + setMetadata(null); + setMetadataStatus(null); + setCurrentJobId(null); e.target.reset(); if (onSuccess) { @@ -150,10 +269,37 @@ export default function JobSubmission({ onSuccess }) { setFile(e.target.files[0])} + onChange={handleFileChange} required className="w-full px-4 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-purple-600 focus:border-transparent file:mr-4 file:py-2 file:px-4 file:rounded-lg file:border-0 file:text-sm file:font-semibold file:bg-purple-50 file:text-purple-700 hover:file:bg-purple-100" /> + {metadataStatus === 'extracting' && ( +
+ Extracting metadata from blend file... +
+ )} + {metadataStatus === 'completed' && metadata && ( +
+
Metadata extracted successfully!
+
+
Frames: {metadata.frame_start} - {metadata.frame_end}
+
Resolution: {metadata.render_settings?.resolution_x} x {metadata.render_settings?.resolution_y}
+
Engine: {metadata.render_settings?.engine}
+
Samples: {metadata.render_settings?.samples}
+
Form fields have been auto-populated. You can adjust them if needed.
+ {(formData.frame_start < metadata.frame_start || formData.frame_end > metadata.frame_end) && ( +
+ Warning: Your frame range ({formData.frame_start}-{formData.frame_end}) exceeds the blend file's range ({metadata.frame_start}-{metadata.frame_end}). This may cause errors. +
+ )} +
+
+ )} + {metadataStatus === 'error' && ( +
+ Could not extract metadata. Please fill in the form manually. +
+ )}