Files
jiggablend/internal/api/jobs.go

2650 lines
80 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package api
import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
authpkg "jiggablend/internal/auth"
"jiggablend/pkg/types"
"github.com/go-chi/chi/v5"
)
// isAdminUser reports whether the request's context is recognised as an
// admin by authpkg.IsAdmin.
func isAdminUser(r *http.Request) bool {
	ctx := r.Context()
	return authpkg.IsAdmin(ctx)
}
// uploadSessionDirForUser validates a client-supplied upload session ID and
// returns the cleaned temp directory path it refers to.
//
// The session ID is a full directory path. Pre-job uploads create that
// directory with os.MkdirTemp("", "fuego-upload-user-<userID>-*"), so the only
// acceptable value is a direct child of os.TempDir() whose name starts with
// that user-specific prefix. Everything else is rejected, because the path is
// later read from and removed with os.RemoveAll.
func uploadSessionDirForUser(sessionID string, userID int64) (string, error) {
	dir := filepath.Clean(sessionID)
	if filepath.Dir(dir) != filepath.Clean(os.TempDir()) {
		return "", fmt.Errorf("upload session directory %q is not directly inside the temp directory", dir)
	}
	prefix := fmt.Sprintf("fuego-upload-user-%d-", userID)
	base := filepath.Base(dir)
	if !strings.HasPrefix(base, prefix) || len(base) == len(prefix) {
		return "", fmt.Errorf("upload session directory %q does not belong to user %d", dir, userID)
	}
	return dir, nil
}

// importUploadedContext copies context.tar.gz from the validated upload
// session directory tempDir into the job's directory, records it in job_files
// and removes the temp directory.
//
// It returns (0, "") on success. On failure it returns the HTTP status and the
// client-facing message to respond with; details are logged here.
func (s *Server) importUploadedContext(jobID int64, tempDir, sessionID string) (int, string) {
	tempContextPath := filepath.Join(tempDir, "context.tar.gz")
	if _, err := os.Stat(tempContextPath); err != nil {
		log.Printf("ERROR: Context archive not found at %s for session %s: %v", tempContextPath, sessionID, err)
		return http.StatusBadRequest, "Context archive not found for upload session. Please upload the file again."
	}
	log.Printf("Found context archive at %s, moving to job %d directory", tempContextPath, jobID)

	jobPath := s.storage.JobPath(jobID)
	if err := os.MkdirAll(jobPath, 0755); err != nil {
		log.Printf("ERROR: Failed to create job directory for job %d: %v", jobID, err)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to create job directory: %v", err)
	}
	jobContextPath := filepath.Join(jobPath, "context.tar.gz")

	// Copy instead of rename so the move works across filesystems.
	srcFile, err := os.Open(tempContextPath)
	if err != nil {
		log.Printf("ERROR: Failed to open source context archive %s: %v", tempContextPath, err)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to open context archive: %v", err)
	}
	dstFile, err := os.Create(jobContextPath)
	if err != nil {
		srcFile.Close()
		log.Printf("ERROR: Failed to create destination context archive %s: %v", jobContextPath, err)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to create context archive: %v", err)
	}
	_, copyErr := io.Copy(dstFile, srcFile)
	// Both files are closed before the source is deleted below.
	srcFile.Close()
	closeErr := dstFile.Close()
	if copyErr != nil {
		os.Remove(jobContextPath) // Clean up partial file
		log.Printf("ERROR: Failed to copy context archive from %s to %s: %v", tempContextPath, jobContextPath, copyErr)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to copy context archive: %v", copyErr)
	}
	if closeErr != nil {
		os.Remove(jobContextPath) // A failed close may mean unflushed data
		log.Printf("ERROR: Failed to close destination file: %v", closeErr)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to finalize context archive: %v", closeErr)
	}

	if err := os.Remove(tempContextPath); err != nil {
		// Cleanup failure must not fail the request.
		log.Printf("Warning: Failed to remove source context archive %s: %v", tempContextPath, err)
	}
	log.Printf("Successfully copied context archive to %s", jobContextPath)

	contextInfo, err := os.Stat(jobContextPath)
	if err != nil {
		log.Printf("ERROR: Failed to stat context archive after move: %v", err)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to verify context archive: %v", err)
	}
	var fileID int64
	err = s.db.QueryRow(
		`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
		VALUES (?, ?, ?, ?, ?)
		RETURNING id`,
		jobID, types.JobFileTypeInput, jobContextPath, filepath.Base(jobContextPath), contextInfo.Size(),
	).Scan(&fileID)
	if err != nil {
		log.Printf("ERROR: Failed to record context archive in database for job %d: %v", jobID, err)
		return http.StatusInternalServerError, fmt.Sprintf("Failed to record context archive: %v", err)
	}
	log.Printf("Successfully recorded context archive in database for job %d (file ID: %d, size: %d bytes)", jobID, fileID, contextInfo.Size())

	if err := os.RemoveAll(tempDir); err != nil {
		log.Printf("Warning: Failed to clean up temp directory %s: %v", tempDir, err)
	}
	return 0, ""
}

