Files
jiggablend/internal/api/jobs.go

1956 lines
57 KiB
Go

package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
authpkg "jiggablend/internal/auth"
"jiggablend/pkg/types"
"github.com/go-chi/chi/v5"
)
// isAdminUser checks if the current user is an admin
func isAdminUser(r *http.Request) bool {
return authpkg.IsAdmin(r.Context())
}
// handleCreateJob creates a new job
func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
var req types.CreateJobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.respondError(w, http.StatusBadRequest, "Invalid request body")
return
}
// Validate job type
if req.JobType != types.JobTypeMetadata && req.JobType != types.JobTypeRender {
s.respondError(w, http.StatusBadRequest, "Invalid job_type: must be 'metadata' or 'render'")
return
}
if req.Name == "" {
s.respondError(w, http.StatusBadRequest, "Job name is required")
return
}
// Validate render job requirements
if req.JobType == types.JobTypeRender {
if req.FrameStart == nil || req.FrameEnd == nil {
s.respondError(w, http.StatusBadRequest, "frame_start and frame_end are required for render jobs")
return
}
if *req.FrameStart < 0 || *req.FrameEnd < *req.FrameStart {
s.respondError(w, http.StatusBadRequest, "Invalid frame range")
return
}
// Validate frame range limits (prevent abuse)
const maxFrameRange = 10000
if *req.FrameEnd-*req.FrameStart+1 > maxFrameRange {
s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Frame range too large. Maximum allowed: %d frames", maxFrameRange))
return
}
if req.OutputFormat == nil || *req.OutputFormat == "" {
defaultFormat := "PNG"
req.OutputFormat = &defaultFormat
}
}
// Default allow_parallel_runners to true for render jobs if not provided
var allowParallelRunners *bool
if req.JobType == types.JobTypeRender {
allowParallelRunners = new(bool)
*allowParallelRunners = true
if req.AllowParallelRunners != nil {
*allowParallelRunners = *req.AllowParallelRunners
}
}
// Set job timeout to 24 hours (86400 seconds)
jobTimeout := 86400
// Build SQL query based on job type
var jobID int64
if req.JobType == types.JobTypeMetadata {
// Metadata jobs don't need frame range or output format
err = s.db.QueryRow(
`INSERT INTO jobs (user_id, job_type, name, status, progress, timeout_seconds)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING id`,
userID, req.JobType, req.Name, types.JobStatusPending, 0.0, jobTimeout,
).Scan(&jobID)
} else {
// Render jobs need all fields
err = s.db.QueryRow(
`INSERT INTO jobs (user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING id`,
userID, req.JobType, req.Name, types.JobStatusPending, 0.0, *req.FrameStart, *req.FrameEnd, *req.OutputFormat, allowParallelRunners, jobTimeout,
).Scan(&jobID)
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job: %v", err))
return
}
// For render jobs, copy input files from metadata job if specified
if req.JobType == types.JobTypeRender && req.MetadataJobID != nil {
// Verify metadata job exists and belongs to the same user
var metadataJobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ? AND job_type = ?", *req.MetadataJobID, types.JobTypeMetadata).Scan(&metadataJobUserID)
if err == nil && metadataJobUserID == userID {
// Copy input files from metadata job to render job
_, err = s.db.Exec(
`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
SELECT ?, file_type, file_path, file_name, file_size
FROM job_files
WHERE job_id = ? AND file_type = ?`,
jobID, *req.MetadataJobID, types.JobFileTypeInput,
)
if err != nil {
log.Printf("Warning: Failed to copy input files from metadata job %d to render job %d: %v", *req.MetadataJobID, jobID, err)
} else {
log.Printf("Copied input files from metadata job %d to render job %d", *req.MetadataJobID, jobID)
}
} else {
log.Printf("Warning: Metadata job %d not found or doesn't belong to user %d, skipping file copy", *req.MetadataJobID, userID)
}
}
// Only create render tasks for render jobs
if req.JobType == types.JobTypeRender {
// Determine task timeout based on output format
taskTimeout := 300 // Default: 5 minutes for frame rendering
if *req.OutputFormat == "MP4" {
// For MP4, we'll create frame tasks with 5 min timeout
// Video generation tasks will be created later with 24h timeout
taskTimeout = 300
}
// Create tasks for the job
// If allow_parallel_runners is false, create a single task for all frames
// Otherwise, create one task per frame for parallel processing
if allowParallelRunners != nil && !*allowParallelRunners {
// Single task for entire frame range
_, err = s.db.Exec(
`INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
jobID, *req.FrameStart, *req.FrameEnd, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create task: %v", err))
return
}
log.Printf("Created 1 render task for job %d (frames %d-%d, single runner)", jobID, *req.FrameStart, *req.FrameEnd)
} else {
// One task per frame for parallel processing
for frame := *req.FrameStart; frame <= *req.FrameEnd; frame++ {
_, err = s.db.Exec(
`INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
jobID, frame, frame, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create tasks: %v", err))
return
}
}
log.Printf("Created %d render tasks for job %d (frames %d-%d, parallel)", *req.FrameEnd-*req.FrameStart+1, jobID, *req.FrameStart, *req.FrameEnd)
}
// Update job status (should be pending since tasks are pending)
s.updateJobStatusFromTasks(jobID)
} else {
log.Printf("Created metadata extraction job %d (no render tasks)", jobID)
}
// Build response job object
job := types.Job{
ID: jobID,
UserID: userID,
JobType: req.JobType,
Name: req.Name,
Status: types.JobStatusPending,
Progress: 0.0,
TimeoutSeconds: jobTimeout,
CreatedAt: time.Now(),
}
if req.JobType == types.JobTypeRender {
job.FrameStart = req.FrameStart
job.FrameEnd = req.FrameEnd
job.OutputFormat = req.OutputFormat
job.AllowParallelRunners = allowParallelRunners
}
// Immediately try to distribute tasks to connected runners
go s.distributeTasksToRunners()
s.respondJSON(w, http.StatusCreated, job)
}
// handleListJobs lists jobs for the current user
func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
// Filter out metadata jobs for non-admin users
isAdmin := isAdminUser(r)
var query string
if isAdmin {
query = `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
FROM jobs WHERE user_id = ? ORDER BY created_at DESC`
} else {
query = `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
FROM jobs WHERE user_id = ? AND job_type != ? ORDER BY created_at DESC`
}
var rows *sql.Rows
if isAdmin {
rows, err = s.db.Query(query, userID)
} else {
rows, err = s.db.Query(query, userID, types.JobTypeMetadata)
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query jobs: %v", err))
return
}
defer rows.Close()
jobs := []types.Job{}
for rows.Next() {
var job types.Job
var jobType string
var startedAt, completedAt sql.NullTime
var blendMetadataJSON sql.NullString
var errorMessage sql.NullString
var frameStart, frameEnd sql.NullInt64
var outputFormat sql.NullString
var allowParallelRunners sql.NullBool
err := rows.Scan(
&job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress,
&frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds,
&blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err))
return
}
job.JobType = types.JobType(jobType)
if frameStart.Valid {
fs := int(frameStart.Int64)
job.FrameStart = &fs
}
if frameEnd.Valid {
fe := int(frameEnd.Int64)
job.FrameEnd = &fe
}
if outputFormat.Valid {
job.OutputFormat = &outputFormat.String
}
if allowParallelRunners.Valid {
job.AllowParallelRunners = &allowParallelRunners.Bool
}
if startedAt.Valid {
job.StartedAt = &startedAt.Time
}
if completedAt.Valid {
job.CompletedAt = &completedAt.Time
}
if blendMetadataJSON.Valid && blendMetadataJSON.String != "" {
var metadata types.BlendMetadata
if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil {
job.BlendMetadata = &metadata
}
}
if errorMessage.Valid {
job.ErrorMessage = errorMessage.String
}
jobs = append(jobs, job)
}
s.respondJSON(w, http.StatusOK, jobs)
}
// handleGetJob gets a specific job
func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
var job types.Job
var jobType string
var startedAt, completedAt sql.NullTime
var blendMetadataJSON sql.NullString
var errorMessage sql.NullString
var frameStart, frameEnd sql.NullInt64
var outputFormat sql.NullString
var allowParallelRunners sql.NullBool
// Allow admins to view any job, regular users can only view their own (and not metadata jobs)
isAdmin := isAdminUser(r)
var err2 error
if isAdmin {
err2 = s.db.QueryRow(
`SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
FROM jobs WHERE id = ?`,
jobID,
).Scan(
&job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress,
&frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds,
&blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
)
} else {
err2 = s.db.QueryRow(
`SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format,
allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message
FROM jobs WHERE id = ? AND user_id = ? AND job_type != ?`,
jobID, userID, types.JobTypeMetadata,
).Scan(
&job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress,
&frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds,
&blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
)
}
if err2 == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err2 != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err2))
return
}
job.JobType = types.JobType(jobType)
if frameStart.Valid {
fs := int(frameStart.Int64)
job.FrameStart = &fs
}
if frameEnd.Valid {
fe := int(frameEnd.Int64)
job.FrameEnd = &fe
}
if outputFormat.Valid {
job.OutputFormat = &outputFormat.String
}
if allowParallelRunners.Valid {
job.AllowParallelRunners = &allowParallelRunners.Bool
}
if startedAt.Valid {
job.StartedAt = &startedAt.Time
}
if completedAt.Valid {
job.CompletedAt = &completedAt.Time
}
if blendMetadataJSON.Valid && blendMetadataJSON.String != "" {
var metadata types.BlendMetadata
if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil {
job.BlendMetadata = &metadata
}
}
if errorMessage.Valid {
job.ErrorMessage = errorMessage.String
}
s.respondJSON(w, http.StatusOK, job)
}
// handleCancelJob cancels a job
func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Check if this is a metadata extraction job - if so, don't cancel running metadata tasks
var jobType string
var jobStatus string
err = s.db.QueryRow("SELECT job_type, status FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobType, &jobStatus)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
// Don't allow cancelling already completed or cancelled jobs
if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusCancelled) {
s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job already " + jobStatus})
return
}
isMetadataExtractionJob := types.JobType(jobType) == types.JobTypeMetadata
result, err := s.db.Exec(
`UPDATE jobs SET status = ? WHERE id = ? AND user_id = ?`,
types.JobStatusCancelled, jobID, userID,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel job: %v", err))
return
}
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
log.Printf("Cancelling job %d (type: %s, isMetadataExtraction: %v)", jobID, jobType, isMetadataExtractionJob)
// For metadata extraction jobs, be more careful - only cancel if no metadata task is running
if isMetadataExtractionJob {
// Check if there's a running metadata task
var runningMetadataTask int
s.db.QueryRow(
`SELECT COUNT(*) FROM tasks WHERE job_id = ? AND task_type = ? AND status = ?`,
jobID, types.TaskTypeMetadata, types.TaskStatusRunning,
).Scan(&runningMetadataTask)
if runningMetadataTask > 0 {
log.Printf("Job %d has running metadata task, preserving it", jobID)
// Don't cancel running metadata tasks - let them complete
// Only cancel pending tasks that aren't metadata
_, err = s.db.Exec(
`UPDATE tasks SET status = ?
WHERE job_id = ? AND status = ? AND task_type != ?`,
types.TaskStatusFailed, jobID, types.TaskStatusPending, types.TaskTypeMetadata,
)
} else {
// No running metadata task, safe to cancel pending metadata tasks
_, err = s.db.Exec(
`UPDATE tasks SET status = ?
WHERE job_id = ? AND status = ?`,
types.TaskStatusFailed, jobID, types.TaskStatusPending,
)
}
} else {
// For regular jobs, cancel pending tasks (but preserve running metadata tasks)
_, err = s.db.Exec(
`UPDATE tasks SET status = ?
WHERE job_id = ? AND status = ?
AND NOT (task_type = ? AND runner_id IS NOT NULL)`,
types.TaskStatusFailed, jobID, types.TaskStatusPending, types.TaskTypeMetadata,
)
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel tasks: %v", err))
return
}
s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job cancelled"})
}
// handleDeleteJob permanently deletes a job and all its associated data
func (s *Server) handleDeleteJob(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Verify job belongs to user (unless admin) and check status
isAdmin := isAdminUser(r)
var jobUserID int64
var jobStatus string
if isAdmin {
err = s.db.QueryRow("SELECT user_id, status FROM jobs WHERE id = ?", jobID).Scan(&jobUserID, &jobStatus)
} else {
// Non-admin users can only delete their own jobs, and not metadata jobs
err = s.db.QueryRow("SELECT user_id, status FROM jobs WHERE id = ? AND user_id = ? AND job_type != ?", jobID, userID, types.JobTypeMetadata).Scan(&jobUserID, &jobStatus)
}
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if !isAdmin && jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
// Prevent deletion of jobs that are still cancellable (pending or running)
if jobStatus == string(types.JobStatusPending) || jobStatus == string(types.JobStatusRunning) {
s.respondError(w, http.StatusBadRequest, "Cannot delete a job that is pending or running. Please cancel it first.")
return
}
// Delete in transaction to ensure consistency
tx, err := s.db.Begin()
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start transaction: %v", err))
return
}
defer tx.Rollback()
// Delete task logs
_, err = tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID)
if err != nil {
tx.Rollback()
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete task logs: %v", err))
return
}
// Delete task steps
_, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID)
if err != nil {
tx.Rollback()
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete task steps: %v", err))
return
}
// Delete tasks
_, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID)
if err != nil {
tx.Rollback()
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete tasks: %v", err))
return
}
// Delete job files
_, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID)
if err != nil {
tx.Rollback()
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete job files: %v", err))
return
}
// Delete the job
_, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID)
if err != nil {
tx.Rollback()
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete job: %v", err))
return
}
// Commit transaction
if err = tx.Commit(); err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to commit transaction: %v", err))
return
}
// Delete physical files
if err := s.storage.DeleteJobFiles(jobID); err != nil {
log.Printf("Warning: Failed to delete job files for job %d: %v", jobID, err)
// Don't fail the request if file deletion fails - the database records are already deleted
}
log.Printf("Deleted job %d (user: %d, admin: %v)", jobID, jobUserID, isAdmin)
s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job deleted"})
}
// cleanupOldMetadataJobs periodically deletes metadata jobs older than 1 day
func (s *Server) cleanupOldMetadataJobs() {
// Run cleanup every hour
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
// Run once immediately on startup
s.cleanupMetadataJobs()
s.cleanupOldRenderJobs()
for range ticker.C {
s.cleanupMetadataJobs()
s.cleanupOldRenderJobs()
}
}
// cleanupMetadataJobs finds and deletes metadata jobs older than 1 day
func (s *Server) cleanupMetadataJobs() {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic in cleanupMetadataJobs: %v", r)
}
}()
// Find metadata jobs older than 1 day
rows, err := s.db.Query(
`SELECT id FROM jobs
WHERE job_type = ?
AND created_at < CURRENT_TIMESTAMP - INTERVAL '1 day'`,
types.JobTypeMetadata,
)
if err != nil {
log.Printf("Failed to query old metadata jobs: %v", err)
return
}
defer rows.Close()
var jobIDs []int64
for rows.Next() {
var jobID int64
if err := rows.Scan(&jobID); err == nil {
jobIDs = append(jobIDs, jobID)
}
}
rows.Close()
if len(jobIDs) == 0 {
return
}
log.Printf("Cleaning up %d old metadata jobs", len(jobIDs))
// Delete each job
for _, jobID := range jobIDs {
// Delete in transaction to ensure consistency
tx, err := s.db.Begin()
if err != nil {
log.Printf("Failed to start transaction for job %d: %v", jobID, err)
continue
}
// Delete task logs
_, err = tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete task logs for job %d: %v", jobID, err)
continue
}
// Delete task steps
_, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete task steps for job %d: %v", jobID, err)
continue
}
// Delete tasks
_, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete tasks for job %d: %v", jobID, err)
continue
}
// Delete job files
_, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete job files for job %d: %v", jobID, err)
continue
}
// Delete the job
_, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete job %d: %v", jobID, err)
continue
}
// Commit transaction
if err = tx.Commit(); err != nil {
log.Printf("Failed to commit transaction for job %d: %v", jobID, err)
continue
}
// Delete physical files (best effort, don't fail if this errors)
if err := s.storage.DeleteJobFiles(jobID); err != nil {
log.Printf("Warning: Failed to delete files for metadata job %d: %v", jobID, err)
}
}
log.Printf("Cleaned up %d old metadata jobs", len(jobIDs))
}
// cleanupOldRenderJobs finds and deletes render jobs older than 1 month that are completed, failed, or cancelled
func (s *Server) cleanupOldRenderJobs() {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic in cleanupOldRenderJobs: %v", r)
}
}()
// Find render jobs older than 1 month that are in a final state (completed, failed, or cancelled)
// Don't delete running or pending jobs
rows, err := s.db.Query(
`SELECT id FROM jobs
WHERE job_type = ?
AND status IN (?, ?, ?)
AND created_at < CURRENT_TIMESTAMP - INTERVAL '1 month'`,
types.JobTypeRender,
types.JobStatusCompleted,
types.JobStatusFailed,
types.JobStatusCancelled,
)
if err != nil {
log.Printf("Failed to query old render jobs: %v", err)
return
}
defer rows.Close()
var jobIDs []int64
for rows.Next() {
var jobID int64
if err := rows.Scan(&jobID); err == nil {
jobIDs = append(jobIDs, jobID)
}
}
rows.Close()
if len(jobIDs) == 0 {
return
}
log.Printf("Cleaning up %d old render jobs", len(jobIDs))
// Delete each job
for _, jobID := range jobIDs {
// Delete in transaction to ensure consistency
tx, err := s.db.Begin()
if err != nil {
log.Printf("Failed to start transaction for job %d: %v", jobID, err)
continue
}
// Delete task logs
_, err = tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete task logs for job %d: %v", jobID, err)
continue
}
// Delete task steps
_, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete task steps for job %d: %v", jobID, err)
continue
}
// Delete tasks
_, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete tasks for job %d: %v", jobID, err)
continue
}
// Delete job files
_, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete job files for job %d: %v", jobID, err)
continue
}
// Delete the job
_, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID)
if err != nil {
tx.Rollback()
log.Printf("Failed to delete job %d: %v", jobID, err)
continue
}
// Commit transaction
if err = tx.Commit(); err != nil {
log.Printf("Failed to commit transaction for job %d: %v", jobID, err)
continue
}
// Delete physical files (best effort, don't fail if this errors)
if err := s.storage.DeleteJobFiles(jobID); err != nil {
log.Printf("Warning: Failed to delete files for render job %d: %v", jobID, err)
}
}
log.Printf("Cleaned up %d old render jobs", len(jobIDs))
}
// handleUploadJobFile handles file upload for a job
func (s *Server) handleUploadJobFile(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Verify job belongs to user
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
// Parse multipart form
err = r.ParseMultipartForm(500 << 20) // 500 MB (larger for ZIP files)
if err != nil {
s.respondError(w, http.StatusBadRequest, "Failed to parse form")
return
}
file, header, err := r.FormFile("file")
if err != nil {
s.respondError(w, http.StatusBadRequest, "No file provided")
return
}
defer file.Close()
jobPath := s.storage.JobPath(jobID)
if err := os.MkdirAll(jobPath, 0755); err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job directory: %v", err))
return
}
var fileID int64
var mainBlendFile string
var extractedFiles []string
// Check if this is a ZIP file
if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") {
// Extract ZIP file
zipPath := filepath.Join(jobPath, header.Filename)
zipFile, err := os.Create(zipPath)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create ZIP file: %v", err))
return
}
_, err = io.Copy(zipFile, file)
zipFile.Close()
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save ZIP file: %v", err))
return
}
// Record ZIP file in database
zipInfo, _ := os.Stat(zipPath)
err = s.db.QueryRow(
`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
VALUES (?, ?, ?, ?, ?)
RETURNING id`,
jobID, types.JobFileTypeInput, zipPath, header.Filename, zipInfo.Size(),
).Scan(&fileID)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record ZIP file: %v", err))
return
}
// Extract ZIP file
extractedFiles, err = s.storage.ExtractZip(zipPath, jobPath)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to extract ZIP file: %v", err))
return
}
// Find main blend file (check for user selection first, then auto-detect)
mainBlendParam := r.FormValue("main_blend_file")
if mainBlendParam != "" {
// User specified main blend file
mainBlendFile = filepath.Join(jobPath, mainBlendParam)
if _, err := os.Stat(mainBlendFile); err != nil {
s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Specified main blend file not found: %s", mainBlendParam))
return
}
} else {
// Auto-detect: find blend files in root directory
blendFiles := []string{}
err := filepath.Walk(jobPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Only check files in root directory (not subdirectories)
relPath, _ := filepath.Rel(jobPath, path)
if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".blend") {
// Check if it's in root (no path separators)
if !strings.Contains(relPath, string(filepath.Separator)) {
blendFiles = append(blendFiles, path)
}
}
return nil
})
if err == nil && len(blendFiles) == 1 {
// Only one blend file in root - use it
mainBlendFile = blendFiles[0]
} else if len(blendFiles) > 1 {
// Multiple blend files - need user to specify
// Return list of blend files for user to choose
blendFileNames := []string{}
for _, f := range blendFiles {
rel, _ := filepath.Rel(jobPath, f)
blendFileNames = append(blendFileNames, rel)
}
s.respondJSON(w, http.StatusOK, map[string]interface{}{
"zip_extracted": true,
"blend_files": blendFileNames,
"message": "Multiple blend files found. Please specify the main blend file.",
})
return
}
}
// Record extracted files in database
for _, extractedFile := range extractedFiles {
relPath, _ := filepath.Rel(jobPath, extractedFile)
info, err := os.Stat(extractedFile)
if err == nil && !info.IsDir() {
_, _ = s.db.Exec(
`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
VALUES (?, ?, ?, ?, ?)`,
jobID, types.JobFileTypeInput, extractedFile, relPath, info.Size(),
)
}
}
} else {
// Regular file upload (not ZIP)
filePath, err := s.storage.SaveUpload(jobID, header.Filename, file)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err))
return
}
// Record in database
err = s.db.QueryRow(
`INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size)
VALUES (?, ?, ?, ?, ?)
RETURNING id`,
jobID, types.JobFileTypeInput, filePath, header.Filename, header.Size,
).Scan(&fileID)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record file: %v", err))
return
}
mainBlendFile = filePath
}
// If we have a main blend file (from ZIP extraction or direct upload), create metadata extraction task
// But ONLY for metadata extraction jobs (temporary jobs created during the initial two-step submission flow)
// Never create metadata tasks for regular render jobs, even if they receive blend files later
blendFileToCheck := mainBlendFile
if blendFileToCheck == "" && strings.HasSuffix(strings.ToLower(header.Filename), ".blend") {
// Direct blend file upload (not from ZIP)
blendFileToCheck = s.storage.JobPath(jobID)
blendFileToCheck = filepath.Join(blendFileToCheck, header.Filename)
}
if blendFileToCheck != "" && strings.HasSuffix(strings.ToLower(blendFileToCheck), ".blend") {
// Check if this is a metadata extraction job
var jobType string
var hasRenderTasks int
err = s.db.QueryRow(
`SELECT j.job_type, COUNT(t.id)
FROM jobs j
LEFT JOIN tasks t ON j.id = t.job_id AND t.task_type = 'render'
WHERE j.id = ?
GROUP BY j.id, j.job_type`,
jobID,
).Scan(&jobType, &hasRenderTasks)
// Only create metadata extraction task if:
// 1. Job type is "metadata" (temporary job from initial submission)
// 2. Job has no render tasks (not a regular render job)
if err == nil && types.JobType(jobType) == types.JobTypeMetadata && hasRenderTasks == 0 {
// Check if metadata task already exists to avoid duplicates
var existingMetadataTask int
s.db.QueryRow(
`SELECT COUNT(*) FROM tasks WHERE job_id = ? AND task_type = 'metadata'`,
jobID,
).Scan(&existingMetadataTask)
if existingMetadataTask == 0 {
// Create metadata extraction task
metadataTaskTimeout := 300 // 5 minutes for metadata extraction
var metadataTaskID int64
err = s.db.QueryRow(
`INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries)
VALUES (?, ?, ?, ?, ?, ?, ?)
RETURNING id`,
jobID, 0, 0, types.TaskTypeMetadata, types.TaskStatusPending, metadataTaskTimeout, 1,
).Scan(&metadataTaskID)
if err != nil {
log.Printf("Failed to create metadata extraction task: %v", err)
} else {
log.Printf("Created metadata extraction task %d for job %d (initial submission)", metadataTaskID, jobID)
// Log task creation to task logs
s.logTaskEvent(metadataTaskID, nil, types.LogLevelInfo, "Metadata extraction task created", "")
// Try to distribute the task immediately (with small delay to ensure transaction is committed)
go func() {
time.Sleep(100 * time.Millisecond) // Small delay to ensure transaction is committed
s.distributeTasksToRunners()
}()
}
} else {
log.Printf("Skipping metadata extraction task creation for job %d (metadata task already exists)", jobID)
}
} else {
log.Printf("Skipping metadata extraction task creation for job %d (not an initial metadata extraction job)", jobID)
}
}
response := map[string]interface{}{
"id": fileID,
"file_name": header.Filename,
"file_size": header.Size,
}
if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") {
response["zip_extracted"] = true
response["extracted_files_count"] = len(extractedFiles)
if mainBlendFile != "" {
relPath, _ := filepath.Rel(s.storage.JobPath(jobID), mainBlendFile)
response["main_blend_file"] = relPath
}
} else {
response["file_path"] = s.storage.JobPath(jobID)
response["file_path"] = filepath.Join(response["file_path"].(string), header.Filename)
}
s.respondJSON(w, http.StatusCreated, response)
}
// handleListJobFiles lists files for a job
func (s *Server) handleListJobFiles(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Verify job belongs to user (unless admin)
isAdmin := isAdminUser(r)
if !isAdmin {
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
} else {
// Admin: verify job exists
var exists bool
err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
if err != nil || !exists {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
}
rows, err := s.db.Query(
`SELECT id, job_id, file_type, file_path, file_name, file_size, created_at
FROM job_files WHERE job_id = ? ORDER BY created_at DESC`,
jobID,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query files: %v", err))
return
}
defer rows.Close()
files := []types.JobFile{}
for rows.Next() {
var file types.JobFile
err := rows.Scan(
&file.ID, &file.JobID, &file.FileType, &file.FilePath,
&file.FileName, &file.FileSize, &file.CreatedAt,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan file: %v", err))
return
}
files = append(files, file)
}
s.respondJSON(w, http.StatusOK, files)
}
// handleDownloadJobFile downloads a job file
func (s *Server) handleDownloadJobFile(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
fileID, err := parseID(r, "fileId")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Verify job belongs to user (unless admin)
isAdmin := isAdminUser(r)
if !isAdmin {
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
} else {
// Admin: verify job exists
var exists bool
err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
if err != nil || !exists {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
}
// Get file info
var filePath, fileName string
err = s.db.QueryRow(
`SELECT file_path, file_name FROM job_files WHERE id = ? AND job_id = ?`,
fileID, jobID,
).Scan(&filePath, &fileName)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "File not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err))
return
}
// Open file
file, err := s.storage.GetFile(filePath)
if err != nil {
s.respondError(w, http.StatusNotFound, "File not found on disk")
return
}
defer file.Close()
// Determine content type based on file extension
contentType := "application/octet-stream"
disposition := "attachment"
fileNameLower := strings.ToLower(fileName)
switch {
case strings.HasSuffix(fileNameLower, ".png"):
contentType = "image/png"
disposition = "inline"
case strings.HasSuffix(fileNameLower, ".jpg") || strings.HasSuffix(fileNameLower, ".jpeg"):
contentType = "image/jpeg"
disposition = "inline"
case strings.HasSuffix(fileNameLower, ".gif"):
contentType = "image/gif"
disposition = "inline"
case strings.HasSuffix(fileNameLower, ".webp"):
contentType = "image/webp"
disposition = "inline"
case strings.HasSuffix(fileNameLower, ".bmp"):
contentType = "image/bmp"
disposition = "inline"
case strings.HasSuffix(fileNameLower, ".svg"):
contentType = "image/svg+xml"
disposition = "inline"
}
// Set headers
w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%s", disposition, fileName))
w.Header().Set("Content-Type", contentType)
// Stream file
io.Copy(w, file)
}
// handleStreamVideo streams MP4 video file with range support
func (s *Server) handleStreamVideo(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Verify job belongs to user (unless admin)
isAdmin := isAdminUser(r)
var jobUserID int64
var outputFormat string
if isAdmin {
err = s.db.QueryRow("SELECT user_id, output_format FROM jobs WHERE id = ?", jobID).Scan(&jobUserID, &outputFormat)
} else {
err = s.db.QueryRow("SELECT user_id, output_format FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobUserID, &outputFormat)
}
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
return
}
if !isAdmin && jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
// Find MP4 file
var filePath, fileName string
err = s.db.QueryRow(
`SELECT file_path, file_name FROM job_files
WHERE job_id = ? AND file_type = ? AND file_name LIKE '%.mp4'
ORDER BY created_at DESC LIMIT 1`,
jobID, types.JobFileTypeOutput,
).Scan(&filePath, &fileName)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Video file not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err))
return
}
// Open file
file, err := s.storage.GetFile(filePath)
if err != nil {
s.respondError(w, http.StatusNotFound, "File not found on disk")
return
}
defer file.Close()
// Get file info
fileInfo, err := file.Stat()
if err != nil {
s.respondError(w, http.StatusInternalServerError, "Failed to get file info")
return
}
fileSize := fileInfo.Size()
// Handle range requests for video seeking
rangeHeader := r.Header.Get("Range")
if rangeHeader != "" {
// Parse range header
var start, end int64
fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end)
if end == 0 {
end = fileSize - 1
}
// Seek to start position
file.Seek(start, 0)
// Set headers for partial content
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize))
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1))
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusPartialContent)
// Copy partial content
io.CopyN(w, file, end-start+1)
} else {
// Full file
w.Header().Set("Content-Type", "video/mp4")
w.Header().Set("Content-Length", fmt.Sprintf("%d", fileSize))
w.Header().Set("Accept-Ranges", "bytes")
io.Copy(w, file)
}
}
// handleListJobTasks lists all tasks for a job
func (s *Server) handleListJobTasks(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
// Verify job belongs to user (unless admin)
isAdmin := isAdminUser(r)
if !isAdmin {
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
} else {
// Admin: verify job exists
var exists bool
err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
if err != nil || !exists {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
}
rows, err := s.db.Query(
`SELECT id, job_id, runner_id, frame_start, frame_end, status, task_type,
current_step, retry_count, max_retries, output_path, created_at, started_at,
completed_at, error_message, timeout_seconds
FROM tasks WHERE job_id = ? ORDER BY frame_start ASC`,
jobID,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err))
return
}
defer rows.Close()
tasks := []types.Task{}
for rows.Next() {
var task types.Task
var runnerID sql.NullInt64
var startedAt, completedAt sql.NullTime
var timeoutSeconds sql.NullInt64
var errorMessage sql.NullString
var currentStep sql.NullString
var outputPath sql.NullString
err := rows.Scan(
&task.ID, &task.JobID, &runnerID, &task.FrameStart, &task.FrameEnd,
&task.Status, &task.TaskType, &currentStep, &task.RetryCount,
&task.MaxRetries, &outputPath, &task.CreatedAt, &startedAt,
&completedAt, &errorMessage, &timeoutSeconds,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err))
return
}
if runnerID.Valid {
task.RunnerID = &runnerID.Int64
}
if startedAt.Valid {
task.StartedAt = &startedAt.Time
}
if completedAt.Valid {
task.CompletedAt = &completedAt.Time
}
if timeoutSeconds.Valid {
timeout := int(timeoutSeconds.Int64)
task.TimeoutSeconds = &timeout
}
if errorMessage.Valid {
task.ErrorMessage = errorMessage.String
}
if currentStep.Valid {
task.CurrentStep = currentStep.String
}
if outputPath.Valid {
task.OutputPath = outputPath.String
}
tasks = append(tasks, task)
}
s.respondJSON(w, http.StatusOK, tasks)
}
// handleGetTaskLogs retrieves logs for a specific task
func (s *Server) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
taskIDStr := chi.URLParam(r, "taskId")
taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
if err != nil {
s.respondError(w, http.StatusBadRequest, "Invalid task ID")
return
}
// Verify job belongs to user (unless admin)
isAdmin := isAdminUser(r)
if !isAdmin {
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
} else {
// Admin: verify job exists
var exists bool
err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
if err != nil || !exists {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
}
// Verify task belongs to job
var taskJobID int64
err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Task not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
return
}
if taskJobID != jobID {
s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
return
}
// Get query parameters for filtering
stepName := r.URL.Query().Get("step_name")
logLevel := r.URL.Query().Get("log_level")
limitStr := r.URL.Query().Get("limit")
limit := 1000 // default
if limitStr != "" {
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 {
limit = l
}
}
// Build query
query := `SELECT id, task_id, runner_id, log_level, message, step_name, created_at
FROM task_logs WHERE task_id = ?`
args := []interface{}{taskID}
if stepName != "" {
query += " AND step_name = ?"
args = append(args, stepName)
}
if logLevel != "" {
query += " AND log_level = ?"
args = append(args, logLevel)
}
query += " ORDER BY created_at ASC LIMIT ?"
args = append(args, limit)
rows, err := s.db.Query(query, args...)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query logs: %v", err))
return
}
defer rows.Close()
logs := []types.TaskLog{}
for rows.Next() {
var log types.TaskLog
var runnerID sql.NullInt64
err := rows.Scan(
&log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message,
&log.StepName, &log.CreatedAt,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan log: %v", err))
return
}
if runnerID.Valid {
log.RunnerID = &runnerID.Int64
}
logs = append(logs, log)
}
s.respondJSON(w, http.StatusOK, logs)
}
// handleGetTaskSteps retrieves step timeline for a specific task
func (s *Server) handleGetTaskSteps(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
taskIDStr := chi.URLParam(r, "taskId")
taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
if err != nil {
s.respondError(w, http.StatusBadRequest, "Invalid task ID")
return
}
// Verify job belongs to user (unless admin)
isAdmin := isAdminUser(r)
if !isAdmin {
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
} else {
// Admin: verify job exists
var exists bool
err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists)
if err != nil || !exists {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
}
// Verify task belongs to job
var taskJobID int64
err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Task not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
return
}
if taskJobID != jobID {
s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
return
}
rows, err := s.db.Query(
`SELECT id, task_id, step_name, status, started_at, completed_at, duration_ms, error_message
FROM task_steps WHERE task_id = ? ORDER BY started_at ASC`,
taskID,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query steps: %v", err))
return
}
defer rows.Close()
steps := []types.TaskStep{}
for rows.Next() {
var step types.TaskStep
var startedAt, completedAt sql.NullTime
var durationMs sql.NullInt64
var errorMessage sql.NullString
err := rows.Scan(
&step.ID, &step.TaskID, &step.StepName, &step.Status,
&startedAt, &completedAt, &durationMs, &errorMessage,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan step: %v", err))
return
}
if startedAt.Valid {
step.StartedAt = &startedAt.Time
}
if completedAt.Valid {
step.CompletedAt = &completedAt.Time
}
if durationMs.Valid {
duration := int(durationMs.Int64)
step.DurationMs = &duration
}
if errorMessage.Valid {
step.ErrorMessage = errorMessage.String
}
steps = append(steps, step)
}
s.respondJSON(w, http.StatusOK, steps)
}
// handleRetryTask retries a failed task
func (s *Server) handleRetryTask(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
s.respondError(w, http.StatusUnauthorized, err.Error())
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
taskIDStr := chi.URLParam(r, "taskId")
taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
if err != nil {
s.respondError(w, http.StatusBadRequest, "Invalid task ID")
return
}
// Verify job belongs to user
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
// Verify task belongs to job and is in a retryable state
var taskJobID int64
var taskStatus string
var retryCount, maxRetries int
err = s.db.QueryRow(
"SELECT job_id, status, retry_count, max_retries FROM tasks WHERE id = ?",
taskID,
).Scan(&taskJobID, &taskStatus, &retryCount, &maxRetries)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Task not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
return
}
if taskJobID != jobID {
s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
return
}
if taskStatus != string(types.TaskStatusFailed) {
s.respondError(w, http.StatusBadRequest, "Task is not in failed state")
return
}
if retryCount >= maxRetries {
s.respondError(w, http.StatusBadRequest, "Maximum retries exceeded")
return
}
// Reset task to pending
_, err = s.db.Exec(
`UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL,
error_message = NULL, started_at = NULL, completed_at = NULL
WHERE id = ?`,
types.TaskStatusPending, taskID,
)
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to retry task: %v", err))
return
}
s.respondJSON(w, http.StatusOK, map[string]string{"message": "Task queued for retry"})
}
// handleStreamTaskLogsWebSocket streams task logs via WebSocket
// Note: This is called after auth middleware, so userID is already verified
func (s *Server) handleStreamTaskLogsWebSocket(w http.ResponseWriter, r *http.Request) {
userID, err := getUserID(r)
if err != nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
jobID, err := parseID(r, "id")
if err != nil {
s.respondError(w, http.StatusBadRequest, err.Error())
return
}
taskIDStr := chi.URLParam(r, "taskId")
taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
if err != nil {
s.respondError(w, http.StatusBadRequest, "Invalid task ID")
return
}
// Verify job belongs to user
var jobUserID int64
err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Job not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err))
return
}
if jobUserID != userID {
s.respondError(w, http.StatusForbidden, "Access denied")
return
}
// Verify task belongs to job
var taskJobID int64
err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID)
if err == sql.ErrNoRows {
s.respondError(w, http.StatusNotFound, "Task not found")
return
}
if err != nil {
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err))
return
}
if taskJobID != jobID {
s.respondError(w, http.StatusBadRequest, "Task does not belong to this job")
return
}
// Upgrade to WebSocket
conn, err := s.wsUpgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade WebSocket: %v", err)
return
}
defer conn.Close()
key := fmt.Sprintf("%d:%d", jobID, taskID)
s.frontendConnsMu.Lock()
s.frontendConns[key] = conn
s.frontendConnsMu.Unlock()
// Create a write mutex for this connection
s.frontendConnsWriteMuMu.Lock()
s.frontendConnsWriteMu[key] = &sync.Mutex{}
writeMu := s.frontendConnsWriteMu[key]
s.frontendConnsWriteMuMu.Unlock()
defer func() {
s.frontendConnsMu.Lock()
delete(s.frontendConns, key)
s.frontendConnsMu.Unlock()
s.frontendConnsWriteMuMu.Lock()
delete(s.frontendConnsWriteMu, key)
s.frontendConnsWriteMuMu.Unlock()
}()
// Send initial connection message
writeMu.Lock()
err = conn.WriteJSON(map[string]interface{}{
"type": "connected",
"timestamp": time.Now().Unix(),
})
writeMu.Unlock()
if err != nil {
log.Printf("Failed to send initial connection message: %v", err)
return
}
// Get last log ID to start streaming from
lastIDStr := r.URL.Query().Get("last_id")
lastID := int64(0)
if lastIDStr != "" {
if id, err := strconv.ParseInt(lastIDStr, 10, 64); err == nil {
lastID = id
}
}
// Send existing logs
// Order by id ASC to ensure consistent ordering and avoid race conditions
rows, err := s.db.Query(
`SELECT id, task_id, runner_id, log_level, message, step_name, created_at
FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`,
taskID, lastID,
)
if err == nil {
defer rows.Close()
for rows.Next() {
var log types.TaskLog
var runnerID sql.NullInt64
err := rows.Scan(
&log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message,
&log.StepName, &log.CreatedAt,
)
if err != nil {
continue
}
if runnerID.Valid {
log.RunnerID = &runnerID.Int64
}
// Always update lastID to the highest ID we've seen
if log.ID > lastID {
lastID = log.ID
}
// Serialize writes to prevent concurrent write panics
writeMu.Lock()
writeErr := conn.WriteJSON(map[string]interface{}{
"type": "log",
"data": log,
"timestamp": time.Now().Unix(),
})
writeMu.Unlock()
if writeErr != nil {
// Connection closed, exit the loop
return
}
}
}
// Poll for new logs and send them
// Use shorter interval for more responsive updates, but order by id for consistency
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
rows, err := s.db.Query(
`SELECT id, task_id, runner_id, log_level, message, step_name, created_at
FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`,
taskID, lastID,
)
if err != nil {
continue
}
for rows.Next() {
var log types.TaskLog
var runnerID sql.NullInt64
err := rows.Scan(
&log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message,
&log.StepName, &log.CreatedAt,
)
if err != nil {
continue
}
if runnerID.Valid {
log.RunnerID = &runnerID.Int64
}
// Always update lastID to the highest ID we've seen
if log.ID > lastID {
lastID = log.ID
}
// Serialize writes to prevent concurrent write panics
writeMu.Lock()
err = conn.WriteJSON(map[string]interface{}{
"type": "log",
"data": log,
"timestamp": time.Now().Unix(),
})
writeMu.Unlock()
if err != nil {
// Connection closed, exit the loop
return
}
}
rows.Close()
}
}
}