package api import ( "database/sql" "encoding/json" "fmt" "io" "log" "net/http" "os" "path/filepath" "strconv" "strings" "sync" "time" authpkg "jiggablend/internal/auth" "jiggablend/pkg/types" "github.com/go-chi/chi/v5" ) // isAdminUser checks if the current user is an admin func isAdminUser(r *http.Request) bool { return authpkg.IsAdmin(r.Context()) } // handleCreateJob creates a new job func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } var req types.CreateJobRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, "Invalid request body") return } // Validate job type if req.JobType != types.JobTypeMetadata && req.JobType != types.JobTypeRender { s.respondError(w, http.StatusBadRequest, "Invalid job_type: must be 'metadata' or 'render'") return } if req.Name == "" { s.respondError(w, http.StatusBadRequest, "Job name is required") return } // Validate render job requirements if req.JobType == types.JobTypeRender { if req.FrameStart == nil || req.FrameEnd == nil { s.respondError(w, http.StatusBadRequest, "frame_start and frame_end are required for render jobs") return } if *req.FrameStart < 0 || *req.FrameEnd < *req.FrameStart { s.respondError(w, http.StatusBadRequest, "Invalid frame range") return } // Validate frame range limits (prevent abuse) const maxFrameRange = 10000 if *req.FrameEnd-*req.FrameStart+1 > maxFrameRange { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Frame range too large. Maximum allowed: %d frames", maxFrameRange)) return } if req.OutputFormat == nil || *req.OutputFormat == "" { defaultFormat := "PNG" req.OutputFormat = &defaultFormat } } // Default allow_parallel_runners to true for render jobs if not provided var allowParallelRunners *bool if req.JobType == types.JobTypeRender { allowParallelRunners = new(bool) *allowParallelRunners = true if req.AllowParallelRunners != nil { *allowParallelRunners = *req.AllowParallelRunners } } // Set job timeout to 24 hours (86400 seconds) jobTimeout := 86400 // Build SQL query based on job type var jobID int64 if req.JobType == types.JobTypeMetadata { // Metadata jobs don't need frame range or output format err = s.db.QueryRow( `INSERT INTO jobs (user_id, job_type, name, status, progress, timeout_seconds) VALUES (?, ?, ?, ?, ?, ?) RETURNING id`, userID, req.JobType, req.Name, types.JobStatusPending, 0.0, jobTimeout, ).Scan(&jobID) } else { // Render jobs need all fields err = s.db.QueryRow( `INSERT INTO jobs (user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, userID, req.JobType, req.Name, types.JobStatusPending, 0.0, *req.FrameStart, *req.FrameEnd, *req.OutputFormat, allowParallelRunners, jobTimeout, ).Scan(&jobID) } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job: %v", err)) return } // For render jobs, copy input files from metadata job if specified if req.JobType == types.JobTypeRender && req.MetadataJobID != nil { // Verify metadata job exists and belongs to the same user var metadataJobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ? AND job_type = ?", *req.MetadataJobID, types.JobTypeMetadata).Scan(&metadataJobUserID) if err == nil && metadataJobUserID == userID { // Copy input files from metadata job to render job _, err = s.db.Exec( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) SELECT ?, file_type, file_path, file_name, file_size FROM job_files WHERE job_id = ? AND file_type = ?`, jobID, *req.MetadataJobID, types.JobFileTypeInput, ) if err != nil { log.Printf("Warning: Failed to copy input files from metadata job %d to render job %d: %v", *req.MetadataJobID, jobID, err) } else { log.Printf("Copied input files from metadata job %d to render job %d", *req.MetadataJobID, jobID) } } else { log.Printf("Warning: Metadata job %d not found or doesn't belong to user %d, skipping file copy", *req.MetadataJobID, userID) } } // Only create render tasks for render jobs if req.JobType == types.JobTypeRender { // Determine task timeout based on output format taskTimeout := 300 // Default: 5 minutes for frame rendering if *req.OutputFormat == "MP4" { // For MP4, we'll create frame tasks with 5 min timeout // Video generation tasks will be created later with 24h timeout taskTimeout = 300 } // Create tasks for the job // If allow_parallel_runners is false, create a single task for all frames // Otherwise, create one task per frame for parallel processing if allowParallelRunners != nil && !*allowParallelRunners { // Single task for entire frame range _, err = s.db.Exec( `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) VALUES (?, ?, ?, ?, ?, ?, ?)`, jobID, *req.FrameStart, *req.FrameEnd, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create task: %v", err)) return } log.Printf("Created 1 render task for job %d (frames %d-%d, single runner)", jobID, *req.FrameStart, *req.FrameEnd) } else { // One task per frame for parallel processing for frame := *req.FrameStart; frame <= *req.FrameEnd; frame++ { _, err = s.db.Exec( `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) VALUES (?, ?, ?, ?, ?, ?, ?)`, jobID, frame, frame, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create tasks: %v", err)) return } } log.Printf("Created %d render tasks for job %d (frames %d-%d, parallel)", *req.FrameEnd-*req.FrameStart+1, jobID, *req.FrameStart, *req.FrameEnd) } // Update job status (should be pending since tasks are pending) s.updateJobStatusFromTasks(jobID) } else { log.Printf("Created metadata extraction job %d (no render tasks)", jobID) } // Build response job object job := types.Job{ ID: jobID, UserID: userID, JobType: req.JobType, Name: req.Name, Status: types.JobStatusPending, Progress: 0.0, TimeoutSeconds: jobTimeout, CreatedAt: time.Now(), } if req.JobType == types.JobTypeRender { job.FrameStart = req.FrameStart job.FrameEnd = req.FrameEnd job.OutputFormat = req.OutputFormat job.AllowParallelRunners = allowParallelRunners } // Immediately try to distribute tasks to connected runners go s.distributeTasksToRunners() s.respondJSON(w, http.StatusCreated, job) } // handleListJobs lists jobs for the current user func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } // Filter out metadata jobs for non-admin users isAdmin := isAdminUser(r) var query string if isAdmin { query = `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE user_id = ? ORDER BY created_at DESC` } else { query = `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE user_id = ? AND job_type != ? ORDER BY created_at DESC` } var rows *sql.Rows if isAdmin { rows, err = s.db.Query(query, userID) } else { rows, err = s.db.Query(query, userID, types.JobTypeMetadata) } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query jobs: %v", err)) return } defer rows.Close() jobs := []types.Job{} for rows.Next() { var job types.Job var jobType string var startedAt, completedAt sql.NullTime var blendMetadataJSON sql.NullString var errorMessage sql.NullString var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString var allowParallelRunners sql.NullBool err := rows.Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err)) return } job.JobType = types.JobType(jobType) if frameStart.Valid { fs := int(frameStart.Int64) job.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) job.FrameEnd = &fe } if outputFormat.Valid { job.OutputFormat = &outputFormat.String } if allowParallelRunners.Valid { job.AllowParallelRunners = &allowParallelRunners.Bool } if startedAt.Valid { job.StartedAt = &startedAt.Time } if completedAt.Valid { job.CompletedAt = &completedAt.Time } if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { var metadata types.BlendMetadata if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { job.BlendMetadata = &metadata } } if errorMessage.Valid { job.ErrorMessage = errorMessage.String } jobs = append(jobs, job) } s.respondJSON(w, http.StatusOK, jobs) } // handleGetJob gets a specific job func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } var job types.Job var jobType string var startedAt, completedAt sql.NullTime var blendMetadataJSON sql.NullString var errorMessage sql.NullString var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString var allowParallelRunners sql.NullBool // Allow admins to view any job, regular users can only view their own (and not metadata jobs) isAdmin := isAdminUser(r) var err2 error if isAdmin { err2 = s.db.QueryRow( `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ?`, jobID, ).Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) } else { err2 = s.db.QueryRow( `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, allow_parallel_runners, timeout_seconds, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ? AND user_id = ? AND job_type != ?`, jobID, userID, types.JobTypeMetadata, ).Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &allowParallelRunners, &job.TimeoutSeconds, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) } if err2 == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err2 != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err2)) return } job.JobType = types.JobType(jobType) if frameStart.Valid { fs := int(frameStart.Int64) job.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) job.FrameEnd = &fe } if outputFormat.Valid { job.OutputFormat = &outputFormat.String } if allowParallelRunners.Valid { job.AllowParallelRunners = &allowParallelRunners.Bool } if startedAt.Valid { job.StartedAt = &startedAt.Time } if completedAt.Valid { job.CompletedAt = &completedAt.Time } if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { var metadata types.BlendMetadata if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { job.BlendMetadata = &metadata } } if errorMessage.Valid { job.ErrorMessage = errorMessage.String } s.respondJSON(w, http.StatusOK, job) } // handleCancelJob cancels a job func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Check if this is a metadata extraction job - if so, don't cancel running metadata tasks var jobType string var jobStatus string err = s.db.QueryRow("SELECT job_type, status FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobType, &jobStatus) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } // Don't allow cancelling already completed or cancelled jobs if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusCancelled) { s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job already " + jobStatus}) return } isMetadataExtractionJob := types.JobType(jobType) == types.JobTypeMetadata result, err := s.db.Exec( `UPDATE jobs SET status = ? WHERE id = ? AND user_id = ?`, types.JobStatusCancelled, jobID, userID, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel job: %v", err)) return } rowsAffected, _ := result.RowsAffected() if rowsAffected == 0 { s.respondError(w, http.StatusNotFound, "Job not found") return } log.Printf("Cancelling job %d (type: %s, isMetadataExtraction: %v)", jobID, jobType, isMetadataExtractionJob) // For metadata extraction jobs, be more careful - only cancel if no metadata task is running if isMetadataExtractionJob { // Check if there's a running metadata task var runningMetadataTask int s.db.QueryRow( `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND task_type = ? AND status = ?`, jobID, types.TaskTypeMetadata, types.TaskStatusRunning, ).Scan(&runningMetadataTask) if runningMetadataTask > 0 { log.Printf("Job %d has running metadata task, preserving it", jobID) // Don't cancel running metadata tasks - let them complete // Only cancel pending tasks that aren't metadata _, err = s.db.Exec( `UPDATE tasks SET status = ? WHERE job_id = ? AND status = ? AND task_type != ?`, types.TaskStatusFailed, jobID, types.TaskStatusPending, types.TaskTypeMetadata, ) } else { // No running metadata task, safe to cancel pending metadata tasks _, err = s.db.Exec( `UPDATE tasks SET status = ? WHERE job_id = ? AND status = ?`, types.TaskStatusFailed, jobID, types.TaskStatusPending, ) } } else { // For regular jobs, cancel pending tasks (but preserve running metadata tasks) _, err = s.db.Exec( `UPDATE tasks SET status = ? WHERE job_id = ? AND status = ? AND NOT (task_type = ? AND runner_id IS NOT NULL)`, types.TaskStatusFailed, jobID, types.TaskStatusPending, types.TaskTypeMetadata, ) } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel tasks: %v", err)) return } s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job cancelled"}) } // handleDeleteJob permanently deletes a job and all its associated data func (s *Server) handleDeleteJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) and check status isAdmin := isAdminUser(r) var jobUserID int64 var jobStatus string if isAdmin { err = s.db.QueryRow("SELECT user_id, status FROM jobs WHERE id = ?", jobID).Scan(&jobUserID, &jobStatus) } else { // Non-admin users can only delete their own jobs, and not metadata jobs err = s.db.QueryRow("SELECT user_id, status FROM jobs WHERE id = ? AND user_id = ? AND job_type != ?", jobID, userID, types.JobTypeMetadata).Scan(&jobUserID, &jobStatus) } if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if !isAdmin && jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Prevent deletion of jobs that are still cancellable (pending or running) if jobStatus == string(types.JobStatusPending) || jobStatus == string(types.JobStatusRunning) { s.respondError(w, http.StatusBadRequest, "Cannot delete a job that is pending or running. Please cancel it first.") return } // Delete in transaction to ensure consistency tx, err := s.db.Begin() if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to start transaction: %v", err)) return } defer tx.Rollback() // Delete task logs _, err = tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { tx.Rollback() s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete task logs: %v", err)) return } // Delete task steps _, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { tx.Rollback() s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete task steps: %v", err)) return } // Delete tasks _, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID) if err != nil { tx.Rollback() s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete tasks: %v", err)) return } // Delete job files _, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID) if err != nil { tx.Rollback() s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete job files: %v", err)) return } // Delete the job _, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID) if err != nil { tx.Rollback() s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete job: %v", err)) return } // Commit transaction if err = tx.Commit(); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to commit transaction: %v", err)) return } // Delete physical files if err := s.storage.DeleteJobFiles(jobID); err != nil { log.Printf("Warning: Failed to delete job files for job %d: %v", jobID, err) // Don't fail the request if file deletion fails - the database records are already deleted } log.Printf("Deleted job %d (user: %d, admin: %v)", jobID, jobUserID, isAdmin) s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job deleted"}) } // cleanupOldMetadataJobs periodically deletes metadata jobs older than 1 day func (s *Server) cleanupOldMetadataJobs() { // Run cleanup every hour ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() // Run once immediately on startup s.cleanupMetadataJobs() s.cleanupOldRenderJobs() for range ticker.C { s.cleanupMetadataJobs() s.cleanupOldRenderJobs() } } // cleanupMetadataJobs finds and deletes metadata jobs older than 1 day func (s *Server) cleanupMetadataJobs() { defer func() { if r := recover(); r != nil { log.Printf("Panic in cleanupMetadataJobs: %v", r) } }() // Find metadata jobs older than 1 day rows, err := s.db.Query( `SELECT id FROM jobs WHERE job_type = ? AND created_at < CURRENT_TIMESTAMP - INTERVAL '1 day'`, types.JobTypeMetadata, ) if err != nil { log.Printf("Failed to query old metadata jobs: %v", err) return } defer rows.Close() var jobIDs []int64 for rows.Next() { var jobID int64 if err := rows.Scan(&jobID); err == nil { jobIDs = append(jobIDs, jobID) } } rows.Close() if len(jobIDs) == 0 { return } log.Printf("Cleaning up %d old metadata jobs", len(jobIDs)) // Delete each job for _, jobID := range jobIDs { // Delete in transaction to ensure consistency tx, err := s.db.Begin() if err != nil { log.Printf("Failed to start transaction for job %d: %v", jobID, err) continue } // Delete task logs _, err = tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete task logs for job %d: %v", jobID, err) continue } // Delete task steps _, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete task steps for job %d: %v", jobID, err) continue } // Delete tasks _, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete tasks for job %d: %v", jobID, err) continue } // Delete job files _, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete job files for job %d: %v", jobID, err) continue } // Delete the job _, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete job %d: %v", jobID, err) continue } // Commit transaction if err = tx.Commit(); err != nil { log.Printf("Failed to commit transaction for job %d: %v", jobID, err) continue } // Delete physical files (best effort, don't fail if this errors) if err := s.storage.DeleteJobFiles(jobID); err != nil { log.Printf("Warning: Failed to delete files for metadata job %d: %v", jobID, err) } } log.Printf("Cleaned up %d old metadata jobs", len(jobIDs)) } // cleanupOldRenderJobs finds and deletes render jobs older than 1 month that are completed, failed, or cancelled func (s *Server) cleanupOldRenderJobs() { defer func() { if r := recover(); r != nil { log.Printf("Panic in cleanupOldRenderJobs: %v", r) } }() // Find render jobs older than 1 month that are in a final state (completed, failed, or cancelled) // Don't delete running or pending jobs rows, err := s.db.Query( `SELECT id FROM jobs WHERE job_type = ? AND status IN (?, ?, ?) AND created_at < CURRENT_TIMESTAMP - INTERVAL '1 month'`, types.JobTypeRender, types.JobStatusCompleted, types.JobStatusFailed, types.JobStatusCancelled, ) if err != nil { log.Printf("Failed to query old render jobs: %v", err) return } defer rows.Close() var jobIDs []int64 for rows.Next() { var jobID int64 if err := rows.Scan(&jobID); err == nil { jobIDs = append(jobIDs, jobID) } } rows.Close() if len(jobIDs) == 0 { return } log.Printf("Cleaning up %d old render jobs", len(jobIDs)) // Delete each job for _, jobID := range jobIDs { // Delete in transaction to ensure consistency tx, err := s.db.Begin() if err != nil { log.Printf("Failed to start transaction for job %d: %v", jobID, err) continue } // Delete task logs _, err = tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete task logs for job %d: %v", jobID, err) continue } // Delete task steps _, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete task steps for job %d: %v", jobID, err) continue } // Delete tasks _, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete tasks for job %d: %v", jobID, err) continue } // Delete job files _, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete job files for job %d: %v", jobID, err) continue } // Delete the job _, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID) if err != nil { tx.Rollback() log.Printf("Failed to delete job %d: %v", jobID, err) continue } // Commit transaction if err = tx.Commit(); err != nil { log.Printf("Failed to commit transaction for job %d: %v", jobID, err) continue } // Delete physical files (best effort, don't fail if this errors) if err := s.storage.DeleteJobFiles(jobID); err != nil { log.Printf("Warning: Failed to delete files for render job %d: %v", jobID, err) } } log.Printf("Cleaned up %d old render jobs", len(jobIDs)) } // handleUploadJobFile handles file upload for a job func (s *Server) handleUploadJobFile(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Parse multipart form with large limit for big files // Note: For very large files, this will use temporary files on disk err = r.ParseMultipartForm(20 << 30) // 20 GB (for large ZIP files and blend files) if err != nil { log.Printf("Error parsing multipart form for job %d: %v", jobID, err) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse form: %v", err)) return } file, header, err := r.FormFile("file") if err != nil { log.Printf("Error getting file from form for job %d: %v", jobID, err) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("No file provided: %v", err)) return } defer file.Close() log.Printf("Uploading file '%s' (size: %d bytes) for job %d", header.Filename, header.Size, jobID) jobPath := s.storage.JobPath(jobID) if err := os.MkdirAll(jobPath, 0755); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job directory: %v", err)) return } var fileID int64 var mainBlendFile string var extractedFiles []string // Check if this is a ZIP file if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") { log.Printf("Processing ZIP file '%s' for job %d", header.Filename, jobID) // Extract ZIP file zipPath := filepath.Join(jobPath, header.Filename) log.Printf("Creating ZIP file at: %s", zipPath) zipFile, err := os.Create(zipPath) if err != nil { log.Printf("ERROR: Failed to create ZIP file for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create ZIP file: %v", err)) return } log.Printf("Copying %d bytes to ZIP file for job %d...", header.Size, jobID) copied, err := io.Copy(zipFile, file) zipFile.Close() if err != nil { log.Printf("ERROR: Failed to save ZIP file for job %d (copied %d bytes): %v", jobID, copied, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save ZIP file: %v", err)) return } log.Printf("Successfully copied %d bytes to ZIP file for job %d", copied, jobID) // Record ZIP file in database zipInfo, err := os.Stat(zipPath) if err != nil { log.Printf("ERROR: Failed to stat ZIP file for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to stat ZIP file: %v", err)) return } log.Printf("Recording ZIP file in database for job %d (size: %d bytes)", jobID, zipInfo.Size()) err = s.db.QueryRow( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?) RETURNING id`, jobID, types.JobFileTypeInput, zipPath, header.Filename, zipInfo.Size(), ).Scan(&fileID) if err != nil { log.Printf("ERROR: Failed to record ZIP file in database for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record ZIP file: %v", err)) return } log.Printf("ZIP file recorded in database with ID %d for job %d", fileID, jobID) // Extract ZIP file log.Printf("Extracting ZIP file for job %d...", jobID) extractedFiles, err = s.storage.ExtractZip(zipPath, jobPath) if err != nil { log.Printf("ERROR: Failed to extract ZIP file for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to extract ZIP file: %v", err)) return } log.Printf("Successfully extracted %d files from ZIP for job %d", len(extractedFiles), jobID) // Find main blend file (check for user selection first, then auto-detect) mainBlendParam := r.FormValue("main_blend_file") if mainBlendParam != "" { // User specified main blend file mainBlendFile = filepath.Join(jobPath, mainBlendParam) if _, err := os.Stat(mainBlendFile); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Specified main blend file not found: %s", mainBlendParam)) return } } else { // Auto-detect: find blend files in root directory blendFiles := []string{} err := filepath.Walk(jobPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // Only check files in root directory (not subdirectories) relPath, _ := filepath.Rel(jobPath, path) if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".blend") { // Check if it's in root (no path separators) if !strings.Contains(relPath, string(filepath.Separator)) { blendFiles = append(blendFiles, path) } } return nil }) if err == nil && len(blendFiles) == 1 { // Only one blend file in root - use it mainBlendFile = blendFiles[0] } else if len(blendFiles) > 1 { // Multiple blend files - need user to specify // Return list of blend files for user to choose blendFileNames := []string{} for _, f := range blendFiles { rel, _ := filepath.Rel(jobPath, f) blendFileNames = append(blendFileNames, rel) } s.respondJSON(w, http.StatusOK, map[string]interface{}{ "zip_extracted": true, "blend_files": blendFileNames, "message": "Multiple blend files found. Please specify the main blend file.", }) return } } // Record extracted files in database for _, extractedFile := range extractedFiles { relPath, _ := filepath.Rel(jobPath, extractedFile) info, err := os.Stat(extractedFile) if err == nil && !info.IsDir() { _, _ = s.db.Exec( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?)`, jobID, types.JobFileTypeInput, extractedFile, relPath, info.Size(), ) } } } else { // Regular file upload (not ZIP) filePath, err := s.storage.SaveUpload(jobID, header.Filename, file) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err)) return } // Record in database err = s.db.QueryRow( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?) RETURNING id`, jobID, types.JobFileTypeInput, filePath, header.Filename, header.Size, ).Scan(&fileID) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record file: %v", err)) return } mainBlendFile = filePath } // If we have a main blend file (from ZIP extraction or direct upload), create metadata extraction task // But ONLY for metadata extraction jobs (temporary jobs created during the initial two-step submission flow) // Never create metadata tasks for regular render jobs, even if they receive blend files later blendFileToCheck := mainBlendFile if blendFileToCheck == "" && strings.HasSuffix(strings.ToLower(header.Filename), ".blend") { // Direct blend file upload (not from ZIP) blendFileToCheck = s.storage.JobPath(jobID) blendFileToCheck = filepath.Join(blendFileToCheck, header.Filename) } if blendFileToCheck != "" && strings.HasSuffix(strings.ToLower(blendFileToCheck), ".blend") { // Check if this is a metadata extraction job var jobType string var hasRenderTasks int err = s.db.QueryRow( `SELECT j.job_type, COUNT(t.id) FROM jobs j LEFT JOIN tasks t ON j.id = t.job_id AND t.task_type = 'render' WHERE j.id = ? GROUP BY j.id, j.job_type`, jobID, ).Scan(&jobType, &hasRenderTasks) // Only create metadata extraction task if: // 1. Job type is "metadata" (temporary job from initial submission) // 2. Job has no render tasks (not a regular render job) if err == nil && types.JobType(jobType) == types.JobTypeMetadata && hasRenderTasks == 0 { // Check if metadata task already exists to avoid duplicates var existingMetadataTask int s.db.QueryRow( `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND task_type = 'metadata'`, jobID, ).Scan(&existingMetadataTask) if existingMetadataTask == 0 { // Create metadata extraction task metadataTaskTimeout := 300 // 5 minutes for metadata extraction var metadataTaskID int64 err = s.db.QueryRow( `INSERT INTO tasks (job_id, frame_start, frame_end, task_type, status, timeout_seconds, max_retries) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id`, jobID, 0, 0, types.TaskTypeMetadata, types.TaskStatusPending, metadataTaskTimeout, 1, ).Scan(&metadataTaskID) if err != nil { log.Printf("Failed to create metadata extraction task: %v", err) } else { log.Printf("Created metadata extraction task %d for job %d (initial submission)", metadataTaskID, jobID) // Log task creation to task logs s.logTaskEvent(metadataTaskID, nil, types.LogLevelInfo, "Metadata extraction task created", "") // Try to distribute the task immediately (with small delay to ensure transaction is committed) go func() { time.Sleep(100 * time.Millisecond) // Small delay to ensure transaction is committed s.distributeTasksToRunners() }() } } else { log.Printf("Skipping metadata extraction task creation for job %d (metadata task already exists)", jobID) } } else { log.Printf("Skipping metadata extraction task creation for job %d (not an initial metadata extraction job)", jobID) } } response := map[string]interface{}{ "id": fileID, "file_name": header.Filename, "file_size": header.Size, } if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") { response["zip_extracted"] = true response["extracted_files_count"] = len(extractedFiles) if mainBlendFile != "" { relPath, _ := filepath.Rel(s.storage.JobPath(jobID), mainBlendFile) response["main_blend_file"] = relPath } } else { response["file_path"] = s.storage.JobPath(jobID) response["file_path"] = filepath.Join(response["file_path"].(string), header.Filename) } s.respondJSON(w, http.StatusCreated, response) } // handleListJobFiles lists files for a job func (s *Server) handleListJobFiles(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } rows, err := s.db.Query( `SELECT id, job_id, file_type, file_path, file_name, file_size, created_at FROM job_files WHERE job_id = ? ORDER BY created_at DESC`, jobID, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query files: %v", err)) return } defer rows.Close() files := []types.JobFile{} for rows.Next() { var file types.JobFile err := rows.Scan( &file.ID, &file.JobID, &file.FileType, &file.FilePath, &file.FileName, &file.FileSize, &file.CreatedAt, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan file: %v", err)) return } files = append(files, file) } s.respondJSON(w, http.StatusOK, files) } // handleDownloadJobFile downloads a job file func (s *Server) handleDownloadJobFile(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } fileID, err := parseID(r, "fileId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Get file info var filePath, fileName string err = s.db.QueryRow( `SELECT file_path, file_name FROM job_files WHERE id = ? AND job_id = ?`, fileID, jobID, ).Scan(&filePath, &fileName) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "File not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err)) return } // Open file file, err := s.storage.GetFile(filePath) if err != nil { s.respondError(w, http.StatusNotFound, "File not found on disk") return } defer file.Close() // Determine content type based on file extension contentType := "application/octet-stream" disposition := "attachment" fileNameLower := strings.ToLower(fileName) switch { case strings.HasSuffix(fileNameLower, ".png"): contentType = "image/png" disposition = "inline" case strings.HasSuffix(fileNameLower, ".jpg") || strings.HasSuffix(fileNameLower, ".jpeg"): contentType = "image/jpeg" disposition = "inline" case strings.HasSuffix(fileNameLower, ".gif"): contentType = "image/gif" disposition = "inline" case strings.HasSuffix(fileNameLower, ".webp"): contentType = "image/webp" disposition = "inline" case strings.HasSuffix(fileNameLower, ".bmp"): contentType = "image/bmp" disposition = "inline" case strings.HasSuffix(fileNameLower, ".svg"): contentType = "image/svg+xml" disposition = "inline" } // Set headers w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%s", disposition, fileName)) w.Header().Set("Content-Type", contentType) // Stream file io.Copy(w, file) } // handleStreamVideo streams MP4 video file with range support func (s *Server) handleStreamVideo(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) var jobUserID int64 var outputFormat string if isAdmin { err = s.db.QueryRow("SELECT user_id, output_format FROM jobs WHERE id = ?", jobID).Scan(&jobUserID, &outputFormat) } else { err = s.db.QueryRow("SELECT user_id, output_format FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobUserID, &outputFormat) } if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err)) return } if !isAdmin && jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Find MP4 file var filePath, fileName string err = s.db.QueryRow( `SELECT file_path, file_name FROM job_files WHERE job_id = ? AND file_type = ? AND file_name LIKE '%.mp4' ORDER BY created_at DESC LIMIT 1`, jobID, types.JobFileTypeOutput, ).Scan(&filePath, &fileName) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Video file not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err)) return } // Open file file, err := s.storage.GetFile(filePath) if err != nil { s.respondError(w, http.StatusNotFound, "File not found on disk") return } defer file.Close() // Get file info fileInfo, err := file.Stat() if err != nil { s.respondError(w, http.StatusInternalServerError, "Failed to get file info") return } fileSize := fileInfo.Size() // Handle range requests for video seeking rangeHeader := r.Header.Get("Range") if rangeHeader != "" { // Parse range header var start, end int64 fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end) if end == 0 { end = fileSize - 1 } // Seek to start position file.Seek(start, 0) // Set headers for partial content w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize)) w.Header().Set("Accept-Ranges", "bytes") w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1)) w.Header().Set("Content-Type", "video/mp4") w.WriteHeader(http.StatusPartialContent) // Copy partial content io.CopyN(w, file, end-start+1) } else { // Full file w.Header().Set("Content-Type", "video/mp4") w.Header().Set("Content-Length", fmt.Sprintf("%d", fileSize)) w.Header().Set("Accept-Ranges", "bytes") io.Copy(w, file) } } // handleListJobTasks lists all tasks for a job func (s *Server) handleListJobTasks(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } rows, err := s.db.Query( `SELECT id, job_id, runner_id, frame_start, frame_end, status, task_type, current_step, retry_count, max_retries, output_path, created_at, started_at, completed_at, error_message, timeout_seconds FROM tasks WHERE job_id = ? ORDER BY frame_start ASC`, jobID, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err)) return } defer rows.Close() tasks := []types.Task{} for rows.Next() { var task types.Task var runnerID sql.NullInt64 var startedAt, completedAt sql.NullTime var timeoutSeconds sql.NullInt64 var errorMessage sql.NullString var currentStep sql.NullString var outputPath sql.NullString err := rows.Scan( &task.ID, &task.JobID, &runnerID, &task.FrameStart, &task.FrameEnd, &task.Status, &task.TaskType, ¤tStep, &task.RetryCount, &task.MaxRetries, &outputPath, &task.CreatedAt, &startedAt, &completedAt, &errorMessage, &timeoutSeconds, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err)) return } if runnerID.Valid { task.RunnerID = &runnerID.Int64 } if startedAt.Valid { task.StartedAt = &startedAt.Time } if completedAt.Valid { task.CompletedAt = &completedAt.Time } if timeoutSeconds.Valid { timeout := int(timeoutSeconds.Int64) task.TimeoutSeconds = &timeout } if errorMessage.Valid { task.ErrorMessage = errorMessage.String } if currentStep.Valid { task.CurrentStep = currentStep.String } if outputPath.Valid { task.OutputPath = outputPath.String } tasks = append(tasks, task) } s.respondJSON(w, http.StatusOK, tasks) } // handleGetTaskLogs retrieves logs for a specific task func (s *Server) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskIDStr := chi.URLParam(r, "taskId") taskID, err := strconv.ParseInt(taskIDStr, 10, 64) if err != nil { s.respondError(w, http.StatusBadRequest, "Invalid task ID") return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Verify task belongs to job var taskJobID int64 err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } // Get query parameters for filtering stepName := r.URL.Query().Get("step_name") logLevel := r.URL.Query().Get("log_level") limitStr := r.URL.Query().Get("limit") limit := 1000 // default if limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 { limit = l } } // Build query query := `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ?` args := []interface{}{taskID} if stepName != "" { query += " AND step_name = ?" args = append(args, stepName) } if logLevel != "" { query += " AND log_level = ?" args = append(args, logLevel) } query += " ORDER BY created_at ASC LIMIT ?" args = append(args, limit) rows, err := s.db.Query(query, args...) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query logs: %v", err)) return } defer rows.Close() logs := []types.TaskLog{} for rows.Next() { var log types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message, &log.StepName, &log.CreatedAt, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan log: %v", err)) return } if runnerID.Valid { log.RunnerID = &runnerID.Int64 } logs = append(logs, log) } s.respondJSON(w, http.StatusOK, logs) } // handleGetTaskSteps retrieves step timeline for a specific task func (s *Server) handleGetTaskSteps(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskIDStr := chi.URLParam(r, "taskId") taskID, err := strconv.ParseInt(taskIDStr, 10, 64) if err != nil { s.respondError(w, http.StatusBadRequest, "Invalid task ID") return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Verify task belongs to job var taskJobID int64 err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } rows, err := s.db.Query( `SELECT id, task_id, step_name, status, started_at, completed_at, duration_ms, error_message FROM task_steps WHERE task_id = ? ORDER BY started_at ASC`, taskID, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query steps: %v", err)) return } defer rows.Close() steps := []types.TaskStep{} for rows.Next() { var step types.TaskStep var startedAt, completedAt sql.NullTime var durationMs sql.NullInt64 var errorMessage sql.NullString err := rows.Scan( &step.ID, &step.TaskID, &step.StepName, &step.Status, &startedAt, &completedAt, &durationMs, &errorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan step: %v", err)) return } if startedAt.Valid { step.StartedAt = &startedAt.Time } if completedAt.Valid { step.CompletedAt = &completedAt.Time } if durationMs.Valid { duration := int(durationMs.Int64) step.DurationMs = &duration } if errorMessage.Valid { step.ErrorMessage = errorMessage.String } steps = append(steps, step) } s.respondJSON(w, http.StatusOK, steps) } // handleRetryTask retries a failed task func (s *Server) handleRetryTask(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskIDStr := chi.URLParam(r, "taskId") taskID, err := strconv.ParseInt(taskIDStr, 10, 64) if err != nil { s.respondError(w, http.StatusBadRequest, "Invalid task ID") return } // Verify job belongs to user var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Verify task belongs to job and is in a retryable state var taskJobID int64 var taskStatus string var retryCount, maxRetries int err = s.db.QueryRow( "SELECT job_id, status, retry_count, max_retries FROM tasks WHERE id = ?", taskID, ).Scan(&taskJobID, &taskStatus, &retryCount, &maxRetries) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } if taskStatus != string(types.TaskStatusFailed) { s.respondError(w, http.StatusBadRequest, "Task is not in failed state") return } if retryCount >= maxRetries { s.respondError(w, http.StatusBadRequest, "Maximum retries exceeded") return } // Reset task to pending _, err = s.db.Exec( `UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL, error_message = NULL, started_at = NULL, completed_at = NULL WHERE id = ?`, types.TaskStatusPending, taskID, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to retry task: %v", err)) return } s.respondJSON(w, http.StatusOK, map[string]string{"message": "Task queued for retry"}) } // handleStreamTaskLogsWebSocket streams task logs via WebSocket // Note: This is called after auth middleware, so userID is already verified func (s *Server) handleStreamTaskLogsWebSocket(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskIDStr := chi.URLParam(r, "taskId") taskID, err := strconv.ParseInt(taskIDStr, 10, 64) if err != nil { s.respondError(w, http.StatusBadRequest, "Invalid task ID") return } // Verify job belongs to user var jobUserID int64 err = s.db.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Verify task belongs to job var taskJobID int64 err = s.db.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } // Upgrade to WebSocket conn, err := s.wsUpgrader.Upgrade(w, r, nil) if err != nil { log.Printf("Failed to upgrade WebSocket: %v", err) return } defer conn.Close() key := fmt.Sprintf("%d:%d", jobID, taskID) s.frontendConnsMu.Lock() s.frontendConns[key] = conn s.frontendConnsMu.Unlock() // Create a write mutex for this connection s.frontendConnsWriteMuMu.Lock() s.frontendConnsWriteMu[key] = &sync.Mutex{} writeMu := s.frontendConnsWriteMu[key] s.frontendConnsWriteMuMu.Unlock() defer func() { s.frontendConnsMu.Lock() delete(s.frontendConns, key) s.frontendConnsMu.Unlock() s.frontendConnsWriteMuMu.Lock() delete(s.frontendConnsWriteMu, key) s.frontendConnsWriteMuMu.Unlock() }() // Send initial connection message writeMu.Lock() err = conn.WriteJSON(map[string]interface{}{ "type": "connected", "timestamp": time.Now().Unix(), }) writeMu.Unlock() if err != nil { log.Printf("Failed to send initial connection message: %v", err) return } // Get last log ID to start streaming from lastIDStr := r.URL.Query().Get("last_id") lastID := int64(0) if lastIDStr != "" { if id, err := strconv.ParseInt(lastIDStr, 10, 64); err == nil { lastID = id } } // Send existing logs // Order by id ASC to ensure consistent ordering and avoid race conditions rows, err := s.db.Query( `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`, taskID, lastID, ) if err == nil { defer rows.Close() for rows.Next() { var log types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message, &log.StepName, &log.CreatedAt, ) if err != nil { continue } if runnerID.Valid { log.RunnerID = &runnerID.Int64 } // Always update lastID to the highest ID we've seen if log.ID > lastID { lastID = log.ID } // Serialize writes to prevent concurrent write panics writeMu.Lock() writeErr := conn.WriteJSON(map[string]interface{}{ "type": "log", "data": log, "timestamp": time.Now().Unix(), }) writeMu.Unlock() if writeErr != nil { // Connection closed, exit the loop return } } } // Poll for new logs and send them // Use shorter interval for more responsive updates, but order by id for consistency ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case <-ticker.C: rows, err := s.db.Query( `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`, taskID, lastID, ) if err != nil { continue } for rows.Next() { var log types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message, &log.StepName, &log.CreatedAt, ) if err != nil { continue } if runnerID.Valid { log.RunnerID = &runnerID.Int64 } // Always update lastID to the highest ID we've seen if log.ID > lastID { lastID = log.ID } // Serialize writes to prevent concurrent write panics writeMu.Lock() err = conn.WriteJSON(map[string]interface{}{ "type": "log", "data": log, "timestamp": time.Now().Unix(), }) writeMu.Unlock() if err != nil { // Connection closed, exit the loop return } } rows.Close() } } }