// handleCreateJob creates a new render job from a JSON types.CreateJobRequest.
//
// Flow: authenticate, validate the request (job type, name, frame range,
// upload session), insert the job row, optionally import the context archive
// from the upload session, create the render tasks in one transaction, then
// respond 201 with the job and kick off task distribution in the background.
//
// Responses: 401 when the user cannot be determined, 400 for validation
// failures or a missing/invalid upload session, 500 for storage or database
// errors.
func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	var req types.CreateJobRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		s.respondError(w, http.StatusBadRequest, "Invalid request body")
		return
	}

	// Only render jobs are supported; everything below relies on that.
	if req.JobType != types.JobTypeRender {
		s.respondError(w, http.StatusBadRequest, "Invalid job_type: only 'render' jobs are supported")
		return
	}
	if req.Name == "" {
		s.respondError(w, http.StatusBadRequest, "Job name is required")
		return
	}
	if req.FrameStart == nil || req.FrameEnd == nil {
		s.respondError(w, http.StatusBadRequest, "frame_start and frame_end are required for render jobs")
		return
	}
	frameStart, frameEnd := *req.FrameStart, *req.FrameEnd
	if frameStart < 0 || frameEnd < frameStart {
		s.respondError(w, http.StatusBadRequest, "Invalid frame range")
		return
	}
	// Validate frame range limits (prevent abuse). Compared without "+1" so a
	// huge frame_end cannot overflow the frame count and slip past the limit.
	const maxFrameRange = 10000
	if frameEnd-frameStart >= maxFrameRange {
		s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Frame range too large. Maximum allowed: %d frames", maxFrameRange))
		return
	}
	frameCount := frameEnd - frameStart + 1
	if req.OutputFormat == nil || *req.OutputFormat == "" {
		defaultFormat := "PNG"
		req.OutputFormat = &defaultFormat
	}
	outputFormat := *req.OutputFormat

	// Validate the upload session before touching the database so a bad
	// session ID neither leaves an orphaned job nor reaches the filesystem.
	var tempDir string
	if req.UploadSessionID != nil && *req.UploadSessionID != "" {
		tempDir, err = uploadSessionDirForUser(*req.UploadSessionID, userID)
		if err != nil {
			log.Printf("ERROR: Rejected upload session %q for user %d: %v", *req.UploadSessionID, userID, err)
			s.respondError(w, http.StatusBadRequest, "Invalid upload session ID")
			return
		}
	}

	// allow_parallel_runners defaults to true when not provided.
	allowParallel := true
	if req.AllowParallelRunners != nil {
		allowParallel = *req.AllowParallelRunners
	}

	// Set job timeout to 24 hours (86400 seconds)
	jobTimeout := 86400

	// Store render settings in blend_metadata if provided
	var blendMetadataJSON *string
	if req.RenderSettings != nil {
		metadata := types.BlendMetadata{
			FrameStart:     frameStart,
			FrameEnd:       frameEnd,
			RenderSettings: *req.RenderSettings,
		}
		if metadataBytes, err := json.Marshal(metadata); err != nil {
			// Best effort: the job is still created, just without metadata.
			log.Printf("Warning: Failed to marshal render settings for new job: %v", err)
		} else {
			metadataStr := string(metadataBytes)
			blendMetadataJSON = &metadataStr
		}
	}

	log.Printf("Creating render job with output_format: '%s' (from user selection)", outputFormat)
	var jobID int64
	err = s.db.QueryRow(
		`INSERT INTO jobs (user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds, blend_metadata)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
		RETURNING id`,
		userID, req.JobType, req.Name, types.JobStatusPending, 0.0, frameStart, frameEnd, outputFormat, allowParallel, jobTimeout, blendMetadataJSON,
	).Scan(&jobID)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job: %v", err))
		return
	}
	log.Printf("Created render job %d with output_format: '%s'", jobID, outputFormat)

	// If an upload session was provided, move its context archive into the job directory.
	if tempDir != "" {
		log.Printf("Processing upload session for job %d: %s", jobID, *req.UploadSessionID)
		if status, msg := s.importUploadedContext(jobID, tempDir, *req.UploadSessionID); status != 0 {
			s.respondError(w, status, msg)
			return
		}
	} else {
		log.Printf("Warning: No upload session ID provided for job %d - job created without input files", jobID)
	}

	// Frame render tasks get a 5 minute timeout regardless of output format;
	// for MP4 formats the video generation tasks are created later.
	const taskTimeout = 300
	const insertTask = `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries)
		VALUES (?, ?, ?, ?, ?, ?, ?)`

	// Create all tasks atomically so a failure cannot leave a partial task set.
	tx, err := s.db.Begin()
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create tasks: %v", err))
		return
	}
	defer tx.Rollback() // no effect once the transaction is committed

	if !allowParallel {
		// Single task for the entire frame range
		if _, err = tx.Exec(insertTask, jobID, frameStart, frameEnd, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3); err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create task: %v", err))
			return
		}
	} else {
		// One task per frame for parallel processing. Iterating by offset keeps
		// the loop finite even when frameEnd is the largest representable int.
		for offset := 0; offset < frameCount; offset++ {
			frame := frameStart + offset
			if _, err = tx.Exec(insertTask, jobID, frame, frame, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3); err != nil {
				s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create tasks: %v", err))
				return
			}
		}
	}
	if err = tx.Commit(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create tasks: %v", err))
		return
	}
	if allowParallel {
		log.Printf("Created %d render tasks for job %d (frames %d-%d, parallel)", frameCount, jobID, frameStart, frameEnd)
	} else {
		log.Printf("Created 1 render task for job %d (frames %d-%d, single runner)", jobID, frameStart, frameEnd)
	}

	// Update job status (should be pending since tasks are pending)
	s.updateJobStatusFromTasks(jobID)

	// Build response job object
	job := types.Job{
		ID:                   jobID,
		UserID:               userID,
		JobType:              req.JobType,
		Name:                 req.Name,
		Status:               types.JobStatusPending,
		Progress:             0.0,
		TimeoutSeconds:       jobTimeout,
		CreatedAt:            time.Now(),
		FrameStart:           req.FrameStart,
		FrameEnd:             req.FrameEnd,
		OutputFormat:         req.OutputFormat,
		AllowParallelRunners: &allowParallel,
	}

	// Immediately try to distribute tasks to connected runners
	go s.distributeTasksToRunners()
	s.respondJSON(w, http.StatusCreated, job)
}
// handleListJobs responds with all jobs owned by the current user, newest
// first, as a JSON array (an empty array when the user has no jobs).
//
// Responses: 401 when the user cannot be determined, 500 when the query,
// a row scan, or the row iteration fails.
func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	// Query all jobs for the user
	query := `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
		allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
		FROM jobs WHERE user_id = ? ORDER BY created_at DESC`
	rows, err := s.db.Query(query, userID)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query jobs: %v", err))
		return
	}
	defer rows.Close()

	// Non-nil so an empty result encodes as [] rather than null.
	jobs := []types.Job{}
	for rows.Next() {
		var job types.Job
		var jobType string
		var startedAt, completedAt sql.NullTime
		var blendMetadataJSON sql.NullString
		var errorMessage sql.NullString
		var frameStart, frameEnd sql.NullInt64
		var outputFormat sql.NullString
		var allowParallelRunners sql.NullBool
		err := rows.Scan(
			&job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress,
			&frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds,
			&blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
		)
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err))
			return
		}

		// Nullable columns become nil pointers / zero values on the job.
		job.JobType = types.JobType(jobType)
		if frameStart.Valid {
			fs := int(frameStart.Int64)
			job.FrameStart = &fs
		}
		if frameEnd.Valid {
			fe := int(frameEnd.Int64)
			job.FrameEnd = &fe
		}
		if outputFormat.Valid {
			job.OutputFormat = &outputFormat.String
		}
		if allowParallelRunners.Valid {
			job.AllowParallelRunners = &allowParallelRunners.Bool
		}
		if startedAt.Valid {
			job.StartedAt = &startedAt.Time
		}
		if completedAt.Valid {
			job.CompletedAt = &completedAt.Time
		}
		if blendMetadataJSON.Valid && blendMetadataJSON.String != "" {
			// Unparseable metadata is skipped rather than failing the listing.
			var metadata types.BlendMetadata
			if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil {
				job.BlendMetadata = &metadata
			}
		}
		if errorMessage.Valid {
			job.ErrorMessage = errorMessage.String
		}
		jobs = append(jobs, job)
	}
	// rows.Next returning false can mean an error, not just end of data;
	// without this check a failed iteration would return a truncated list.
	if err := rows.Err(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query jobs: %v", err))
		return
	}
	s.respondJSON(w, http.StatusOK, jobs)
}
// handleGetJob responds with a single job as JSON. Admins may fetch any job;
// other users only see jobs whose user_id matches their own.
//
// Responses: 401 when the user cannot be determined, 400 for a bad id,
// 404 when no matching job exists, 500 on any other query error.
func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	var (
		job             types.Job
		kind            string
		started         sql.NullTime
		completed       sql.NullTime
		metadataJSON    sql.NullString
		failureText     sql.NullString
		firstFrame      sql.NullInt64
		lastFrame       sql.NullInt64
		format          sql.NullString
		parallelRunners sql.NullBool
	)
	// Scan targets, in the column order shared by both queries below.
	targets := []interface{}{
		&job.ID, &job.UserID, &kind, &job.Name, &job.Status, &job.Progress,
		&firstFrame, &lastFrame, &format, &parallelRunners, &job.TimeoutSeconds,
		&metadataJSON, &job.CreatedAt, &started, &completed, &failureText,
	}

	// Admins look up by id alone; everyone else is additionally filtered by owner.
	var lookupErr error
	if isAdminUser(r) {
		lookupErr = s.db.QueryRow(
			`SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
FROM jobs WHERE id = ?`,
			jobID,
		).Scan(targets...)
	} else {
		lookupErr = s.db.QueryRow(
			`SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
FROM jobs WHERE id = ? AND user_id = ?`,
			jobID, userID,
		).Scan(targets...)
	}
	switch {
	case lookupErr == sql.ErrNoRows:
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	case lookupErr != nil:
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", lookupErr))
		return
	}

	// Copy nullable columns onto the job; NULLs stay as nil / zero values.
	job.JobType = types.JobType(kind)
	if firstFrame.Valid {
		v := int(firstFrame.Int64)
		job.FrameStart = &v
	}
	if lastFrame.Valid {
		v := int(lastFrame.Int64)
		job.FrameEnd = &v
	}
	if format.Valid {
		job.OutputFormat = &format.String
	}
	if parallelRunners.Valid {
		job.AllowParallelRunners = &parallelRunners.Bool
	}
	if started.Valid {
		job.StartedAt = &started.Time
	}
	if completed.Valid {
		job.CompletedAt = &completed.Time
	}
	if metadataJSON.Valid && metadataJSON.String != "" {
		// Metadata that fails to parse is simply omitted from the response.
		var parsed types.BlendMetadata
		if json.Unmarshal([]byte(metadataJSON.String), &parsed) == nil {
			job.BlendMetadata = &parsed
		}
	}
	if failureText.Valid {
		job.ErrorMessage = failureText.String
	}

	s.respondJSON(w, http.StatusOK, job)
}
// handleCancelJob cancels one of the current user's jobs: the job is marked
// cancelled and its still-pending tasks are marked failed. Both updates run in
// a single transaction so a job is never left cancelled with pending tasks.
//
// Jobs that are already completed or cancelled are left untouched and
// reported with 200. Responses: 401 when the user cannot be determined,
// 400 for a bad id, 404 when the user owns no such job, 500 on database errors.
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Look up the job, scoped to the requesting user, to learn its type and status.
	var jobType string
	var jobStatus string
	err = s.db.QueryRow("SELECT job_type, status FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobType, &jobStatus)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
		return
	}

	// Don't allow cancelling already completed or cancelled jobs
	if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusCancelled) {
		s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job already " + jobStatus})
		return
	}

	tx, err := s.db.Begin()
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel job: %v", err))
		return
	}
	defer tx.Rollback() // no effect once the transaction is committed

	result, err := tx.Exec(
		`UPDATE jobs SET status = ? WHERE id = ? AND user_id = ?`,
		types.JobStatusCancelled, jobID, userID,
	)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel job: %v", err))
		return
	}
	// Only a confirmed zero-row update means the job vanished in the meantime;
	// a driver that cannot report the count must not be turned into a 404.
	if rowsAffected, raErr := result.RowsAffected(); raErr != nil {
		log.Printf("Warning: Could not determine rows affected when cancelling job %d: %v", jobID, raErr)
	} else if rowsAffected == 0 {
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	}
	log.Printf("Cancelling job %d (type: %s)", jobID, jobType)

	// Cancel all pending tasks (they are recorded with the failed status).
	_, err = tx.Exec(
		`UPDATE tasks SET status = ? WHERE job_id = ? AND status = ?`,
		types.TaskStatusFailed, jobID, types.TaskStatusPending,
	)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel tasks: %v", err))
		return
	}
	if err = tx.Commit(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel job: %v", err))
		return
	}
	s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job cancelled"})
}
// handleDeleteJob permanently removes a job together with its task logs, task
// steps, tasks and job file records (in one transaction), then deletes the
// job's files from storage on a best-effort basis.
//
// Admins may delete any job; other users only their own. Pending or running
// jobs must be cancelled first. Responses: 401, 400 (bad id or job still
// pending/running), 403, 404, 500.
func (s *Server) handleDeleteJob(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Load owner and status; non-admin lookups are restricted to the caller's own jobs.
	admin := isAdminUser(r)
	var (
		ownerID int64
		state   string
	)
	if admin {
		err = s.db.QueryRow("SELECT user_id, status FROM jobs WHERE id = ?", jobID).Scan(&ownerID, &state)
	} else {
		err = s.db.QueryRow("SELECT user_id, status FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&ownerID, &state)
	}
	switch {
	case err == sql.ErrNoRows:
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	case err != nil:
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
		return
	case !admin && ownerID != userID:
		s.respondError(w, http.StatusForbidden, "Access denied")
		return
	}

	// Jobs that can still be cancelled (pending or running) may not be deleted.
	stillActive := state == string(types.JobStatusPending) || state == string(types.JobStatusRunning)
	if stillActive {
		s.respondError(w, http.StatusBadRequest, "Cannot delete a job that is pending or running. Please cancel it first.")
		return
	}

	// All database deletes share one transaction for consistency.
	tx, err := s.db.Begin()
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start transaction: %v", err))
		return
	}
	defer tx.Rollback()

	// Child rows first, the job row last.
	steps := []struct {
		query   string
		failFmt string
	}{
		{`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, "Failed to delete task logs: %v"},
		{`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, "Failed to delete task steps: %v"},
		{"DELETE FROM tasks WHERE job_id = ?", "Failed to delete tasks: %v"},
		{"DELETE FROM job_files WHERE job_id = ?", "Failed to delete job files: %v"},
		{"DELETE FROM jobs WHERE id = ?", "Failed to delete job: %v"},
	}
	for _, step := range steps {
		if _, err = tx.Exec(step.query, jobID); err != nil {
			tx.Rollback()
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf(step.failFmt, err))
			return
		}
	}
	if err = tx.Commit(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to commit transaction: %v", err))
		return
	}

	// The records are already gone, so a storage failure is only logged.
	if err := s.storage.DeleteJobFiles(jobID); err != nil {
		log.Printf("Warning: Failed to delete job files for job %d: %v", jobID, err)
	}
	log.Printf("Deleted job %d (user: %d, admin: %v)", jobID, ownerID, admin)
	s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job deleted"})
}
// cleanupOldRenderJobs runs cleanupOldRenderJobsOnce immediately and then
// again on every hourly tick. It never returns, so callers are expected to
// run it on its own goroutine.
func (s *Server) cleanupOldRenderJobs() {
	const interval = time.Hour

	hourly := time.NewTicker(interval)
	defer hourly.Stop()

	for {
		// First pass happens right away; later passes follow each tick.
		s.cleanupOldRenderJobsOnce()
		<-hourly.C
	}
}
// cleanupOldRenderJobsOnce deletes render jobs created more than one month ago
// that are in a final state (completed, failed or cancelled); pending and
// running jobs are never touched.
//
// Each job is removed in its own transaction (task logs, task steps, tasks,
// job file records, then the job row), followed by a best-effort removal of
// its files from storage. Failures are logged and the remaining jobs are still
// processed. A panic is recovered and logged so it cannot take down the caller.
func (s *Server) cleanupOldRenderJobsOnce() {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Panic in cleanupOldRenderJobs: %v", r)
		}
	}()

	rows, err := s.db.Query(
		`SELECT id FROM jobs
		WHERE job_type = ?
		AND status IN (?, ?, ?)
		AND created_at < CURRENT_TIMESTAMP - INTERVAL '1 month'`,
		types.JobTypeRender,
		types.JobStatusCompleted,
		types.JobStatusFailed,
		types.JobStatusCancelled,
	)
	if err != nil {
		log.Printf("Failed to query old render jobs: %v", err)
		return
	}
	defer rows.Close()

	var jobIDs []int64
	for rows.Next() {
		var jobID int64
		if err := rows.Scan(&jobID); err != nil {
			log.Printf("Failed to scan old render job ID: %v", err)
			continue
		}
		jobIDs = append(jobIDs, jobID)
	}
	if err := rows.Err(); err != nil {
		// Best effort: still clean up whatever IDs were read before the failure.
		log.Printf("Failed to read all old render jobs: %v", err)
	}
	// Close the result set before the deletes below are issued.
	rows.Close()

	if len(jobIDs) == 0 {
		return
	}
	log.Printf("Cleaning up %d old render jobs", len(jobIDs))

	// Child rows first, the job row last.
	steps := []struct {
		query   string
		failFmt string
	}{
		{`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, "Failed to delete task logs for job %d: %v"},
		{`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, "Failed to delete task steps for job %d: %v"},
		{"DELETE FROM tasks WHERE job_id = ?", "Failed to delete tasks for job %d: %v"},
		{"DELETE FROM job_files WHERE job_id = ?", "Failed to delete job files for job %d: %v"},
		{"DELETE FROM jobs WHERE id = ?", "Failed to delete job %d: %v"},
	}

	// deleteJob removes one job's database rows in a transaction and then its
	// files; it reports whether the database rows were actually deleted.
	deleteJob := func(jobID int64) bool {
		tx, err := s.db.Begin()
		if err != nil {
			log.Printf("Failed to start transaction for job %d: %v", jobID, err)
			return false
		}
		defer tx.Rollback() // no effect once the transaction is committed

		for _, step := range steps {
			if _, err := tx.Exec(step.query, jobID); err != nil {
				log.Printf(step.failFmt, jobID, err)
				return false
			}
		}
		if err := tx.Commit(); err != nil {
			log.Printf("Failed to commit transaction for job %d: %v", jobID, err)
			return false
		}

		// Delete physical files (best effort, don't fail if this errors)
		if err := s.storage.DeleteJobFiles(jobID); err != nil {
			log.Printf("Warning: Failed to delete files for render job %d: %v", jobID, err)
		}
		return true
	}

	deleted := 0
	for _, jobID := range jobIDs {
		if deleteJob(jobID) {
			deleted++
		}
	}
	// Report what was actually removed, not merely what was attempted.
	log.Printf("Cleaned up %d old render jobs", deleted)
}
// handleUploadJobFile accepts a multipart upload (form field "file") for an
// existing job owned by the current user.
//
// The upload is staged in a temp directory. A .zip is extracted there and a
// main .blend file is chosen: either the optional "main_blend_file" form value
// (a path relative to the archive root) or, when exactly one .blend sits in
// the archive root, that file. If several root-level .blend files exist and
// none was chosen, the handler responds 200 with the candidates and stores
// nothing. Any other file is staged as-is. The staged directory is then packed
// into the job's context archive, recorded in job_files, and metadata
// extraction is attempted (failure there is only logged).
//
// Responses: 201 with file details on success, 200 for the "choose a blend
// file" case, 400 for a bad id or form, 401, 403, 404, 500.
func (s *Server) handleUploadJobFile(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Verify job belongs to user
	var jobUserID int64
	err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
	if err == sql.ErrNoRows {
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
		return
	}
	if jobUserID != userID {
		s.respondError(w, http.StatusForbidden, "Access denied")
		return
	}

	// ParseMultipartForm's argument is the amount of the form kept in MEMORY,
	// not the upload size limit; parts beyond it are spooled to temporary files
	// on disk. Keep the in-memory share small and cap the total request size
	// separately.
	const (
		maxUploadBytes = 20 << 30 // 20 GB (for large ZIP files and blend files)
		maxFormMemory  = 32 << 20 // 32 MB held in memory
	)
	r.Body = http.MaxBytesReader(w, r.Body, maxUploadBytes)
	if err = r.ParseMultipartForm(maxFormMemory); err != nil {
		log.Printf("Error parsing multipart form for job %d: %v", jobID, err)
		s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse form: %v", err))
		return
	}
	file, header, err := r.FormFile("file")
	if err != nil {
		log.Printf("Error getting file from form for job %d: %v", jobID, err)
		s.respondError(w, http.StatusBadRequest, fmt.Sprintf("No file provided: %v", err))
		return
	}
	defer file.Close()
	log.Printf("Uploading file '%s' (size: %d bytes) for job %d", header.Filename, header.Size, jobID)

	jobPath := s.storage.JobPath(jobID)
	if err := os.MkdirAll(jobPath, 0755); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job directory: %v", err))
		return
	}

	// Create temporary directory for processing upload
	tmpDir, err := os.MkdirTemp("", fmt.Sprintf("fuego-upload-%d-*", jobID))
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create temporary directory: %v", err))
		return
	}
	defer os.RemoveAll(tmpDir)

	var fileID int64
	var mainBlendFile string
	var extractedFiles []string
	isZip := strings.HasSuffix(strings.ToLower(header.Filename), ".zip")

	if isZip {
		log.Printf("Processing ZIP file '%s' for job %d", header.Filename, jobID)

		// Save ZIP to temporary directory
		zipPath := filepath.Join(tmpDir, header.Filename)
		log.Printf("Creating ZIP file at: %s", zipPath)
		zipFile, err := os.Create(zipPath)
		if err != nil {
			log.Printf("ERROR: Failed to create ZIP file for job %d: %v", jobID, err)
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create ZIP file: %v", err))
			return
		}
		log.Printf("Copying %d bytes to ZIP file for job %d...", header.Size, jobID)
		copied, err := io.Copy(zipFile, file)
		// A failed close can mean the data never reached disk, so it counts
		// as a failed save.
		if closeErr := zipFile.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			log.Printf("ERROR: Failed to save ZIP file for job %d (copied %d bytes): %v", jobID, copied, err)
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save ZIP file: %v", err))
			return
		}
		log.Printf("Successfully copied %d bytes to ZIP file for job %d", copied, jobID)

		// Extract ZIP file to temporary directory
		log.Printf("Extracting ZIP file for job %d...", jobID)
		extractedFiles, err = s.storage.ExtractZip(zipPath, tmpDir)
		if err != nil {
			log.Printf("ERROR: Failed to extract ZIP file for job %d: %v", jobID, err)
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to extract ZIP file: %v", err))
			return
		}
		log.Printf("Successfully extracted %d files from ZIP for job %d", len(extractedFiles), jobID)

		// Find main blend file (check for user selection first, then auto-detect)
		mainBlendParam := r.FormValue("main_blend_file")
		if mainBlendParam != "" {
			// User specified main blend file. The value is untrusted: it must
			// resolve to a path inside tmpDir, otherwise "../" sequences would
			// let a client probe arbitrary files on the server.
			mainBlendFile = filepath.Join(tmpDir, mainBlendParam)
			rel, relErr := filepath.Rel(tmpDir, mainBlendFile)
			escapes := relErr != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator))
			if !escapes {
				_, relErr = os.Stat(mainBlendFile)
			}
			if escapes || relErr != nil {
				s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Specified main blend file not found: %s", mainBlendParam))
				return
			}
		} else {
			// Auto-detect: only .blend files directly in the archive root count.
			var blendFiles []string
			entries, err := os.ReadDir(tmpDir)
			if err != nil {
				log.Printf("Warning: Failed to list extracted files for job %d: %v", jobID, err)
			}
			for _, entry := range entries {
				if !entry.IsDir() && strings.HasSuffix(strings.ToLower(entry.Name()), ".blend") {
					blendFiles = append(blendFiles, filepath.Join(tmpDir, entry.Name()))
				}
			}
			if len(blendFiles) == 1 {
				// Only one blend file in root - use it
				mainBlendFile = blendFiles[0]
			} else if len(blendFiles) > 1 {
				// Multiple blend files - return the list for the user to choose from
				blendFileNames := make([]string, 0, len(blendFiles))
				for _, f := range blendFiles {
					blendFileNames = append(blendFileNames, filepath.Base(f))
				}
				s.respondJSON(w, http.StatusOK, map[string]interface{}{
					"zip_extracted": true,
					"blend_files":   blendFileNames,
					"message":       "Multiple blend files found. Please specify the main blend file.",
				})
				return
			}
		}
	} else {
		// Regular file upload (not ZIP) - save to temporary directory.
		// The form file has not been read yet, so it is streamed directly.
		filePath := filepath.Join(tmpDir, header.Filename)
		outFile, err := os.Create(filePath)
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create file: %v", err))
			return
		}
		_, err = io.Copy(outFile, file)
		if closeErr := outFile.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err))
			return
		}
		if strings.HasSuffix(strings.ToLower(header.Filename), ".blend") {
			mainBlendFile = filePath
		}
	}

	// Create context archive from temporary directory - this is the primary artifact.
	// Exclude the original uploaded ZIP file (but keep blend files as they're needed for rendering).
	log.Printf("Creating context archive for job %d...", jobID)
	var excludeFiles []string
	if isZip {
		excludeFiles = append(excludeFiles, header.Filename)
	}
	contextPath, err := s.storage.CreateJobContextFromDir(tmpDir, jobID, excludeFiles...)
	if err != nil {
		log.Printf("ERROR: Failed to create context archive for job %d: %v", jobID, err)
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create context archive: %v", err))
		return
	}
	log.Printf("Successfully created context archive for job %d at %s", jobID, contextPath)

	// Record context archive in database
	contextInfo, err := os.Stat(contextPath)
	if err != nil {
		log.Printf("ERROR: Failed to stat context archive for job %d: %v", jobID, err)
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to stat context archive: %v", err))
		return
	}
	err = s.db.QueryRow(
		`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
		VALUES (?, ?, ?, ?, ?)
		RETURNING id`,
		jobID, types.JobFileTypeInput, contextPath, filepath.Base(contextPath), contextInfo.Size(),
	).Scan(&fileID)
	if err != nil {
		log.Printf("ERROR: Failed to record context archive in database for job %d: %v", jobID, err)
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record context archive: %v", err))
		return
	}
	log.Printf("Context archive recorded in database with ID %d for job %d", fileID, jobID)

	// Extract metadata directly from the context archive. Failure here does
	// not fail the upload - the job can still proceed without metadata.
	log.Printf("Extracting metadata for job %d...", jobID)
	metadata, err := s.extractMetadataFromContext(jobID)
	if err != nil {
		log.Printf("Warning: Failed to extract metadata for job %d: %v", jobID, err)
	} else if metadataJSON, err := json.Marshal(metadata); err != nil {
		log.Printf("Warning: Failed to marshal metadata: %v", err)
	} else if _, err = s.db.Exec(
		`UPDATE jobs SET blend_metadata = ? WHERE id = ?`,
		string(metadataJSON), jobID,
	); err != nil {
		log.Printf("Warning: Failed to update job metadata in database: %v", err)
	} else {
		log.Printf("Successfully extracted and stored metadata for job %d", jobID)
	}

	response := map[string]interface{}{
		"id":              fileID,
		"file_name":       header.Filename,
		"file_size":       header.Size,
		"context_archive": filepath.Base(contextPath),
	}
	if isZip {
		response["zip_extracted"] = true
		response["extracted_files_count"] = len(extractedFiles)
	}
	if mainBlendFile != "" {
		// Report the main blend file relative to the staging directory.
		// mainBlendFile always lies under tmpDir, so Rel cannot fail here.
		relPath, _ := filepath.Rel(tmpDir, mainBlendFile)
		response["main_blend_file"] = relPath
	}
	s.respondJSON(w, http.StatusCreated, response)
}
// handleUploadFileForJobCreation handles a file upload that happens before the
// job itself has been created.
//
// The uploaded "file" form field is stored in a fresh, user-specific temporary
// directory. ZIP uploads are extracted there and the main .blend file is taken
// from the optional "main_blend_file" form field or, failing that,
// auto-detected among the .blend files in the directory root. When several
// root-level candidates exist the directory is discarded and the candidate
// list is returned so the client can pick one. Any other upload is stored
// as-is.
//
// A context archive (context.tar.gz) is then built inside the temporary
// directory and metadata extraction is attempted; an extraction failure is
// logged and reported as "metadata_extracted": false instead of failing the
// request.
//
// On success the temporary directory is left in place and its full path is
// returned as "session_id". It is not removed here; cleanup is expected after
// job creation or after a timeout.
func (s *Server) handleUploadFileForJobCreation(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	// The argument to ParseMultipartForm is the amount of file data held in
	// memory, not an upload size limit: anything beyond it is spooled to
	// temporary files on disk. Keep it small so that large uploads are not
	// buffered entirely in RAM.
	const maxMultipartMemory = 32 << 20 // 32 MiB
	if err := r.ParseMultipartForm(maxMultipartMemory); err != nil {
		log.Printf("Error parsing multipart form: %v", err)
		s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse form: %v", err))
		return
	}

	file, header, err := r.FormFile("file")
	if err != nil {
		log.Printf("Error getting file from form: %v", err)
		s.respondError(w, http.StatusBadRequest, fmt.Sprintf("No file provided: %v", err))
		return
	}
	defer file.Close()

	log.Printf("Uploading file '%s' (size: %d bytes) for user %d (pre-job creation)", header.Filename, header.Size, userID)

	// Create temporary directory for processing upload (user-specific).
	tmpDir, err := os.MkdirTemp("", fmt.Sprintf("fuego-upload-user-%d-*", userID))
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create temporary directory: %v", err))
		return
	}

	// fail discards the temporary directory before reporting an error, so
	// failed uploads do not leave files behind.
	fail := func(status int, message string) {
		os.RemoveAll(tmpDir)
		s.respondError(w, status, message)
	}

	isZip := strings.HasSuffix(strings.ToLower(header.Filename), ".zip")

	var mainBlendFile string
	var extractedFiles []string

	if isZip {
		log.Printf("Processing ZIP file '%s'", header.Filename)

		// Save ZIP to temporary directory.
		zipPath := filepath.Join(tmpDir, header.Filename)
		zipFile, err := os.Create(zipPath)
		if err != nil {
			fail(http.StatusInternalServerError, fmt.Sprintf("Failed to create ZIP file: %v", err))
			return
		}
		copied, err := io.Copy(zipFile, file)
		// A failed Close can mean the data never reached disk, so treat it
		// like a failed copy.
		if closeErr := zipFile.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			fail(http.StatusInternalServerError, fmt.Sprintf("Failed to save ZIP file: %v", err))
			return
		}
		log.Printf("Successfully copied %d bytes to ZIP file", copied)

		// Extract ZIP file to temporary directory.
		extractedFiles, err = s.storage.ExtractZip(zipPath, tmpDir)
		if err != nil {
			fail(http.StatusInternalServerError, fmt.Sprintf("Failed to extract ZIP file: %v", err))
			return
		}
		log.Printf("Successfully extracted %d files from ZIP", len(extractedFiles))

		// Find main blend file.
		mainBlendParam := r.FormValue("main_blend_file")
		if mainBlendParam != "" {
			// The value is client-controlled: refuse absolute paths and
			// anything that climbs out of the upload directory, otherwise the
			// Stat below could be used to probe arbitrary server paths.
			if !filepath.IsLocal(mainBlendParam) {
				fail(http.StatusBadRequest, fmt.Sprintf("Invalid main blend file path: %s", mainBlendParam))
				return
			}
			mainBlendFile = filepath.Join(tmpDir, mainBlendParam)
			if _, err := os.Stat(mainBlendFile); err != nil {
				fail(http.StatusBadRequest, fmt.Sprintf("Specified main blend file not found: %s", mainBlendParam))
				return
			}
		} else {
			// Auto-detect: only .blend files directly in the root directory
			// are candidates, so reading the root is sufficient.
			var blendFileNames []string
			entries, readErr := os.ReadDir(tmpDir)
			for _, entry := range entries {
				if !entry.IsDir() && strings.HasSuffix(strings.ToLower(entry.Name()), ".blend") {
					blendFileNames = append(blendFileNames, entry.Name())
				}
			}
			switch {
			case len(blendFileNames) > 1:
				// Multiple blend files - return list for user to choose.
				os.RemoveAll(tmpDir)
				s.respondJSON(w, http.StatusOK, map[string]interface{}{
					"zip_extracted": true,
					"blend_files":   blendFileNames,
					"message":       "Multiple blend files found. Please specify the main blend file.",
				})
				return
			case readErr == nil && len(blendFileNames) == 1:
				mainBlendFile = filepath.Join(tmpDir, blendFileNames[0])
			}
		}
	} else {
		// Regular file upload (not ZIP) - save to temporary directory. The
		// already opened form file has not been read yet, so it can be
		// streamed directly.
		filePath := filepath.Join(tmpDir, header.Filename)
		outFile, err := os.Create(filePath)
		if err != nil {
			fail(http.StatusInternalServerError, fmt.Sprintf("Failed to create file: %v", err))
			return
		}
		_, err = io.Copy(outFile, file)
		if closeErr := outFile.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			fail(http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err))
			return
		}
		if strings.HasSuffix(strings.ToLower(header.Filename), ".blend") {
			mainBlendFile = filePath
		}
	}

	// Create context archive from temporary directory. The uploaded ZIP itself
	// is excluded; only its extracted contents belong in the context.
	log.Printf("Creating context archive from temporary directory...")
	var excludeFiles []string
	if isZip {
		excludeFiles = append(excludeFiles, header.Filename)
	}

	// Create context in temp directory (it is moved to the job directory later).
	contextPath := filepath.Join(tmpDir, "context.tar.gz")
	contextPath, err = s.createContextFromDir(tmpDir, contextPath, excludeFiles...)
	if err != nil {
		os.RemoveAll(tmpDir)
		log.Printf("ERROR: Failed to create context archive: %v", err)
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create context archive: %v", err))
		return
	}
	log.Printf("Successfully created context archive at %s", contextPath)

	// Extract metadata from context archive.
	log.Printf("Extracting metadata from context archive...")
	metadata, err := s.extractMetadataFromTempContext(contextPath)
	if err != nil {
		log.Printf("Warning: Failed to extract metadata: %v", err)
		// Continue anyway - user can fill in manually.
		metadata = nil
	}

	// The full temp directory path doubles as the session ID for easy lookup.
	sessionID := tmpDir

	response := map[string]interface{}{
		"session_id":      sessionID, // Full temp directory path
		"file_name":       header.Filename,
		"file_size":       header.Size,
		"context_archive": filepath.Base(contextPath),
	}
	if isZip {
		response["zip_extracted"] = true
		response["extracted_files_count"] = len(extractedFiles)
	}
	if mainBlendFile != "" {
		// mainBlendFile was built by joining onto tmpDir, so the error from
		// Rel is ignored here as it was before.
		relPath, _ := filepath.Rel(tmpDir, mainBlendFile)
		response["main_blend_file"] = relPath
	}
	if metadata != nil {
		response["metadata"] = metadata
		response["metadata_extracted"] = true
	} else {
		response["metadata_extracted"] = false
	}

	s.respondJSON(w, http.StatusOK, response)
}
// extractMetadataFromTempContext extracts metadata from a context archive that
// lives in a temporary location.
//
// The archive at contextPath is unpacked into a scratch directory, which is
// removed again before returning. The first file found whose name ends in
// ".blend" (case-insensitive) is handed to runBlenderMetadataExtraction.
// Blender save files such as "scene.blend1" do not end in ".blend" and are
// therefore never selected.
//
// An error is returned when the scratch directory cannot be created, the
// archive cannot be extracted, the directory walk fails, no .blend file is
// present, or the Blender run itself fails.
func (s *Server) extractMetadataFromTempContext(contextPath string) (*types.BlendMetadata, error) {
	// Create temporary directory for extraction.
	tmpDir, err := os.MkdirTemp("", "fuego-metadata-temp-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create temporary directory: %w", err)
	}
	defer os.RemoveAll(tmpDir)

	// Extract context archive.
	if err := s.extractTarGz(contextPath, tmpDir); err != nil {
		return nil, fmt.Errorf("failed to extract context: %w", err)
	}

	// Find the first .blend file and stop walking as soon as it is found.
	blendFile := ""
	err = filepath.Walk(tmpDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || !strings.HasSuffix(strings.ToLower(info.Name()), ".blend") {
			return nil
		}
		blendFile = path
		return filepath.SkipAll
	})
	if err != nil {
		// Report the real cause instead of claiming the file is missing.
		return nil, fmt.Errorf("failed to search context for .blend file: %w", err)
	}
	if blendFile == "" {
		return nil, fmt.Errorf("no .blend file found in context")
	}

	return s.runBlenderMetadataExtraction(blendFile, tmpDir)
}
// runBlenderMetadataExtraction runs Blender in background mode on blendFile
// and returns the metadata reported by an embedded Python script.
//
// The script is written to workDir as extract_metadata.py and Blender is
// started with workDir as its working directory. The script prints a single
// JSON document describing the frame range (widened to the keyframe range when
// the scene range is a single frame or does not cover the keyframes), render
// resolution, output format, engine and engine-specific settings, scene object
// counts, and missing add-on information. Everything between the first "{" and
// the last "}" of Blender's standard output is decoded as that document.
//
// An error is returned when the script cannot be written, Blender cannot be
// started or exits unsuccessfully, or no decodable JSON is found in the output.
func (s *Server) runBlenderMetadataExtraction(blendFile, workDir string) (*types.BlendMetadata, error) {
	// Create Python script (same as in extractMetadataFromContext).
	scriptPath := filepath.Join(workDir, "extract_metadata.py")
	scriptContent := `import bpy
import json
import sys
try:
    bpy.ops.file.make_paths_relative()
    print("Made all file paths relative to blend file")
except Exception as e:
    print(f"Warning: Could not make paths relative: {e}")
missing_files_info = {
    "checked": False,
    "has_missing": False,
    "missing_files": [],
    "missing_addons": []
}
try:
    missing = []
    for mod in bpy.context.preferences.addons:
        if mod.module.endswith("_missing"):
            missing.append(mod.module.rsplit("_", 1)[0])
    missing_files_info["checked"] = True
    if missing:
        missing_files_info["has_missing"] = True
        missing_files_info["missing_addons"] = missing
        print("Missing add-ons required by this .blend:")
        for name in missing:
            print(" -", name)
    else:
        print("No missing add-ons detected file is headless-safe")
except Exception as e:
    print(f"Warning: Could not check for missing addons: {e}")
    missing_files_info["error"] = str(e)
scene = bpy.context.scene
frame_start = scene.frame_start
frame_end = scene.frame_end
animation_start = None
animation_end = None
for obj in scene.objects:
    if obj.animation_data and obj.animation_data.action:
        action = obj.animation_data.action
        if action.fcurves:
            for fcurve in action.fcurves:
                if fcurve.keyframe_points:
                    for keyframe in fcurve.keyframe_points:
                        frame = int(keyframe.co[0])
                        if animation_start is None or frame < animation_start:
                            animation_start = frame
                        if animation_end is None or frame > animation_end:
                            animation_end = frame
if animation_start is not None and animation_end is not None:
    if frame_start == frame_end or (animation_start < frame_start or animation_end > frame_end):
        frame_start = animation_start
        frame_end = animation_end
render = scene.render
resolution_x = render.resolution_x
resolution_y = render.resolution_y
engine = scene.render.engine.upper()
output_format = render.image_settings.file_format
engine_settings = {}
if engine == 'CYCLES':
    cycles = scene.cycles
    engine_settings = {
        "samples": getattr(cycles, 'samples', 128),
        "use_denoising": getattr(cycles, 'use_denoising', False),
        "denoising_radius": getattr(cycles, 'denoising_radius', 0),
        "denoising_strength": getattr(cycles, 'denoising_strength', 0.0),
        "device": getattr(cycles, 'device', 'CPU'),
        "use_adaptive_sampling": getattr(cycles, 'use_adaptive_sampling', False),
        "adaptive_threshold": getattr(cycles, 'adaptive_threshold', 0.01) if getattr(cycles, 'use_adaptive_sampling', False) else 0.01,
        "use_fast_gi": getattr(cycles, 'use_fast_gi', False),
        "light_tree": getattr(cycles, 'use_light_tree', False),
        "use_light_linking": getattr(cycles, 'use_light_linking', False),
        "caustics_reflective": getattr(cycles, 'caustics_reflective', False),
        "caustics_refractive": getattr(cycles, 'caustics_refractive', False),
        "blur_glossy": getattr(cycles, 'blur_glossy', 0.0),
        "max_bounces": getattr(cycles, 'max_bounces', 12),
        "diffuse_bounces": getattr(cycles, 'diffuse_bounces', 4),
        "glossy_bounces": getattr(cycles, 'glossy_bounces', 4),
        "transmission_bounces": getattr(cycles, 'transmission_bounces', 12),
        "volume_bounces": getattr(cycles, 'volume_bounces', 0),
        "transparent_max_bounces": getattr(cycles, 'transparent_max_bounces', 8),
        "film_transparent": getattr(cycles, 'film_transparent', False),
        "use_layer_samples": getattr(cycles, 'use_layer_samples', False),
    }
elif engine == 'EEVEE' or engine == 'EEVEE_NEXT':
    eevee = scene.eevee
    engine_settings = {
        "taa_render_samples": getattr(eevee, 'taa_render_samples', 64),
        "use_bloom": getattr(eevee, 'use_bloom', False),
        "bloom_threshold": getattr(eevee, 'bloom_threshold', 0.8),
        "bloom_intensity": getattr(eevee, 'bloom_intensity', 0.05),
        "bloom_radius": getattr(eevee, 'bloom_radius', 6.5),
        "use_ssr": getattr(eevee, 'use_ssr', True),
        "use_ssr_refraction": getattr(eevee, 'use_ssr_refraction', False),
        "ssr_quality": getattr(eevee, 'ssr_quality', 'MEDIUM'),
        "use_ssao": getattr(eevee, 'use_ssao', True),
        "ssao_quality": getattr(eevee, 'ssao_quality', 'MEDIUM'),
        "ssao_distance": getattr(eevee, 'ssao_distance', 0.2),
        "ssao_factor": getattr(eevee, 'ssao_factor', 1.0),
        "use_soft_shadows": getattr(eevee, 'use_soft_shadows', True),
        "use_shadow_high_bitdepth": getattr(eevee, 'use_shadow_high_bitdepth', True),
        "use_volumetric": getattr(eevee, 'use_volumetric', False),
        "volumetric_tile_size": getattr(eevee, 'volumetric_tile_size', '8'),
        "volumetric_samples": getattr(eevee, 'volumetric_samples', 64),
        "volumetric_start": getattr(eevee, 'volumetric_start', 0.0),
        "volumetric_end": getattr(eevee, 'volumetric_end', 100.0),
        "use_volumetric_lights": getattr(eevee, 'use_volumetric_lights', True),
        "use_volumetric_shadows": getattr(eevee, 'use_volumetric_shadows', True),
        "use_gtao": getattr(eevee, 'use_gtao', False),
        "gtao_quality": getattr(eevee, 'gtao_quality', 'MEDIUM'),
        "use_overscan": getattr(eevee, 'use_overscan', False),
    }
else:
    engine_settings = {
        "samples": getattr(scene, 'samples', 128) if hasattr(scene, 'samples') else 128
    }
camera_count = len([obj for obj in scene.objects if obj.type == 'CAMERA'])
object_count = len(scene.objects)
material_count = len(bpy.data.materials)
metadata = {
    "frame_start": frame_start,
    "frame_end": frame_end,
    "render_settings": {
        "resolution_x": resolution_x,
        "resolution_y": resolution_y,
        "output_format": output_format,
        "engine": engine.lower(),
        "engine_settings": engine_settings
    },
    "scene_info": {
        "camera_count": camera_count,
        "object_count": object_count,
        "material_count": material_count
    },
    "missing_files_info": missing_files_info
}
print(json.dumps(metadata))
sys.stdout.flush()
`
	if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
		return nil, fmt.Errorf("failed to create extraction script: %w", err)
	}

	// Execute Blender.
	cmd := exec.Command("blender", "-b", blendFile, "--python", scriptPath)
	cmd.Dir = workDir

	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
	}
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		return nil, fmt.Errorf("failed to create stderr pipe: %w", err)
	}

	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("failed to start blender: %w", err)
	}

	// Collect stdout line by line. stdoutBuffer is only read after stdoutDone
	// has been closed, so no further synchronization is needed.
	var stdoutBuffer bytes.Buffer
	stdoutDone := make(chan struct{})
	go func() {
		defer close(stdoutDone)
		scanner := bufio.NewScanner(stdoutPipe)
		// Allow lines well beyond the 64 KiB default token size.
		scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
		for scanner.Scan() {
			stdoutBuffer.WriteString(scanner.Text())
			stdoutBuffer.WriteString("\n")
		}
		// If the scanner gave up early (for example on an over-long line),
		// keep draining so Blender never blocks on a full pipe.
		io.Copy(io.Discard, stdoutPipe)
	}()

	// stderr is not used, but it must be drained for the same reason.
	stderrDone := make(chan struct{})
	go func() {
		defer close(stderrDone)
		io.Copy(io.Discard, stderrPipe)
	}()

	// Wait closes the pipes once the process has exited, so all reads have to
	// complete first; calling Wait earlier can truncate the captured output.
	<-stdoutDone
	<-stderrDone
	if err := cmd.Wait(); err != nil {
		return nil, fmt.Errorf("blender metadata extraction failed: %w", err)
	}

	// Blender prints its own messages around the script output; keep only the
	// span from the first "{" to the last "}".
	metadataJSON := strings.TrimSpace(stdoutBuffer.String())
	jsonStart := strings.Index(metadataJSON, "{")
	jsonEnd := strings.LastIndex(metadataJSON, "}")
	if jsonStart == -1 || jsonEnd == -1 || jsonEnd <= jsonStart {
		return nil, errors.New("failed to extract JSON from Blender output")
	}
	metadataJSON = metadataJSON[jsonStart : jsonEnd+1]

	var metadata types.BlendMetadata
	if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil {
		return nil, fmt.Errorf("failed to parse metadata JSON: %w", err)
	}

	log.Printf("Metadata extracted: frame_start=%d, frame_end=%d", metadata.FrameStart, metadata.FrameEnd)
	return &metadata, nil
}
// createContextFromDir packs the files found under sourceDir into a
// gzip-compressed tar archive written to destPath, and returns destPath.
//
// Directories themselves are not stored. Blender save files (a name containing
// ".blend" followed only by digits, e.g. "scene.blend1") are left out, as is
// any file whose path relative to sourceDir matches one of excludeFiles. When
// every remaining file shares the same first path component, that component is
// stripped from the names stored in the archive.
//
// An error is returned when the walk fails, when nothing is left to archive,
// when the archive root would not contain exactly one ".blend" file, or when
// reading a source file or writing the archive fails.
func (s *Server) createContextFromDir(sourceDir, destPath string, excludeFiles ...string) (string, error) {
	// Exclusions are matched against source-relative paths in both native and
	// slash-separated form.
	skip := make(map[string]bool)
	for _, name := range excludeFiles {
		cleaned := filepath.Clean(name)
		skip[cleaned] = true
		skip[filepath.ToSlash(cleaned)] = true
	}

	// isSaveFile reports whether name has a non-empty, all-digit tail after
	// its last ".blend".
	isSaveFile := func(name string) bool {
		lowered := strings.ToLower(name)
		at := strings.LastIndex(lowered, ".blend")
		if at == -1 {
			return false
		}
		tail := lowered[at+len(".blend"):]
		if tail == "" {
			return false
		}
		for _, ch := range tail {
			if ch < '0' || ch > '9' {
				return false
			}
		}
		return true
	}

	// Gather the files to archive together with their source-relative paths.
	var sourcePaths, relPaths []string
	walkErr := filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
		switch {
		case err != nil:
			return err
		case info.IsDir(), isSaveFile(info.Name()):
			return nil
		}
		rel, err := filepath.Rel(sourceDir, path)
		if err != nil {
			return err
		}
		cleanRel := filepath.Clean(rel)
		if strings.HasPrefix(cleanRel, "..") {
			return fmt.Errorf("invalid file path: %s", rel)
		}
		if skip[cleanRel] || skip[filepath.ToSlash(cleanRel)] {
			return nil
		}
		sourcePaths = append(sourcePaths, path)
		relPaths = append(relPaths, rel)
		return nil
	})
	if walkErr != nil {
		return "", fmt.Errorf("failed to walk source directory: %w", walkErr)
	}
	if len(sourcePaths) == 0 {
		return "", fmt.Errorf("no files found to include in context archive")
	}

	// Determine whether all files share one leading path component.
	sharedTop := ""
	for i, rel := range relPaths {
		top, _, _ := strings.Cut(filepath.ToSlash(rel), "/")
		if top == "" || (i > 0 && top != sharedTop) {
			sharedTop = ""
			break
		}
		sharedTop = top
	}
	commonPrefix := ""
	if sharedTop != "" {
		commonPrefix = sharedTop + "/"
	}

	// Compute the in-archive names and count .blend files at the archive root.
	archiveNames := make([]string, len(relPaths))
	rootBlendCount := 0
	for i, rel := range relPaths {
		name := strings.TrimPrefix(filepath.ToSlash(rel), commonPrefix)
		archiveNames[i] = name
		if !strings.Contains(name, "/") && strings.HasSuffix(strings.ToLower(name), ".blend") {
			rootBlendCount++
		}
	}
	switch {
	case rootBlendCount == 0:
		return "", fmt.Errorf("no .blend file found at root level in context archive")
	case rootBlendCount > 1:
		return "", fmt.Errorf("multiple .blend files found at root level in context archive (found %d, expected 1)", rootBlendCount)
	}

	// Create the tar.gz file. The deferred closes cover the early error
	// returns; the success path closes each layer explicitly further down so
	// flush errors are reported.
	out, err := os.Create(destPath)
	if err != nil {
		return "", fmt.Errorf("failed to create context file: %w", err)
	}
	defer out.Close()
	gz := gzip.NewWriter(out)
	defer gz.Close()
	tw := tar.NewWriter(gz)
	defer tw.Close()

	// appendFile writes one source file into the archive under name.
	appendFile := func(srcPath, name string) error {
		src, err := os.Open(srcPath)
		if err != nil {
			return fmt.Errorf("failed to open file: %w", err)
		}
		defer src.Close()

		info, err := src.Stat()
		if err != nil {
			return fmt.Errorf("failed to stat file: %w", err)
		}
		hdr, err := tar.FileInfoHeader(info, "")
		if err != nil {
			return fmt.Errorf("failed to create tar header: %w", err)
		}
		hdr.Name = name
		if err := tw.WriteHeader(hdr); err != nil {
			return fmt.Errorf("failed to write tar header: %w", err)
		}
		if _, err := io.Copy(tw, src); err != nil {
			return fmt.Errorf("failed to write file to tar: %w", err)
		}
		return nil
	}
	for i, srcPath := range sourcePaths {
		if err := appendFile(srcPath, archiveNames[i]); err != nil {
			return "", err
		}
	}

	// Close innermost first so each layer flushes into the next.
	if err := tw.Close(); err != nil {
		return "", fmt.Errorf("failed to close tar writer: %w", err)
	}
	if err := gz.Close(); err != nil {
		return "", fmt.Errorf("failed to close gzip writer: %w", err)
	}
	if err := out.Close(); err != nil {
		return "", fmt.Errorf("failed to close context file: %w", err)
	}
	return destPath, nil
}
// handleListJobFiles responds with the job_files rows recorded for a job,
// newest first, as a JSON array.
//
// Non-admin callers may only list files of jobs they own: an unknown job
// yields 404 and a job owned by someone else yields 403. For admins only the
// job's existence is verified. Database failures while querying or iterating
// the rows yield 500.
func (s *Server) handleListJobFiles(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Verify job belongs to user (unless admin).
	if !isAdminUser(r) {
		var jobUserID int64
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
		if errors.Is(err, sql.ErrNoRows) {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
			return
		}
		if jobUserID != userID {
			s.respondError(w, http.StatusForbidden, "Access denied")
			return
		}
	} else {
		// Admin: verify job exists.
		var exists bool
		err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
		if err != nil || !exists {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
	}

	rows, err := s.db.Query(
		`SELECT id, job_id, file_type, file_path, file_name, file_size, created_at
		FROM job_files WHERE job_id = ? ORDER BY created_at DESC`,
		jobID,
	)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query files: %v", err))
		return
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so a job without files encodes as []
	// rather than null.
	files := []types.JobFile{}
	for rows.Next() {
		var file types.JobFile
		err := rows.Scan(
			&file.ID, &file.JobID, &file.FileType, &file.FilePath,
			&file.FileName, &file.FileSize, &file.CreatedAt,
		)
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan file: %v", err))
			return
		}
		files = append(files, file)
	}
	// rows.Next also returns false when iteration fails part-way through;
	// without this check a truncated list would be sent with status 200.
	if err := rows.Err(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read files: %v", err))
		return
	}

	s.respondJSON(w, http.StatusOK, files)
}
// handleListContextArchive responds with the regular files contained in a
// job's context archive (context.tar.gz in the job's storage directory).
//
// Each entry carries the file's base name, its size and its full path inside
// the archive. Non-admin callers may only inspect jobs they own (404 for an
// unknown job, 403 for someone else's). A missing archive yields 404; failures
// while opening or reading it yield 500.
func (s *Server) handleListContextArchive(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Ownership is only enforced for non-admin callers.
	if !isAdminUser(r) {
		var ownerID int64
		lookupErr := s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&ownerID)
		switch {
		case lookupErr == sql.ErrNoRows:
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		case lookupErr != nil:
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", lookupErr))
			return
		case ownerID != userID:
			s.respondError(w, http.StatusForbidden, "Access denied")
			return
		}
	}

	// Locate and open the archive.
	archivePath := filepath.Join(s.storage.JobPath(jobID), "context.tar.gz")
	if !s.storage.FileExists(archivePath) {
		s.respondError(w, http.StatusNotFound, "Context archive not found")
		return
	}
	archive, err := s.storage.GetFile(archivePath)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to open context archive: %v", err))
		return
	}
	defer archive.Close()

	gz, err := gzip.NewReader(archive)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read context archive: %v", err))
		return
	}
	defer gz.Close()

	// archiveEntry is the JSON shape of one listed file.
	type archiveEntry struct {
		Name string `json:"name"`
		Size int64  `json:"size"`
		Path string `json:"path"`
	}

	var entries []archiveEntry
	for tr := tar.NewReader(gz); ; {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read archive: %v", err))
			return
		}
		// Directories and other special entries are not listed.
		if hdr.Typeflag != tar.TypeReg {
			continue
		}
		entries = append(entries, archiveEntry{
			Name: filepath.Base(hdr.Name),
			Size: hdr.Size,
			Path: hdr.Name,
		})
	}

	s.respondJSON(w, http.StatusOK, entries)
}
// handleDownloadJobFile streams one file belonging to a job to the client.
//
// Non-admin callers may only download files of jobs they own (404 for an
// unknown job, 403 for someone else's); for admins only the job's existence is
// verified. The file is looked up by the "fileId" URL parameter and must
// belong to the job. Files with a common image extension are served inline
// with the matching image content type; everything else is served as an
// application/octet-stream attachment.
func (s *Server) handleDownloadJobFile(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	fileID, err := parseID(r, "fileId")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Verify job belongs to user (unless admin).
	if !isAdminUser(r) {
		var jobUserID int64
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
		if errors.Is(err, sql.ErrNoRows) {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
			return
		}
		if jobUserID != userID {
			s.respondError(w, http.StatusForbidden, "Access denied")
			return
		}
	} else {
		// Admin: verify job exists.
		var exists bool
		err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
		if err != nil || !exists {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
	}

	// Get file info.
	var filePath, fileName string
	err = s.db.QueryRow(
		`SELECT file_path, file_name FROM job_files WHERE id = ? AND job_id = ?`,
		fileID, jobID,
	).Scan(&filePath, &fileName)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "File not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err))
		return
	}

	// Open file.
	file, err := s.storage.GetFile(filePath)
	if err != nil {
		s.respondError(w, http.StatusNotFound, "File not found on disk")
		return
	}
	defer file.Close()

	// Determine content type based on file extension: known image types are
	// displayed inline, everything else is offered as a download.
	inlineTypes := map[string]string{
		".png":  "image/png",
		".jpg":  "image/jpeg",
		".jpeg": "image/jpeg",
		".gif":  "image/gif",
		".webp": "image/webp",
		".bmp":  "image/bmp",
		".svg":  "image/svg+xml",
	}
	contentType := "application/octet-stream"
	disposition := "attachment"
	ext := filepath.Ext(strings.ToLower(fileName))
	if inlineType, ok := inlineTypes[ext]; ok {
		contentType = inlineType
		disposition = "inline"
	}

	// Send the filename as a quoted-string with embedded backslashes and
	// quotes escaped, so names containing spaces, semicolons or quotes still
	// produce a well-formed header.
	quotedName := `"` + strings.NewReplacer(`\`, `\\`, `"`, `\"`).Replace(fileName) + `"`

	// Set headers.
	w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%s", disposition, quotedName))
	w.Header().Set("Content-Type", contentType)
	// Files may be user-supplied: stop browsers from second-guessing the
	// declared type.
	w.Header().Set("X-Content-Type-Options", "nosniff")
	if ext == ".svg" {
		// NOTE(review): SVG can carry script. It is still served inline, but
		// sandboxed so it cannot run in this origin when opened directly.
		w.Header().Set("Content-Security-Policy", "default-src 'none'; style-src 'unsafe-inline'; sandbox")
	}

	// Stream file. Headers are already sent at this point, so a failure can
	// only be logged.
	if _, err := io.Copy(w, file); err != nil {
		log.Printf("Error streaming file %d for job %d: %v", fileID, jobID, err)
	}
}
// handleStreamVideo streams the most recent MP4 output file of a job with
// support for HTTP range requests, so players can seek.
//
// Admins may stream any job's video. For other callers the job lookup is
// restricted to their own jobs, so a job owned by someone else is reported as
// 404. A missing MP4 row or a file missing on disk also yields 404.
//
// Range handling is delegated to http.ServeContent, which takes care of
// single and multiple ranges, suffix and open-ended ranges, unsatisfiable
// ranges (416), and conditional requests based on the file's modification
// time.
func (s *Server) handleStreamVideo(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Verify job exists and, unless the caller is an admin, belongs to them.
	var jobUserID int64
	if isAdminUser(r) {
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
	} else {
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobUserID)
	}
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
		return
	}

	// Find MP4 file.
	var filePath, fileName string
	err = s.db.QueryRow(
		`SELECT file_path, file_name FROM job_files
		WHERE job_id = ? AND file_type = ? AND file_name LIKE '%.mp4'
		ORDER BY created_at DESC LIMIT 1`,
		jobID, types.JobFileTypeOutput,
	).Scan(&filePath, &fileName)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Video file not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err))
		return
	}

	// Open file.
	file, err := s.storage.GetFile(filePath)
	if err != nil {
		s.respondError(w, http.StatusNotFound, "File not found on disk")
		return
	}
	defer file.Close()

	// Get file info (the modification time feeds conditional requests).
	fileInfo, err := file.Stat()
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, "Failed to get file info")
		return
	}

	// An explicitly set Content-Type is respected by ServeContent, which
	// otherwise would sniff it from the name or content.
	w.Header().Set("Content-Type", "video/mp4")
	http.ServeContent(w, r, fileName, fileInfo.ModTime(), file)
}
// handleListJobTasks responds with all tasks of a job, ordered by their first
// frame, as a JSON array.
//
// Non-admin callers may only list tasks of jobs they own (404 for an unknown
// job, 403 for someone else's); for admins only the job's existence is
// verified. Nullable task columns are left at their zero value (or nil for
// pointer fields) when NULL. Database failures while querying or iterating the
// rows yield 500.
func (s *Server) handleListJobTasks(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Verify job belongs to user (unless admin).
	if !isAdminUser(r) {
		var jobUserID int64
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
		if errors.Is(err, sql.ErrNoRows) {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
			return
		}
		if jobUserID != userID {
			s.respondError(w, http.StatusForbidden, "Access denied")
			return
		}
	} else {
		// Admin: verify job exists.
		var exists bool
		err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
		if err != nil || !exists {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
	}

	rows, err := s.db.Query(
		`SELECT id, job_id, runner_id, frame_start, frame_end, status, task_type,
		current_step, retry_count, max_retries, output_path, created_at, started_at,
		completed_at, error_message, timeout_seconds
		FROM tasks WHERE job_id = ? ORDER BY frame_start ASC`,
		jobID,
	)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err))
		return
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so a job without tasks encodes as []
	// rather than null.
	tasks := []types.Task{}
	for rows.Next() {
		var task types.Task
		// Nullable columns are scanned into sql.Null* holders that are
		// declared per iteration, so taking the address of their fields
		// below never aliases another row.
		var runnerID sql.NullInt64
		var startedAt, completedAt sql.NullTime
		var timeoutSeconds sql.NullInt64
		var errorMessage sql.NullString
		var currentStep sql.NullString
		var outputPath sql.NullString

		err := rows.Scan(
			&task.ID, &task.JobID, &runnerID, &task.FrameStart, &task.FrameEnd,
			&task.Status, &task.TaskType, &currentStep, &task.RetryCount,
			&task.MaxRetries, &outputPath, &task.CreatedAt, &startedAt,
			&completedAt, &errorMessage, &timeoutSeconds,
		)
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err))
			return
		}

		if runnerID.Valid {
			task.RunnerID = &runnerID.Int64
		}
		if startedAt.Valid {
			task.StartedAt = &startedAt.Time
		}
		if completedAt.Valid {
			task.CompletedAt = &completedAt.Time
		}
		if timeoutSeconds.Valid {
			timeout := int(timeoutSeconds.Int64)
			task.TimeoutSeconds = &timeout
		}
		if errorMessage.Valid {
			task.ErrorMessage = errorMessage.String
		}
		if currentStep.Valid {
			task.CurrentStep = currentStep.String
		}
		if outputPath.Valid {
			task.OutputPath = outputPath.String
		}

		tasks = append(tasks, task)
	}
	// rows.Next also returns false when iteration fails part-way through;
	// without this check a truncated list would be sent with status 200.
	if err := rows.Err(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read tasks: %v", err))
		return
	}

	s.respondJSON(w, http.StatusOK, tasks)
}
// handleGetTaskLogs retrieves logs for a specific task.
//
// Route parameters: "id" (job ID) and "taskId" (task ID).
// Optional query parameters:
//   - step_name: only return rows whose step_name matches exactly
//   - log_level: only return rows whose log_level matches exactly
//   - limit:     maximum number of rows (default 1000; values that are not
//     positive integers are ignored and the default is used)
//
// Non-admin callers may only read logs of jobs they own; admins may read logs
// of any existing job. On success the response is a JSON array of
// types.TaskLog ordered by created_at ascending.
func (s *Server) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}
	taskIDStr := chi.URLParam(r, "taskId")
	taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
	if err != nil {
		s.respondError(w, http.StatusBadRequest, "Invalid task ID")
		return
	}

	// Verify job belongs to user (unless admin)
	isAdmin := isAdminUser(r)
	if !isAdmin {
		var jobUserID int64
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
		if errors.Is(err, sql.ErrNoRows) {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
			return
		}
		if jobUserID != userID {
			s.respondError(w, http.StatusForbidden, "Access denied")
			return
		}
	} else {
		// Admin: verify job exists. A failed lookup is reported as "not found" too.
		var exists bool
		err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
		if err != nil || !exists {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
	}

	// Verify task belongs to job
	var taskJobID int64
	err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Task not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
		return
	}
	if taskJobID != jobID {
		s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
		return
	}

	// Get query parameters for filtering
	stepName := r.URL.Query().Get("step_name")
	logLevel := r.URL.Query().Get("log_level")
	limitStr := r.URL.Query().Get("limit")
	limit := 1000 // default
	if limitStr != "" {
		if l, err := strconv.Atoi(limitStr); err == nil && l > 0 {
			limit = l
		}
	}

	// Build query. Filters are appended as placeholders so user-supplied
	// values are never concatenated into the SQL text.
	query := `SELECT id, task_id, runner_id, log_level, message, step_name, created_at
FROM task_logs WHERE task_id = ?`
	args := []interface{}{taskID}
	if stepName != "" {
		query += " AND step_name = ?"
		args = append(args, stepName)
	}
	if logLevel != "" {
		query += " AND log_level = ?"
		args = append(args, logLevel)
	}
	query += " ORDER BY created_at ASC LIMIT ?"
	args = append(args, limit)

	rows, err := s.db.Query(query, args...)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query logs: %v", err))
		return
	}
	defer rows.Close()

	// Start from an empty (non-nil) slice so "no logs" encodes as [] rather than null.
	logs := []types.TaskLog{}
	for rows.Next() {
		var entry types.TaskLog
		var runnerID sql.NullInt64
		err := rows.Scan(
			&entry.ID, &entry.TaskID, &runnerID, &entry.LogLevel, &entry.Message,
			&entry.StepName, &entry.CreatedAt,
		)
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan log: %v", err))
			return
		}
		if runnerID.Valid {
			entry.RunnerID = &runnerID.Int64
		}
		logs = append(logs, entry)
	}
	// rows.Next returning false can mean "iteration failed", not just "no more
	// rows"; without this check a truncated result would be sent as success.
	if err := rows.Err(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read logs: %v", err))
		return
	}
	s.respondJSON(w, http.StatusOK, logs)
}
// handleGetTaskSteps retrieves step timeline for a specific task.
//
// Route parameters: "id" (job ID) and "taskId" (task ID).
//
// Non-admin callers may only read steps of jobs they own; admins may read
// steps of any existing job. On success the response is a JSON array of
// types.TaskStep ordered by started_at ascending. Nullable columns
// (started_at, completed_at, duration_ms, error_message) are only copied onto
// the step when the database value is non-NULL.
func (s *Server) handleGetTaskSteps(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}
	taskIDStr := chi.URLParam(r, "taskId")
	taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
	if err != nil {
		s.respondError(w, http.StatusBadRequest, "Invalid task ID")
		return
	}

	// Verify job belongs to user (unless admin)
	isAdmin := isAdminUser(r)
	if !isAdmin {
		var jobUserID int64
		err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
		if errors.Is(err, sql.ErrNoRows) {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
			return
		}
		if jobUserID != userID {
			s.respondError(w, http.StatusForbidden, "Access denied")
			return
		}
	} else {
		// Admin: verify job exists. A failed lookup is reported as "not found" too.
		var exists bool
		err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
		if err != nil || !exists {
			s.respondError(w, http.StatusNotFound, "Job not found")
			return
		}
	}

	// Verify task belongs to job
	var taskJobID int64
	err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Task not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
		return
	}
	if taskJobID != jobID {
		s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
		return
	}

	rows, err := s.db.Query(
		`SELECT id, task_id, step_name, status, started_at, completed_at, duration_ms, error_message
FROM task_steps WHERE task_id = ? ORDER BY started_at ASC`,
		taskID,
	)
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query steps: %v", err))
		return
	}
	defer rows.Close()

	// Start from an empty (non-nil) slice so "no steps" encodes as [] rather than null.
	steps := []types.TaskStep{}
	for rows.Next() {
		var step types.TaskStep
		var startedAt, completedAt sql.NullTime
		var durationMs sql.NullInt64
		var errorMessage sql.NullString
		err := rows.Scan(
			&step.ID, &step.TaskID, &step.StepName, &step.Status,
			&startedAt, &completedAt, &durationMs, &errorMessage,
		)
		if err != nil {
			s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan step: %v", err))
			return
		}
		if startedAt.Valid {
			step.StartedAt = &startedAt.Time
		}
		if completedAt.Valid {
			step.CompletedAt = &completedAt.Time
		}
		if durationMs.Valid {
			duration := int(durationMs.Int64)
			step.DurationMs = &duration
		}
		if errorMessage.Valid {
			step.ErrorMessage = errorMessage.String
		}
		steps = append(steps, step)
	}
	// rows.Next returning false can mean "iteration failed", not just "no more
	// rows"; without this check a truncated timeline would be sent as success.
	if err := rows.Err(); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read steps: %v", err))
		return
	}
	s.respondJSON(w, http.StatusOK, steps)
}
// handleRetryTask puts a failed task back into the pending state so it can be
// picked up again.
//
// Route parameters: "id" (job ID) and "taskId" (task ID). The caller must own
// the job; this handler has no admin override. The task must belong to the
// job, currently be in the failed state, and have retry_count below
// max_retries. On success the task's runner, current step, error message and
// start/completion timestamps are cleared and a confirmation message is
// returned as JSON.
func (s *Server) handleRetryTask(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		s.respondError(w, http.StatusUnauthorized, err.Error())
		return
	}

	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	taskID, err := strconv.ParseInt(chi.URLParam(r, "taskId"), 10, 64)
	if err != nil {
		s.respondError(w, http.StatusBadRequest, "Invalid task ID")
		return
	}

	// Ownership check: the job must exist and be owned by the caller.
	var ownerID int64
	err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&ownerID)
	switch {
	case err == sql.ErrNoRows:
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	case err != nil:
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
		return
	case ownerID != userID:
		s.respondError(w, http.StatusForbidden, "Access denied")
		return
	}

	// Load the task and make sure it is eligible for a retry.
	var (
		owningJobID        int64
		currentStatus      string
		attempts, attempts2 int
	)
	taskRow := s.db.QueryRow(
		"SELECT job_id, status, retry_count, max_retries FROM tasks WHERE id = ?",
		taskID,
	)
	err = taskRow.Scan(&owningJobID, &currentStatus, &attempts, &attempts2)
	switch {
	case err == sql.ErrNoRows:
		s.respondError(w, http.StatusNotFound, "Task not found")
		return
	case err != nil:
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
		return
	case owningJobID != jobID:
		s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
		return
	case currentStatus != string(types.TaskStatusFailed):
		s.respondError(w, http.StatusBadRequest, "Task is not in failed state")
		return
	case attempts >= attempts2: // retry_count has reached max_retries
		s.respondError(w, http.StatusBadRequest, "Maximum retries exceeded")
		return
	}

	// Reset the task to pending and wipe the state left over from the failed run.
	if _, err = s.db.Exec(
		`UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL,
error_message = NULL, started_at = NULL, completed_at = NULL
WHERE id = ?`,
		types.TaskStatusPending, taskID,
	); err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to retry task: %v", err))
		return
	}

	s.respondJSON(w, http.StatusOK, map[string]string{"message": "Task queued for retry"})
}
// handleStreamTaskLogsWebSocket streams task logs via WebSocket.
// Note: This is called after auth middleware, so userID is already verified.
//
// Route parameters: "id" (job ID) and "taskId" (task ID). The optional
// "last_id" query parameter makes streaming start after that log ID; an
// unparsable value is ignored and streaming starts from 0.
//
// After verifying that the caller owns the job and the task belongs to it, the
// request is upgraded and the connection is registered in s.frontendConns
// under "<jobID>:<taskID>" together with a per-connection write mutex. The
// handler then sends a "connected" message, sends existing logs in batches of
// up to 100 rows, and polls every 500ms for newer rows. It returns when a
// write to the socket fails or the request context is done.
func (s *Server) handleStreamTaskLogsWebSocket(w http.ResponseWriter, r *http.Request) {
	userID, err := getUserID(r)
	if err != nil {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}
	jobID, err := parseID(r, "id")
	if err != nil {
		s.respondError(w, http.StatusBadRequest, err.Error())
		return
	}
	taskIDStr := chi.URLParam(r, "taskId")
	taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
	if err != nil {
		s.respondError(w, http.StatusBadRequest, "Invalid task ID")
		return
	}

	// Verify job belongs to user
	var jobUserID int64
	err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Job not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
		return
	}
	if jobUserID != userID {
		s.respondError(w, http.StatusForbidden, "Access denied")
		return
	}

	// Verify task belongs to job
	var taskJobID int64
	err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID)
	if errors.Is(err, sql.ErrNoRows) {
		s.respondError(w, http.StatusNotFound, "Task not found")
		return
	}
	if err != nil {
		s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
		return
	}
	if taskJobID != jobID {
		s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
		return
	}

	// Upgrade to WebSocket
	conn, err := s.wsUpgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Failed to upgrade WebSocket: %v", err)
		return
	}
	defer conn.Close()

	key := fmt.Sprintf("%d:%d", jobID, taskID)
	s.frontendConnsMu.Lock()
	s.frontendConns[key] = conn
	s.frontendConnsMu.Unlock()

	// Create a write mutex for this connection
	s.frontendConnsWriteMuMu.Lock()
	s.frontendConnsWriteMu[key] = &sync.Mutex{}
	writeMu := s.frontendConnsWriteMu[key]
	s.frontendConnsWriteMuMu.Unlock()

	// NOTE(review): the registry is keyed by job/task only, so a second client
	// on the same task overwrites this entry and this cleanup removes whichever
	// entry is current — confirm whether multiple viewers must be supported.
	defer func() {
		s.frontendConnsMu.Lock()
		delete(s.frontendConns, key)
		s.frontendConnsMu.Unlock()
		s.frontendConnsWriteMuMu.Lock()
		delete(s.frontendConnsWriteMu, key)
		s.frontendConnsWriteMuMu.Unlock()
	}()

	// Send initial connection message
	writeMu.Lock()
	err = conn.WriteJSON(map[string]interface{}{
		"type":      "connected",
		"timestamp": time.Now().Unix(),
	})
	writeMu.Unlock()
	if err != nil {
		log.Printf("Failed to send initial connection message: %v", err)
		return
	}

	// Get last log ID to start streaming from
	lastIDStr := r.URL.Query().Get("last_id")
	lastID := int64(0)
	if lastIDStr != "" {
		if id, err := strconv.ParseInt(lastIDStr, 10, 64); err == nil {
			lastID = id
		}
	}

	// sendPending sends one batch (at most 100 rows) of logs with id > lastID
	// and advances lastID to the highest ID seen. Rows are ordered by id ASC to
	// ensure consistent ordering and avoid race conditions.
	//
	// The result set is closed before sendPending returns, on every path, so no
	// database resources stay held while the handler waits for the next tick.
	//
	// It returns a non-nil error only when writing to the WebSocket fails,
	// which the caller treats as "connection closed". Database errors are
	// logged and otherwise ignored so the next tick can retry.
	sendPending := func() error {
		rows, err := s.db.Query(
			`SELECT id, task_id, runner_id, log_level, message, step_name, created_at
FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`,
			taskID, lastID,
		)
		if err != nil {
			log.Printf("Failed to query logs for task %d: %v", taskID, err)
			return nil
		}
		defer rows.Close()

		for rows.Next() {
			var entry types.TaskLog
			var runnerID sql.NullInt64
			if err := rows.Scan(
				&entry.ID, &entry.TaskID, &runnerID, &entry.LogLevel, &entry.Message,
				&entry.StepName, &entry.CreatedAt,
			); err != nil {
				// Skip rows that cannot be decoded rather than ending the stream.
				log.Printf("Failed to scan log for task %d: %v", taskID, err)
				continue
			}
			if runnerID.Valid {
				entry.RunnerID = &runnerID.Int64
			}
			// Always update lastID to the highest ID we've seen
			if entry.ID > lastID {
				lastID = entry.ID
			}
			// Serialize writes to prevent concurrent write panics
			writeMu.Lock()
			writeErr := conn.WriteJSON(map[string]interface{}{
				"type":      "log",
				"data":      entry,
				"timestamp": time.Now().Unix(),
			})
			writeMu.Unlock()
			if writeErr != nil {
				return writeErr
			}
		}
		if err := rows.Err(); err != nil {
			log.Printf("Failed to read logs for task %d: %v", taskID, err)
		}
		return nil
	}

	// Send existing logs
	if err := sendPending(); err != nil {
		// Connection closed
		return
	}

	// Poll for new logs and send them
	// Use shorter interval for more responsive updates, but order by id for consistency
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := sendPending(); err != nil {
				// Connection closed, exit the loop
				return
			}
		}
	}
}