package api import ( "context" "database/sql" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "path/filepath" "strconv" "strings" "sync" "time" "jiggablend/internal/auth" "jiggablend/pkg/types" "github.com/go-chi/chi/v5" "github.com/gorilla/websocket" ) type contextKey string const runnerIDContextKey contextKey = "runner_id" // runnerAuthMiddleware verifies runner requests using API key func (s *Manager) runnerAuthMiddleware(next http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // Get API key from header apiKey := r.Header.Get("Authorization") if apiKey == "" { // Try alternative header apiKey = r.Header.Get("X-API-Key") } if apiKey == "" { s.respondError(w, http.StatusUnauthorized, "API key required") return } // Remove "Bearer " prefix if present apiKey = strings.TrimPrefix(apiKey, "Bearer ") // Validate API key and get its ID apiKeyID, _, err := s.secrets.ValidateRunnerAPIKey(apiKey) if err != nil { log.Printf("API key validation failed: %v", err) s.respondError(w, http.StatusUnauthorized, "invalid API key") return } // Get runner ID from query string or find runner by API key runnerIDStr := r.URL.Query().Get("runner_id") var runnerID int64 if runnerIDStr != "" { // Runner ID provided - verify it belongs to this API key _, err := fmt.Sscanf(runnerIDStr, "%d", &runnerID) if err != nil { s.respondError(w, http.StatusBadRequest, "invalid runner_id") return } // For fixed API keys, skip database verification if apiKeyID != -1 { // Verify runner exists and uses this API key var dbAPIKeyID sql.NullInt64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT api_key_id FROM runners WHERE id = ?", runnerID).Scan(&dbAPIKeyID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "runner not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query runner API key: %v", err)) return } if !dbAPIKeyID.Valid || dbAPIKeyID.Int64 != apiKeyID { 
s.respondError(w, http.StatusForbidden, "runner does not belong to this API key") return } } } else { // No runner ID provided - find the runner for this API key // For simplicity, assume each API key has one runner err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT id FROM runners WHERE api_key_id = ?", apiKeyID).Scan(&runnerID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "no runner found for this API key") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query runner by API key: %v", err)) return } } // Add runner ID to context ctx := r.Context() ctx = context.WithValue(ctx, runnerIDContextKey, runnerID) next(w, r.WithContext(ctx)) } } // handleRegisterRunner registers a new runner using an API key func (s *Manager) handleRegisterRunner(w http.ResponseWriter, r *http.Request) { var req struct { types.RegisterRunnerRequest APIKey string `json:"api_key"` Fingerprint string `json:"fingerprint,omitempty"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } // Lock to prevent concurrent registrations that could create duplicate runners s.secrets.RegistrationMu.Lock() defer s.secrets.RegistrationMu.Unlock() // Validate runner name if req.Name == "" { s.respondError(w, http.StatusBadRequest, "Runner name is required") return } if len(req.Name) > 255 { s.respondError(w, http.StatusBadRequest, "Runner name must be 255 characters or less") return } // Validate hostname if req.Hostname != "" { // Basic hostname validation (allow IP addresses and domain names) if len(req.Hostname) > 253 { s.respondError(w, http.StatusBadRequest, "Hostname must be 253 characters or less") return } } // Validate capabilities JSON if provided if req.Capabilities != "" { var testCapabilities map[string]interface{} if err := json.Unmarshal([]byte(req.Capabilities), 
&testCapabilities); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid capabilities JSON: %v", err)) return } } if req.APIKey == "" { s.respondError(w, http.StatusBadRequest, "API key is required") return } // Validate API key apiKeyID, apiKeyScope, err := s.secrets.ValidateRunnerAPIKey(req.APIKey) if err != nil { s.respondError(w, http.StatusUnauthorized, fmt.Sprintf("Invalid API key: %v", err)) return } // For fixed API keys (keyID = -1), skip fingerprint checking // Set default priority if not provided priority := 100 if req.Priority != nil { priority = *req.Priority } // Register runner var runnerID int64 // For fixed API keys, don't store api_key_id in database var dbAPIKeyID interface{} if apiKeyID == -1 { dbAPIKeyID = nil // NULL for fixed API keys } else { dbAPIKeyID = apiKeyID } // Determine fingerprint value fingerprint := req.Fingerprint if apiKeyID == -1 || fingerprint == "" { // For fixed API keys or when no fingerprint provided, generate a unique fingerprint // to avoid conflicts while still maintaining some uniqueness fingerprint = fmt.Sprintf("fixed-%s-%d", req.Name, time.Now().UnixNano()) } // Check fingerprint uniqueness only for non-fixed API keys if apiKeyID != -1 && req.Fingerprint != "" { var existingRunnerID int64 var existingAPIKeyID sql.NullInt64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( "SELECT id, api_key_id FROM runners WHERE fingerprint = ?", req.Fingerprint, ).Scan(&existingRunnerID, &existingAPIKeyID) }) if err == nil { // Runner already exists with this fingerprint if existingAPIKeyID.Valid && existingAPIKeyID.Int64 == apiKeyID { // Same API key - update and return existing runner log.Printf("Runner with fingerprint %s already exists (ID: %d), updating info", req.Fingerprint, existingRunnerID) err = s.db.With(func(conn *sql.DB) error { _, err := conn.Exec( `UPDATE runners SET name = ?, hostname = ?, capabilities = ?, status = ?, last_heartbeat = ? 
WHERE id = ?`, req.Name, req.Hostname, req.Capabilities, types.RunnerStatusOnline, time.Now(), existingRunnerID, ) return err }) if err != nil { log.Printf("Warning: Failed to update existing runner info: %v", err) } s.respondJSON(w, http.StatusOK, map[string]interface{}{ "id": existingRunnerID, "name": req.Name, "hostname": req.Hostname, "status": types.RunnerStatusOnline, "reused": true, // Indicates this was a re-registration }) return } else { // Different API key - reject registration s.respondError(w, http.StatusConflict, "Runner with this fingerprint already registered with different API key") return } } // If err is not nil, it means no existing runner with this fingerprint - proceed with new registration } // Insert runner err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO runners (name, hostname, ip_address, status, last_heartbeat, capabilities, api_key_id, api_key_scope, priority, fingerprint) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, req.Name, req.Hostname, "", types.RunnerStatusOnline, time.Now(), req.Capabilities, dbAPIKeyID, apiKeyScope, priority, fingerprint, ) if err != nil { return err } runnerID, err = result.LastInsertId() return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to register runner: %v", err)) return } log.Printf("Registered new runner %s (ID: %d) with API key ID: %d", req.Name, runnerID, apiKeyID) // Return runner info s.respondJSON(w, http.StatusCreated, map[string]interface{}{ "id": runnerID, "name": req.Name, "hostname": req.Hostname, "status": types.RunnerStatusOnline, }) } // NextJobResponse is the response for the next-job endpoint type NextJobResponse struct { JobToken string `json:"job_token"` JobPath string `json:"job_path"` Task NextJobTaskInfo `json:"task"` } // NextJobTaskInfo contains task information for the next-job response type NextJobTaskInfo struct { TaskID int64 `json:"task_id"` JobID int64 `json:"job_id"` JobName string `json:"job_name"` 
Frame int `json:"frame"` TaskType string `json:"task_type"` Metadata *types.BlendMetadata `json:"metadata,omitempty"` } // handleNextJob handles the polling endpoint for runners to get their next job // GET /api/runner/workers/:id/next-job func (s *Manager) handleNextJob(w http.ResponseWriter, r *http.Request) { // Get runner ID from URL path runnerIDStr := chi.URLParam(r, "id") if runnerIDStr == "" { s.respondError(w, http.StatusBadRequest, "runner ID required") return } var runnerID int64 if _, err := fmt.Sscanf(runnerIDStr, "%d", &runnerID); err != nil { s.respondError(w, http.StatusBadRequest, "invalid runner ID") return } // Get API key from header apiKey := r.Header.Get("Authorization") apiKey = strings.TrimPrefix(apiKey, "Bearer ") if apiKey == "" { s.respondError(w, http.StatusUnauthorized, "API key required") return } // Validate API key apiKeyID, apiKeyScope, err := s.secrets.ValidateRunnerAPIKey(apiKey) if err != nil { s.respondError(w, http.StatusUnauthorized, fmt.Sprintf("Invalid API key: %v", err)) return } // Verify runner exists and belongs to this API key var dbAPIKeyID sql.NullInt64 var runnerCapabilitiesJSON sql.NullString err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT api_key_id, capabilities FROM runners WHERE id = ?", runnerID).Scan(&dbAPIKeyID, &runnerCapabilitiesJSON) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "runner not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query runner: %v", err)) return } // For non-fixed API keys, verify ownership if apiKeyID != -1 { if !dbAPIKeyID.Valid || dbAPIKeyID.Int64 != apiKeyID { s.respondError(w, http.StatusForbidden, "runner does not belong to this API key") return } } // Update runner heartbeat s.db.With(func(conn *sql.DB) error { _, _ = conn.Exec( `UPDATE runners SET last_heartbeat = ?, status = ? 
WHERE id = ?`, time.Now(), types.RunnerStatusOnline, runnerID, ) return nil }) // Parse runner capabilities var runnerCapabilities map[string]interface{} if runnerCapabilitiesJSON.Valid && runnerCapabilitiesJSON.String != "" { if err := json.Unmarshal([]byte(runnerCapabilitiesJSON.String), &runnerCapabilities); err != nil { runnerCapabilities = make(map[string]interface{}) } } else { runnerCapabilities = make(map[string]interface{}) } // Check if runner already has an active task var activeTaskCount int err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT COUNT(*) FROM tasks WHERE runner_id = ? AND status IN (?, ?)`, runnerID, types.TaskStatusPending, types.TaskStatusRunning, ).Scan(&activeTaskCount) }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to check active tasks: %v", err)) return } if activeTaskCount > 0 { // Runner is busy, return 204 w.WriteHeader(http.StatusNoContent) return } // Find next pending task for this runner // Query pending tasks ordered by created_at (oldest first) type taskCandidate struct { TaskID int64 JobID int64 Frame int TaskType string JobName string JobUserID int64 BlendMetadata sql.NullString } var candidates []taskCandidate err = s.db.With(func(conn *sql.DB) error { rows, err := conn.Query( `SELECT t.id, t.job_id, t.frame, t.task_type, j.name as job_name, j.user_id, j.blend_metadata, t.condition FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.status = ? AND j.status != ? 
ORDER BY t.created_at ASC LIMIT 50`, types.TaskStatusPending, types.JobStatusCancelled, ) if err != nil { return err } defer rows.Close() for rows.Next() { var task taskCandidate var condition sql.NullString err := rows.Scan(&task.TaskID, &task.JobID, &task.Frame, &task.TaskType, &task.JobName, &task.JobUserID, &task.BlendMetadata, &condition) if err != nil { continue } // Check if task condition is met before adding to candidates conditionStr := "" if condition.Valid { conditionStr = condition.String } if !s.evaluateTaskCondition(task.TaskID, task.JobID, conditionStr) { continue // Skip tasks whose conditions are not met } candidates = append(candidates, task) } return rows.Err() }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query tasks: %v", err)) return } // Find a suitable task from candidates var selectedTask *taskCandidate for i := range candidates { task := &candidates[i] // Check runner scope if apiKeyScope == "user" && task.JobUserID != 0 { // User-scoped runner - check if they can work on this job var apiKeyCreatedBy int64 if apiKeyID != -1 { s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT created_by FROM runner_api_keys WHERE id = ?", apiKeyID).Scan(&apiKeyCreatedBy) }) if apiKeyCreatedBy != task.JobUserID { continue // Skip this task } } } // Check required capability (only for ffmpeg - blender is assumed installed) if task.TaskType == string(types.TaskTypeEncode) { hasFFmpeg := false if reqVal, ok := runnerCapabilities["ffmpeg"]; ok { if reqBool, ok := reqVal.(bool); ok { hasFFmpeg = reqBool } else if reqFloat, ok := reqVal.(float64); ok { hasFFmpeg = reqFloat > 0 } } if !hasFFmpeg { continue // Runner doesn't have ffmpeg capability } } // Found a suitable task selectedTask = task break } if selectedTask == nil { // No task available w.WriteHeader(http.StatusNoContent) return } // Atomically assign task to runner now := time.Now() var rowsAffected int64 err = s.db.WithTx(func(tx *sql.Tx) 
error { result, err := tx.Exec( `UPDATE tasks SET runner_id = ?, status = ?, started_at = ? WHERE id = ? AND runner_id IS NULL AND status = ?`, runnerID, types.TaskStatusRunning, now, selectedTask.TaskID, types.TaskStatusPending, ) if err != nil { return err } rowsAffected, err = result.RowsAffected() if err != nil { return err } // Also update job's assigned_runner_id to track current worker // For parallel jobs, this will be updated each time a new runner picks up a task _, err = tx.Exec( `UPDATE jobs SET assigned_runner_id = ? WHERE id = ?`, runnerID, selectedTask.JobID, ) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to assign task: %v", err)) return } if rowsAffected == 0 { // Task was already assigned by another runner, return 204 to retry w.WriteHeader(http.StatusNoContent) return } // Generate job token jobToken, err := auth.GenerateJobToken(selectedTask.JobID, runnerID, selectedTask.TaskID) if err != nil { // Rollback task assignment and job runner assignment s.db.With(func(conn *sql.DB) error { _, _ = conn.Exec( `UPDATE tasks SET runner_id = NULL, status = ?, started_at = NULL WHERE id = ?`, types.TaskStatusPending, selectedTask.TaskID, ) _, _ = conn.Exec( `UPDATE jobs SET assigned_runner_id = NULL WHERE id = ?`, selectedTask.JobID, // Fixed: was selectedTask.TaskID ) return nil }) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to generate job token: %v", err)) return } // Parse metadata var metadata *types.BlendMetadata if selectedTask.BlendMetadata.Valid && selectedTask.BlendMetadata.String != "" { metadata = &types.BlendMetadata{} if err := json.Unmarshal([]byte(selectedTask.BlendMetadata.String), metadata); err != nil { metadata = nil } } // Log task assignment log.Printf("Assigned task %d (type: %s, job: %d) to runner %d via polling", selectedTask.TaskID, selectedTask.TaskType, selectedTask.JobID, runnerID) s.logTaskEvent(selectedTask.TaskID, nil, types.LogLevelInfo, 
fmt.Sprintf("Task assigned to runner %d", runnerID), "") // Broadcast task update to frontend s.broadcastTaskUpdate(selectedTask.JobID, selectedTask.TaskID, "task_update", map[string]interface{}{ "status": types.TaskStatusRunning, "runner_id": runnerID, "started_at": now, }) // Update job status s.updateJobStatusFromTasks(selectedTask.JobID) // Build response response := NextJobResponse{ JobToken: jobToken, JobPath: fmt.Sprintf("/api/runner/jobs/%d", selectedTask.JobID), Task: NextJobTaskInfo{ TaskID: selectedTask.TaskID, JobID: selectedTask.JobID, JobName: selectedTask.JobName, Frame: selectedTask.Frame, TaskType: selectedTask.TaskType, Metadata: metadata, }, } s.respondJSON(w, http.StatusOK, response) } // handleUpdateTaskProgress updates task progress func (s *Manager) handleUpdateTaskProgress(w http.ResponseWriter, r *http.Request) { _, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } var req struct { Progress float64 `json:"progress"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } // This is mainly for logging/debugging, actual progress is calculated from completed tasks s.respondJSON(w, http.StatusOK, map[string]string{"message": "Progress updated"}) } // handleUpdateTaskStep handles step start/complete events from runners func (s *Manager) handleUpdateTaskStep(w http.ResponseWriter, r *http.Request) { // Get runner ID from context (set by runnerAuthMiddleware) runnerID, ok := r.Context().Value(runnerIDContextKey).(int64) if !ok { s.respondError(w, http.StatusUnauthorized, "runner_id not found in context") return } taskID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } var req struct { StepName string `json:"step_name"` Status string `json:"status"` // "pending", "running", "completed", "failed", "skipped" DurationMs 
*int `json:"duration_ms,omitempty"` ErrorMessage string `json:"error_message,omitempty"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } // Verify task belongs to runner var taskRunnerID sql.NullInt64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT runner_id FROM tasks WHERE id = ?", taskID).Scan(&taskRunnerID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if !taskRunnerID.Valid || taskRunnerID.Int64 != runnerID { s.respondError(w, http.StatusForbidden, "Task does not belong to this runner") return } now := time.Now() var stepID int64 // Check if step already exists var existingStepID sql.NullInt64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT id FROM task_steps WHERE task_id = ? 
AND step_name = ?`, taskID, req.StepName, ).Scan(&existingStepID) }) if err == sql.ErrNoRows || !existingStepID.Valid { // Create new step var startedAt *time.Time var completedAt *time.Time if req.Status == string(types.StepStatusRunning) || req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { startedAt = &now } if req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { completedAt = &now } err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO task_steps (task_id, step_name, status, started_at, completed_at, duration_ms, error_message) VALUES (?, ?, ?, ?, ?, ?, ?)`, taskID, req.StepName, req.Status, startedAt, completedAt, req.DurationMs, req.ErrorMessage, ) if err != nil { return err } stepID, err = result.LastInsertId() return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create step: %v", err)) return } } else { // Update existing step stepID = existingStepID.Int64 var startedAt *time.Time var completedAt *time.Time // Get existing started_at if status is running/completed/failed if req.Status == string(types.StepStatusRunning) || req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { var existingStartedAt sql.NullTime s.db.With(func(conn *sql.DB) error { return conn.QueryRow(`SELECT started_at FROM task_steps WHERE id = ?`, stepID).Scan(&existingStartedAt) }) if existingStartedAt.Valid { startedAt = &existingStartedAt.Time } else { startedAt = &now } } if req.Status == string(types.StepStatusCompleted) || req.Status == string(types.StepStatusFailed) { completedAt = &now } err = s.db.With(func(conn *sql.DB) error { _, err := conn.Exec( `UPDATE task_steps SET status = ?, started_at = ?, completed_at = ?, duration_ms = ?, error_message = ? 
WHERE id = ?`, req.Status, startedAt, completedAt, req.DurationMs, req.ErrorMessage, stepID, ) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to update step: %v", err)) return } } // Get job ID for broadcasting var jobID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&jobID) }) if err == nil { // Broadcast step update to frontend s.broadcastTaskUpdate(jobID, taskID, "step_update", map[string]interface{}{ "step_id": stepID, "step_name": req.StepName, "status": req.Status, "duration_ms": req.DurationMs, "error_message": req.ErrorMessage, }) } s.respondJSON(w, http.StatusOK, map[string]interface{}{ "step_id": stepID, "message": "Step updated successfully", }) } // handleDownloadJobContext allows runners to download the job context tar // DEPRECATED: Use handleDownloadJobContextWithToken for new polling-based flow func (s *Manager) handleDownloadJobContext(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Construct the context file path contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar") // Check if context file exists if !s.storage.FileExists(contextPath) { log.Printf("Context archive not found for job %d", jobID) s.respondError(w, http.StatusNotFound, "Context archive not found. 
The file may not have been uploaded successfully.") return } // Open and serve file file, err := s.storage.GetFile(contextPath) if err != nil { s.respondError(w, http.StatusNotFound, "Context file not found on disk") return } defer file.Close() // Set appropriate headers for tar file w.Header().Set("Content-Type", "application/x-tar") w.Header().Set("Content-Disposition", "attachment; filename=context.tar") // Stream the file to the response io.Copy(w, file) } // handleDownloadJobContextWithToken allows runners to download job context using job_token // GET /api/runner/jobs/:jobId/context.tar func (s *Manager) handleDownloadJobContextWithToken(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Get job token from Authorization header jobToken := r.Header.Get("Authorization") jobToken = strings.TrimPrefix(jobToken, "Bearer ") if jobToken == "" { s.respondError(w, http.StatusUnauthorized, "job token required") return } // Validate job token claims, err := auth.ValidateJobToken(jobToken) if err != nil { s.respondError(w, http.StatusUnauthorized, fmt.Sprintf("invalid job token: %v", err)) return } // Verify job ID matches if claims.JobID != jobID { s.respondError(w, http.StatusForbidden, "job ID mismatch") return } // Construct the context file path contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar") // Check if context file exists if !s.storage.FileExists(contextPath) { log.Printf("Context archive not found for job %d", jobID) s.respondError(w, http.StatusNotFound, "Context archive not found. 
The file may not have been uploaded successfully.") return } // Open and serve file file, err := s.storage.GetFile(contextPath) if err != nil { s.respondError(w, http.StatusNotFound, "Context file not found on disk") return } defer file.Close() // Set appropriate headers for tar file w.Header().Set("Content-Type", "application/x-tar") w.Header().Set("Content-Disposition", "attachment; filename=context.tar") // Stream the file to the response io.Copy(w, file) } // handleUploadFileFromRunner allows runners to upload output files // DEPRECATED: Use handleUploadFileWithToken for new polling-based flow func (s *Manager) handleUploadFileFromRunner(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } err = r.ParseMultipartForm(MaxUploadSize) // 50 GB (for large output files) if err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse multipart form: %v", err)) return } file, header, err := r.FormFile("file") if err != nil { s.respondError(w, http.StatusBadRequest, "No file provided") return } defer file.Close() // Save file filePath, err := s.storage.SaveOutput(jobID, header.Filename, file) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err)) return } // Record in database - check for existing file first to avoid duplicates var fileID int64 err = s.db.With(func(conn *sql.DB) error { // Check if file with same name already exists var existingID int64 err := conn.QueryRow( `SELECT id FROM job_files WHERE job_id = ? AND file_type = ? AND file_name = ?`, jobID, types.JobFileTypeOutput, header.Filename, ).Scan(&existingID) switch err { case nil: // File exists - update it instead of creating duplicate log.Printf("File %s already exists for job %d (ID: %d), updating record", header.Filename, jobID, existingID) _, err = conn.Exec( `UPDATE job_files SET file_path = ?, file_size = ? 
WHERE id = ?`, filePath, header.Size, existingID, ) if err != nil { return err } fileID = existingID return nil case sql.ErrNoRows: // File doesn't exist - insert new record result, err := conn.Exec( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?)`, jobID, types.JobFileTypeOutput, filePath, header.Filename, header.Size, ) if err != nil { return err } fileID, err = result.LastInsertId() return err default: return err } }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record file: %v", err)) return } // Broadcast file addition s.broadcastJobUpdate(jobID, "file_added", map[string]interface{}{ "file_id": fileID, "file_type": types.JobFileTypeOutput, "file_name": header.Filename, "file_size": header.Size, }) s.respondJSON(w, http.StatusCreated, map[string]interface{}{ "file_path": filePath, "file_name": header.Filename, }) } // handleUploadFileWithToken allows runners to upload output files using job_token // POST /api/runner/jobs/:jobId/upload func (s *Manager) handleUploadFileWithToken(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Get job token from Authorization header jobToken := r.Header.Get("Authorization") jobToken = strings.TrimPrefix(jobToken, "Bearer ") if jobToken == "" { s.respondError(w, http.StatusUnauthorized, "job token required") return } // Validate job token claims, err := auth.ValidateJobToken(jobToken) if err != nil { s.respondError(w, http.StatusUnauthorized, fmt.Sprintf("invalid job token: %v", err)) return } // Verify job ID matches if claims.JobID != jobID { s.respondError(w, http.StatusForbidden, "job ID mismatch") return } err = r.ParseMultipartForm(MaxUploadSize) // 50 GB (for large output files) if err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse multipart form: %v", err)) return } file, header, err := 
r.FormFile("file") if err != nil { s.respondError(w, http.StatusBadRequest, "No file provided") return } defer file.Close() // Save file filePath, err := s.storage.SaveOutput(jobID, header.Filename, file) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err)) return } // Record in database var fileID int64 err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?)`, jobID, types.JobFileTypeOutput, filePath, header.Filename, header.Size, ) if err != nil { return err } fileID, err = result.LastInsertId() return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record file: %v", err)) return } // Broadcast file addition s.broadcastJobUpdate(jobID, "file_added", map[string]interface{}{ "file_id": fileID, "file_type": types.JobFileTypeOutput, "file_name": header.Filename, "file_size": header.Size, }) log.Printf("Runner uploaded file %s for job %d (task %d)", header.Filename, jobID, claims.TaskID) s.respondJSON(w, http.StatusCreated, map[string]interface{}{ "file_id": fileID, "file_path": filePath, "file_name": header.Filename, }) } // handleGetJobStatusForRunner allows runners to check job status func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Request) { jobID, err := parseID(r, "jobId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } var job types.Job var startedAt, completedAt sql.NullTime var errorMessage sql.NullString var jobType string var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ?`, jobID, ).Scan( &job.ID, &job.UserID, &jobType, &job.Name, 
&job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
        )
    })
    if err == sql.ErrNoRows {
        s.respondError(w, http.StatusNotFound, "Job not found")
        return
    }
    if err != nil {
        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
        return
    }

    job.JobType = types.JobType(jobType)

    // Promote nullable DB columns to the optional (pointer) fields on the response.
    if frameStart.Valid {
        fs := int(frameStart.Int64)
        job.FrameStart = &fs
    }
    if frameEnd.Valid {
        fe := int(frameEnd.Int64)
        job.FrameEnd = &fe
    }
    if outputFormat.Valid {
        job.OutputFormat = &outputFormat.String
    }
    if startedAt.Valid {
        job.StartedAt = &startedAt.Time
    }
    if completedAt.Valid {
        job.CompletedAt = &completedAt.Time
    }
    if errorMessage.Valid {
        job.ErrorMessage = errorMessage.String
    }

    s.respondJSON(w, http.StatusOK, job)
}

// handleGetJobFilesForRunner lists all files attached to a job so a runner can
// fetch its inputs. Responds with a JSON array of types.JobFile (empty array,
// never null, when the job has no files).
func (s *Manager) handleGetJobFilesForRunner(w http.ResponseWriter, r *http.Request) {
    jobID, err := parseID(r, "jobId")
    if err != nil {
        s.respondError(w, http.StatusBadRequest, err.Error())
        return
    }

    runnerID := r.URL.Query().Get("runner_id")
    log.Printf("GetJobFiles request for job %d from runner %s", jobID, runnerID)

    var rows *sql.Rows
    var fileCount int
    err = s.db.With(func(conn *sql.DB) error {
        var err error
        rows, err = conn.Query(
            `SELECT id, job_id, file_type, file_path, file_name, file_size, created_at
             FROM job_files WHERE job_id = ?
             ORDER BY file_name`,
            jobID,
        )
        if err != nil {
            return err
        }
        // Total count is informational (used for the log line below); a
        // failure here is deliberately ignored so it cannot fail the request.
        var count int
        err = conn.QueryRow(`SELECT COUNT(*) FROM job_files WHERE job_id = ?`, jobID).Scan(&count)
        if err == nil {
            fileCount = count
        }
        return nil
    })
    if err != nil {
        log.Printf("GetJobFiles query error for job %d: %v", jobID, err)
        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query files: %v", err))
        return
    }
    defer rows.Close()

    // Initialize to an empty slice so the JSON body is [] rather than null.
    files := []types.JobFile{}
    for rows.Next() {
        var file types.JobFile
        err := rows.Scan(
            &file.ID, &file.JobID, &file.FileType, &file.FilePath,
            &file.FileName, &file.FileSize, &file.CreatedAt,
        )
        if err != nil {
            log.Printf("GetJobFiles scan error for job %d: %v", jobID, err)
            s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan file: %v", err))
            return
        }
        files = append(files, file)
        log.Printf("GetJobFiles: returning file %s (type: %s, size: %d) for job %d", file.FileName, file.FileType, file.FileSize, jobID)
    }

    log.Printf("GetJobFiles returning %d files for job %d (total in DB: %d)", len(files), jobID, fileCount)
    s.respondJSON(w, http.StatusOK, files)
}

// handleGetJobMetadataForRunner returns the parsed blend-file metadata for a
// job, or JSON null when no metadata has been stored yet.
func (s *Manager) handleGetJobMetadataForRunner(w http.ResponseWriter, r *http.Request) {
    jobID, err := parseID(r, "jobId")
    if err != nil {
        s.respondError(w, http.StatusBadRequest, err.Error())
        return
    }

    var blendMetadataJSON sql.NullString
    err = s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(
            `SELECT blend_metadata FROM jobs WHERE id = ?`,
            jobID,
        ).Scan(&blendMetadataJSON)
    })
    if err == sql.ErrNoRows {
        s.respondError(w, http.StatusNotFound, "Job not found")
        return
    }
    if err != nil {
        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
        return
    }

    // Metadata is optional; absence is not an error.
    if !blendMetadataJSON.Valid || blendMetadataJSON.String == "" {
        s.respondJSON(w, http.StatusOK, nil)
        return
    }

    var metadata types.BlendMetadata
    if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err != nil {
        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to parse metadata JSON: %v", err))
        return
    }

    s.respondJSON(w, http.StatusOK, metadata)
}

// handleDownloadFileForRunner streams a job file to a runner, looked up by
// job ID and file name. The content type is inferred from the file extension,
// defaulting to application/octet-stream.
func (s *Manager) handleDownloadFileForRunner(w http.ResponseWriter, r *http.Request) {
    jobID, err := parseID(r, "jobId")
    if err != nil {
        s.respondError(w, http.StatusBadRequest, err.Error())
        return
    }

    fileName := chi.URLParam(r, "fileName")
    if fileName == "" {
        s.respondError(w, http.StatusBadRequest, "fileName is required")
        return
    }

    // The path segment may arrive percent-encoded; fall back to the raw value
    // if it does not decode cleanly.
    decodedFileName, err := url.QueryUnescape(fileName)
    if err != nil {
        decodedFileName = fileName
    }

    // Resolve the on-disk path from the database record.
    var filePath string
    err = s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(
            `SELECT file_path FROM job_files WHERE job_id = ?
             AND file_name = ?`,
            jobID, decodedFileName,
        ).Scan(&filePath)
    })
    if err == sql.ErrNoRows {
        s.respondError(w, http.StatusNotFound, "File not found")
        return
    }
    if err != nil {
        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err))
        return
    }

    file, err := s.storage.GetFile(filePath)
    if err != nil {
        s.respondError(w, http.StatusNotFound, "File not found on disk")
        return
    }
    defer file.Close()

    // Infer a content type from the (lowercased) extension. The comparison
    // string is already lowercase, so a single suffix check per type suffices.
    contentType := "application/octet-stream"
    fileNameLower := strings.ToLower(decodedFileName)
    switch {
    case strings.HasSuffix(fileNameLower, ".png"):
        contentType = "image/png"
    case strings.HasSuffix(fileNameLower, ".jpg") || strings.HasSuffix(fileNameLower, ".jpeg"):
        contentType = "image/jpeg"
    case strings.HasSuffix(fileNameLower, ".gif"):
        contentType = "image/gif"
    case strings.HasSuffix(fileNameLower, ".webp"):
        contentType = "image/webp"
    case strings.HasSuffix(fileNameLower, ".exr"):
        contentType = "image/x-exr"
    case strings.HasSuffix(fileNameLower, ".mp4"):
        contentType = "video/mp4"
    case strings.HasSuffix(fileNameLower, ".webm"):
        contentType = "video/webm"
    }

    w.Header().Set("Content-Type", contentType)
    // Quote the filename (RFC 6266 quoted-string) so names containing spaces
    // or separators produce a valid header.
    w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", decodedFileName))

    // Stream the file; at this point headers are sent, so an error can only
    // be logged, not reported to the client.
    if _, err := io.Copy(w, file); err != nil {
        log.Printf("Failed to stream file %s for job %d: %v", decodedFileName, jobID, err)
    }
}

// WSMessage is the envelope for all runner WebSocket traffic. Data is kept
// raw so each message type can be decoded lazily by its handler.
type WSMessage struct {
    Type      string          `json:"type"`
    Data      json.RawMessage `json:"data"`
    Timestamp int64           `json:"timestamp"`
}

// WSTaskAssignment describes a task pushed to a runner over WebSocket.
type WSTaskAssignment struct {
    TaskID       int64    `json:"task_id"`
    JobID        int64    `json:"job_id"`
    JobName      string   `json:"job_name"`
    OutputFormat string   `json:"output_format"`
    Frame        int      `json:"frame"`
    TaskType     string   `json:"task_type"`
    InputFiles   []string `json:"input_files"`
}

// WSLogEntry is a single log line reported by a runner for a task.
type WSLogEntry struct {
    TaskID   int64  `json:"task_id"`
    LogLevel string `json:"log_level"`
    Message  string `json:"message"`
    StepName string `json:"step_name,omitempty"`
}

// WSTaskUpdate reports a task status change (including final completion or
// failure) from a runner.
type WSTaskUpdate struct {
    TaskID     int64  `json:"task_id"`
    Status     string `json:"status"`
    OutputPath string `json:"output_path,omitempty"`
    Success    bool   `json:"success"`
    Error      string `json:"error,omitempty"`
}

// handleRunnerJobWebSocket handles per-job WebSocket connections from runners.
// WS /api/runner/jobs/:job_id/ws
//
// Protocol: the first client message must be {"type":"auth","job_token":...};
// the token binds the connection to a (job, task, runner) triple. After
// auth_ok the server accepts log_entry, progress, output_uploaded,
// task_complete and runner_heartbeat messages until the connection closes.
// If the socket drops while its task is still running, the task is marked
// failed ("WebSocket connection lost").
func (s *Manager) handleRunnerJobWebSocket(w http.ResponseWriter, r *http.Request) {
    jobIDStr := chi.URLParam(r, "jobId")
    if jobIDStr == "" {
        s.respondError(w, http.StatusBadRequest, "job ID required")
        return
    }
    var jobID int64
    if _, err := fmt.Sscanf(jobIDStr, "%d", &jobID); err != nil {
        s.respondError(w, http.StatusBadRequest, "invalid job ID")
        return
    }

    conn, err := s.wsUpgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("Failed to upgrade job WebSocket: %v", err)
        return
    }
    defer conn.Close()

    // The auth message must arrive within one ping interval.
    conn.SetReadDeadline(time.Now().Add(WSPingInterval))
    var authMsg struct {
        Type     string `json:"type"`
        JobToken string `json:"job_token"`
    }
    if err := conn.ReadJSON(&authMsg); err != nil {
        log.Printf("Job WebSocket auth read error: %v", err)
        conn.WriteJSON(map[string]string{"type": "error", "message": "failed to read auth message"})
        return
    }
    if authMsg.Type != "auth" {
        conn.WriteJSON(map[string]string{"type": "error", "message": "first message must be auth"})
        return
    }

    claims, err := auth.ValidateJobToken(authMsg.JobToken)
    if err != nil {
        log.Printf("Job WebSocket invalid token: %v", err)
        conn.WriteJSON(map[string]string{"type": "error", "message": fmt.Sprintf("invalid job token: %v", err)})
        return
    }
    // The token must have been issued for this exact job.
    if claims.JobID != jobID {
        conn.WriteJSON(map[string]string{"type": "error", "message": "job ID mismatch"})
        return
    }

    runnerID := claims.RunnerID
    taskID := claims.TaskID

    // The task may have been reassigned since the token was issued; verify
    // it is still owned by this runner.
    var taskRunnerID sql.NullInt64
    var taskStatus string
    err = s.db.With(func(db *sql.DB) error {
        return db.QueryRow("SELECT runner_id, status FROM tasks WHERE id = ?", taskID).Scan(&taskRunnerID, &taskStatus)
    })
    if err != nil {
        conn.WriteJSON(map[string]string{"type": "error", "message": "task not found"})
        return
    }
    if !taskRunnerID.Valid || taskRunnerID.Int64 != runnerID {
        conn.WriteJSON(map[string]string{"type": "error", "message": "task not assigned to this runner"})
        return
    }

    if err := conn.WriteJSON(map[string]string{"type": "auth_ok"}); err != nil {
        log.Printf("Failed to send auth_ok: %v", err)
        return
    }
    log.Printf("Job WebSocket authenticated: job=%d, runner=%d, task=%d", jobID, runnerID, taskID)

    // Register the connection (and a write mutex serializing concurrent
    // writers) so other goroutines can push server->runner messages.
    connKey := fmt.Sprintf("job-%d-task-%d", jobID, taskID)
    var writeMu sync.Mutex
    s.runnerJobConnsMu.Lock()
    s.runnerJobConns[connKey] = conn
    s.runnerJobConnsWriteMu[connKey] = &writeMu
    s.runnerJobConnsMu.Unlock()

    defer func() {
        s.runnerJobConnsMu.Lock()
        delete(s.runnerJobConns, connKey)
        delete(s.runnerJobConnsWriteMu, connKey)
        s.runnerJobConnsMu.Unlock()

        // If the task was still running when the socket dropped, treat the
        // disconnect as a task failure. The status read is best-effort: on a
        // DB error currentStatus stays empty and no update is made.
        var currentStatus string
        s.db.With(func(db *sql.DB) error {
            return db.QueryRow("SELECT status FROM tasks WHERE id = ?", taskID).Scan(&currentStatus)
        })
        if currentStatus == string(types.TaskStatusRunning) {
            log.Printf("Job WebSocket disconnected unexpectedly for task %d, marking as failed", taskID)
            s.db.With(func(db *sql.DB) error {
                _, err := db.Exec(
                    `UPDATE tasks SET status = ?, error_message = ?, completed_at = ?
                     WHERE id = ?`,
                    types.TaskStatusFailed, "WebSocket connection lost", time.Now(), taskID,
                )
                return err
            })
            s.broadcastTaskUpdate(jobID, taskID, "task_update", map[string]interface{}{
                "status":        types.TaskStatusFailed,
                "error_message": "WebSocket connection lost",
            })
            s.updateJobStatusFromTasks(jobID)
        }
        log.Printf("Job WebSocket closed: job=%d, runner=%d, task=%d", jobID, runnerID, taskID)
    }()

    // Each pong extends the read deadline; the read loop below re-arms it per
    // message as well.
    conn.SetPongHandler(func(string) error {
        conn.SetReadDeadline(time.Now().Add(WSReadDeadline))
        return nil
    })

    // Ping loop: exits once the connection is deregistered (or replaced) in
    // the registry, or when a write fails.
    go func() {
        ticker := time.NewTicker(WSPingInterval)
        defer ticker.Stop()
        for range ticker.C {
            s.runnerJobConnsMu.RLock()
            currentConn, exists := s.runnerJobConns[connKey]
            mu, hasMu := s.runnerJobConnsWriteMu[connKey]
            s.runnerJobConnsMu.RUnlock()
            if !exists || currentConn != conn || !hasMu {
                return
            }
            mu.Lock()
            err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WSWriteDeadline))
            mu.Unlock()
            if err != nil {
                return
            }
        }
    }()

    // Main receive loop. Messages referencing a different task ID than the
    // authenticated one are silently dropped.
    for {
        conn.SetReadDeadline(time.Now().Add(WSReadDeadline))
        var msg WSMessage
        err := conn.ReadJSON(&msg)
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
                log.Printf("Job WebSocket error for task %d: %v", taskID, err)
            }
            break
        }

        switch msg.Type {
        case "log_entry":
            var logEntry WSLogEntry
            if err := json.Unmarshal(msg.Data, &logEntry); err == nil {
                if logEntry.TaskID == taskID {
                    s.handleWebSocketLog(runnerID, logEntry)
                }
            }
        case "progress":
            var progress struct {
                TaskID   int64   `json:"task_id"`
                Progress float64 `json:"progress"`
            }
            if err := json.Unmarshal(msg.Data, &progress); err == nil {
                if progress.TaskID == taskID {
                    s.broadcastTaskUpdate(jobID, taskID, "progress", map[string]interface{}{
                        "progress": progress.Progress,
                    })
                }
            }
        case "output_uploaded":
            var output struct {
                TaskID   int64  `json:"task_id"`
                FileName string `json:"file_name"`
            }
            if err := json.Unmarshal(msg.Data, &output); err == nil {
                if output.TaskID == taskID {
                    log.Printf("Task %d uploaded output: %s", taskID, output.FileName)
                    s.broadcastJobUpdate(jobID, "file_uploaded", map[string]interface{}{
                        "task_id":   taskID,
                        "file_name": output.FileName,
                    })
                }
            }
        case "task_complete":
            var taskUpdate WSTaskUpdate
            if err := json.Unmarshal(msg.Data, &taskUpdate); err == nil {
                if taskUpdate.TaskID == taskID {
                    s.handleWebSocketTaskComplete(runnerID, taskUpdate)
                    // The task is finished either way; close the connection.
                    return
                }
            }
        case "runner_heartbeat":
            // Heartbeats are attributed to the job's currently assigned
            // runner (looked up fresh, not taken from the token claims).
            var assignedRunnerID sql.NullInt64
            err := s.db.With(func(db *sql.DB) error {
                return db.QueryRow(
                    "SELECT assigned_runner_id FROM jobs WHERE id = ?",
                    jobID,
                ).Scan(&assignedRunnerID)
            })
            if err != nil {
                log.Printf("Failed to lookup runner for job %d heartbeat: %v", jobID, err)
                s.sendWebSocketMessage(conn, map[string]interface{}{
                    "type":    "error",
                    "message": "Failed to process heartbeat",
                })
                continue
            }
            if !assignedRunnerID.Valid {
                log.Printf("Job %d has no assigned runner, skipping heartbeat update", jobID)
                // Acknowledge without touching the database.
                s.sendWebSocketMessage(conn, map[string]interface{}{
                    "type":      "heartbeat_ack",
                    "timestamp": time.Now().Unix(),
                    "message":   "No assigned runner for this job",
                })
                continue
            }

            hbRunnerID := assignedRunnerID.Int64
            err = s.db.With(func(db *sql.DB) error {
                _, err := db.Exec(
                    "UPDATE runners SET last_heartbeat = ?, status = ? WHERE id = ?",
                    time.Now(), types.RunnerStatusOnline, hbRunnerID,
                )
                return err
            })
            if err != nil {
                log.Printf("Failed to update runner %d heartbeat for job %d: %v", hbRunnerID, jobID, err)
                s.sendWebSocketMessage(conn, map[string]interface{}{
                    "type":    "error",
                    "message": "Failed to update heartbeat",
                })
                continue
            }

            s.sendWebSocketMessage(conn, map[string]interface{}{
                "type":      "heartbeat_ack",
                "timestamp": time.Now().Unix(),
            })
            continue
        }
    }
}

// handleWebSocketLog persists a runner log entry, forwards it to frontend
// clients, and — when the message contains a Blender frame marker ("Fra:") —
// triggers a throttled job-progress recalculation.
func (s *Manager) handleWebSocketLog(runnerID int64, logEntry WSLogEntry) {
    err := s.db.With(func(conn *sql.DB) error {
        _, err := conn.Exec(
            `INSERT INTO task_logs (task_id, runner_id, log_level, message, step_name, created_at) VALUES (?, ?, ?, ?, ?, ?)`,
            logEntry.TaskID, runnerID, logEntry.LogLevel, logEntry.Message, logEntry.StepName, time.Now(),
        )
        return err
    })
    if err != nil {
        log.Printf("Failed to store log: %v", err)
        return
    }

    s.broadcastLogToFrontend(logEntry.TaskID, logEntry)

    if strings.Contains(logEntry.Message, "Fra:") {
        var jobID int64
        err := s.db.With(func(conn *sql.DB) error {
            return conn.QueryRow("SELECT job_id FROM tasks WHERE id = ?", logEntry.TaskID).Scan(&jobID)
        })
        if err == nil {
            // Throttle progress updates to at most one per ProgressUpdateThrottle
            // per job. The check-then-set is not atomic, so a rare duplicate
            // update is possible; updateJobStatusFromTasks is idempotent.
            s.progressUpdateTimesMu.RLock()
            lastUpdate, exists := s.progressUpdateTimes[jobID]
            s.progressUpdateTimesMu.RUnlock()

            shouldUpdate := !exists || time.Since(lastUpdate) >= ProgressUpdateThrottle
            if shouldUpdate {
                s.progressUpdateTimesMu.Lock()
                s.progressUpdateTimes[jobID] = time.Now()
                s.progressUpdateTimesMu.Unlock()
                // Run in the background so log ingestion is not blocked.
                go s.updateJobStatusFromTasks(jobID)
            }
        }
    }
}

// handleWebSocketTaskUpdate handles task status updates from WebSocket.
// Currently informational only — the update is logged, not persisted.
func (s *Manager) handleWebSocketTaskUpdate(runnerID int64, taskUpdate WSTaskUpdate) {
    log.Printf("Task %d update from runner %d: %s", taskUpdate.TaskID, runnerID, taskUpdate.Status)
}

// handleWebSocketTaskComplete finalizes a task reported done by a runner.
// On success the task is marked completed; on failure it is either reset to
// pending for a retry (retry_count < max_retries, with steps/logs cleared) or
// marked permanently failed. Job status/progress is recomputed afterwards.
func (s *Manager) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskUpdate) {
    // The completion must come from the runner the task is assigned to.
    var taskRunnerID sql.NullInt64
    var jobID int64
    var retryCount, maxRetries int
    err := s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(
            "SELECT runner_id, job_id, retry_count, max_retries FROM tasks WHERE id = ?",
            taskUpdate.TaskID,
        ).Scan(&taskRunnerID, &jobID, &retryCount, &maxRetries)
    })
    if err != nil {
        log.Printf("Failed to get task %d info: %v", taskUpdate.TaskID, err)
        return
    }
    if !taskRunnerID.Valid || taskRunnerID.Int64 != runnerID {
        log.Printf("Task %d does not belong to runner %d", taskUpdate.TaskID, runnerID)
        return
    }

    now := time.Now()

    if taskUpdate.Success {
        err = s.db.WithTx(func(tx *sql.Tx) error {
            _, err := tx.Exec(`UPDATE tasks SET status = ? WHERE id = ?`, types.TaskStatusCompleted, taskUpdate.TaskID)
            if err != nil {
                return err
            }
            if taskUpdate.OutputPath != "" {
                _, err = tx.Exec(`UPDATE tasks SET output_path = ? WHERE id = ?`, taskUpdate.OutputPath, taskUpdate.TaskID)
                if err != nil {
                    return err
                }
            }
            _, err = tx.Exec(`UPDATE tasks SET completed_at = ? WHERE id = ?`, now, taskUpdate.TaskID)
            return err
        })
        if err != nil {
            log.Printf("Failed to update task %d: %v", taskUpdate.TaskID, err)
            return
        }

        s.broadcastTaskUpdate(jobID, taskUpdate.TaskID, "task_update", map[string]interface{}{
            "status":       types.TaskStatusCompleted,
            "output_path":  taskUpdate.OutputPath,
            "completed_at": now,
        })
        s.updateJobStatusFromTasks(jobID)
        return
    }

    // Actual task failure (e.g. a Blender crash), as opposed to a runner
    // disconnect — this DOES consume one of the task's retries.
    if retryCount < maxRetries {
        err = s.db.WithTx(func(tx *sql.Tx) error {
            _, err := tx.Exec(
                `UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL, retry_count = retry_count + 1, started_at = NULL, completed_at = NULL WHERE id = ?`,
                types.TaskStatusPending, taskUpdate.TaskID,
            )
            if err != nil {
                return err
            }
            // Clear steps and logs so the retry starts from a clean slate.
            _, err = tx.Exec(`DELETE FROM task_steps WHERE task_id = ?`, taskUpdate.TaskID)
            if err != nil {
                return err
            }
            _, err = tx.Exec(`DELETE FROM task_logs WHERE task_id = ?`, taskUpdate.TaskID)
            return err
        })
        if err != nil {
            log.Printf("Failed to reset task %d for retry: %v", taskUpdate.TaskID, err)
            return
        }

        // The steps_cleared/logs_cleared flags tell clients to drop cached
        // step/log state for this task.
        s.broadcastTaskUpdate(jobID, taskUpdate.TaskID, "task_reset", map[string]interface{}{
            "status":        types.TaskStatusPending,
            "retry_count":   retryCount + 1,
            "error_message": taskUpdate.Error,
            "steps_cleared": true,
            "logs_cleared":  true,
        })
        log.Printf("Task %d failed but has retries remaining (%d/%d), reset to pending", taskUpdate.TaskID, retryCount+1, maxRetries)
    } else {
        // No retries remaining — mark as permanently failed.
        err = s.db.WithTx(func(tx *sql.Tx) error {
            _, err := tx.Exec(`UPDATE tasks SET status = ? WHERE id = ?`, types.TaskStatusFailed, taskUpdate.TaskID)
            if err != nil {
                return err
            }
            _, err = tx.Exec(`UPDATE tasks SET completed_at = ? WHERE id = ?`, now, taskUpdate.TaskID)
            if err != nil {
                return err
            }
            if taskUpdate.Error != "" {
                _, err = tx.Exec(`UPDATE tasks SET error_message = ? WHERE id = ?`, taskUpdate.Error, taskUpdate.TaskID)
                if err != nil {
                    return err
                }
            }
            return nil
        })
        if err != nil {
            log.Printf("Failed to mark task %d as failed: %v", taskUpdate.TaskID, err)
            return
        }

        s.logTaskEvent(taskUpdate.TaskID, &runnerID, types.LogLevelError,
            fmt.Sprintf("Task failed permanently after %d retries: %s", maxRetries, taskUpdate.Error), "")

        s.broadcastTaskUpdate(jobID, taskUpdate.TaskID, "task_update", map[string]interface{}{
            "status":        types.TaskStatusFailed,
            "completed_at":  now,
            "error_message": taskUpdate.Error,
        })
        log.Printf("Task %d failed permanently after %d retries", taskUpdate.TaskID, maxRetries)
    }

    s.updateJobStatusFromTasks(jobID)
}

// parseBlenderFrame extracts the current frame number from a Blender log
// message. It looks for "Fra:" optionally followed by whitespace and digits
// (e.g. "Fra:2470" or "Fra: 2470") and returns (frame, true) on a match.
func parseBlenderFrame(logMessage string) (int, bool) {
    fraIndex := strings.Index(logMessage, "Fra:")
    if fraIndex == -1 {
        return 0, false
    }

    start := fraIndex + 4 // skip "Fra:"
    for start < len(logMessage) && (logMessage[start] == ' ' || logMessage[start] == '\t') {
        start++
    }
    end := start
    for end < len(logMessage) && logMessage[end] >= '0' && logMessage[end] <= '9' {
        end++
    }
    if end > start {
        frame, err := strconv.Atoi(logMessage[start:end])
        if err == nil {
            return frame, true
        }
    }
    return 0, false
}

// getCurrentFrameFromLogs returns the highest Blender frame number seen in
// recent logs of the job's currently running render tasks. The second return
// value is false when no frame marker was found.
func (s *Manager) getCurrentFrameFromLogs(jobID int64) (int, bool) {
    var rows *sql.Rows
    err := s.db.With(func(conn *sql.DB) error {
        var err error
        rows, err = conn.Query(
            `SELECT id FROM tasks WHERE job_id = ? AND task_type = ? AND status = ?`,
            jobID, types.TaskTypeRender, types.TaskStatusRunning,
        )
        return err
    })
    if err != nil {
        return 0, false
    }
    defer rows.Close()

    maxFrame := 0
    found := false
    for rows.Next() {
        var taskID int64
        if err := rows.Scan(&taskID); err != nil {
            log.Printf("Failed to scan task ID in getCurrentFrameFromLogs: %v", err)
            continue
        }

        // Only inspect the 100 most recent matching log lines per task to
        // avoid scanning the full log history.
        var logRows *sql.Rows
        err := s.db.With(func(conn *sql.DB) error {
            var err error
            logRows, err = conn.Query(
                `SELECT message FROM task_logs WHERE task_id = ? AND message LIKE '%Fra:%' ORDER BY id DESC LIMIT 100`,
                taskID,
            )
            return err
        })
        if err != nil {
            continue
        }
        for logRows.Next() {
            var message string
            if err := logRows.Scan(&message); err != nil {
                continue
            }
            if frame, ok := parseBlenderFrame(message); ok {
                if frame > maxFrame {
                    maxFrame = frame
                    found = true
                }
            }
        }
        logRows.Close()
    }
    return maxFrame, found
}

// resetFailedTasksAndRedistribute resets all failed tasks of a job back to
// pending (clearing per-task retry state) and increments the job-level
// retry_count. Pending tasks are then picked up again by polling runners.
func (s *Manager) resetFailedTasksAndRedistribute(jobID int64) error {
    err := s.db.With(func(conn *sql.DB) error {
        _, err := conn.Exec(
            `UPDATE tasks SET status = ?, retry_count = 0, runner_id = NULL, started_at = NULL, completed_at = NULL, error_message = NULL WHERE job_id = ?
             AND status = ?`,
            types.TaskStatusPending, jobID, types.TaskStatusFailed,
        )
        if err != nil {
            return fmt.Errorf("failed to reset failed tasks: %v", err)
        }
        _, err = conn.Exec(
            `UPDATE jobs SET retry_count = retry_count + 1 WHERE id = ?`,
            jobID,
        )
        if err != nil {
            return fmt.Errorf("failed to increment job retry_count: %v", err)
        }
        return nil
    })
    if err != nil {
        return err
    }
    log.Printf("Reset failed tasks for job %d and incremented retry_count", jobID)
    return nil
}

// cancelActiveTasksForJob cancels all pending or running tasks of a job.
// Tasks have no dedicated cancelled status, so they are marked failed with a
// "Job cancelled" error message.
func (s *Manager) cancelActiveTasksForJob(jobID int64) error {
    err := s.db.With(func(conn *sql.DB) error {
        _, err := conn.Exec(
            `UPDATE tasks SET status = ?, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
            types.TaskStatusFailed, "Job cancelled", jobID, types.TaskStatusPending, types.TaskStatusRunning,
        )
        if err != nil {
            return fmt.Errorf("failed to cancel active tasks: %v", err)
        }
        return nil
    })
    if err != nil {
        return err
    }
    log.Printf("Cancelled all active tasks for job %d", jobID)
    return nil
}

// evaluateTaskCondition checks whether a task's assignment condition is met.
// An empty condition always passes; an unparseable or unknown condition fails
// closed (the task is not assigned).
func (s *Manager) evaluateTaskCondition(taskID int64, jobID int64, conditionJSON string) bool {
    if conditionJSON == "" {
        return true
    }

    var condition map[string]interface{}
    if err := json.Unmarshal([]byte(conditionJSON), &condition); err != nil {
        log.Printf("Failed to parse condition for task %d: %v", taskID, err)
        return false
    }

    conditionType, ok := condition["type"].(string)
    if !ok {
        log.Printf("Invalid condition format for task %d: missing type", taskID)
        return false
    }

    switch conditionType {
    case "all_render_tasks_completed":
        // True once every non-cancelled render task of the job is completed.
        var totalRenderTasks, completedRenderTasks int
        err := s.db.With(func(conn *sql.DB) error {
            if err := conn.QueryRow(
                `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND task_type = ? AND status IN (?, ?, ?)`,
                jobID, types.TaskTypeRender, types.TaskStatusPending, types.TaskStatusRunning, types.TaskStatusCompleted,
            ).Scan(&totalRenderTasks); err != nil {
                return err
            }
            return conn.QueryRow(
                `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND task_type = ? AND status = ?`,
                jobID, types.TaskTypeRender, types.TaskStatusCompleted,
            ).Scan(&completedRenderTasks)
        })
        if err != nil {
            log.Printf("Failed to check render task completion for task %d: %v", taskID, err)
            return false
        }
        return totalRenderTasks > 0 && completedRenderTasks == totalRenderTasks
    default:
        log.Printf("Unknown condition type '%s' for task %d", conditionType, taskID)
        return false
    }
}

// getJobStatusUpdateMutex returns the mutex for a specific jobID, creating it
// if needed. This serializes updateJobStatusFromTasks per job to prevent race
// conditions between concurrently completing tasks.
func (s *Manager) getJobStatusUpdateMutex(jobID int64) *sync.Mutex {
    s.jobStatusUpdateMuMu.Lock()
    defer s.jobStatusUpdateMuMu.Unlock()
    mu, exists := s.jobStatusUpdateMu[jobID]
    if !exists {
        mu = &sync.Mutex{}
        s.jobStatusUpdateMu[jobID] = mu
    }
    return mu
}

// cleanupJobStatusUpdateMutex removes the per-job status mutex. Call only
// when the job has reached a final state (completed/failed) and no further
// status updates are expected.
func (s *Manager) cleanupJobStatusUpdateMutex(jobID int64) {
    s.jobStatusUpdateMuMu.Lock()
    defer s.jobStatusUpdateMuMu.Unlock()
    delete(s.jobStatusUpdateMu, jobID)
}

// updateJobStatusFromTasks recomputes a job's status and progress from the
// states of its tasks and broadcasts the result to clients. Execution is
// serialized per jobID (see getJobStatusUpdateMutex). When all tasks have
// finished but some failed, the job is either retried (failed tasks reset to
// pending) or marked failed, depending on the job-level retry budget.
func (s *Manager) updateJobStatusFromTasks(jobID int64) {
    mu := s.getJobStatusUpdateMutex(jobID)
    mu.Lock()
    defer mu.Unlock()

    now := time.Now()

    // All jobs use parallel runners (one task per frame), so progress is
    // always task-based. Read the current status to detect changes.
    var currentStatus string
    err := s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(`SELECT status FROM jobs WHERE id = ?`, jobID).Scan(&currentStatus)
    })
    if err != nil {
        log.Printf("Failed to get current job status for job %d: %v", jobID, err)
        return
    }

    var totalTasks, completedTasks int
    err = s.db.With(func(conn *sql.DB) error {
        err := conn.QueryRow(
            `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status IN (?, ?, ?, ?)`,
            jobID, types.TaskStatusPending, types.TaskStatusRunning, types.TaskStatusCompleted, types.TaskStatusFailed,
        ).Scan(&totalTasks)
        if err != nil {
            return err
        }
        return conn.QueryRow(
            `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status = ?`,
            jobID, types.TaskStatusCompleted,
        ).Scan(&completedTasks)
    })
    if err != nil {
        log.Printf("Failed to count completed tasks for job %d: %v", jobID, err)
        return
    }

    var progress float64
    if totalTasks == 0 {
        // All tasks cancelled or none exist.
        progress = 0.0
    } else {
        progress = float64(completedTasks) / float64(totalTasks) * 100.0
    }

    var jobStatus string

    var pendingOrRunningTasks int
    err = s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(
            `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status IN (?, ?)`,
            jobID, types.TaskStatusPending, types.TaskStatusRunning,
        ).Scan(&pendingOrRunningTasks)
    })
    if err != nil {
        log.Printf("Failed to count pending/running tasks for job %d: %v", jobID, err)
        return
    }

    if pendingOrRunningTasks == 0 && totalTasks > 0 {
        // Every task is either completed or failed.
        var failedTasks int
        s.db.With(func(conn *sql.DB) error {
            conn.QueryRow(
                `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status = ?`,
                jobID, types.TaskStatusFailed,
            ).Scan(&failedTasks)
            return nil
        })

        if failedTasks > 0 {
            // Some tasks failed — retry the job if budget remains.
            var retryCount, maxRetries int
            err := s.db.With(func(conn *sql.DB) error {
                return conn.QueryRow(
                    `SELECT retry_count, max_retries FROM jobs WHERE id = ?`,
                    jobID,
                ).Scan(&retryCount, &maxRetries)
            })
            if err != nil {
                log.Printf("Failed to get retry info for job %d: %v", jobID, err)
                // Without retry info, fail the job rather than loop forever.
                jobStatus = string(types.JobStatusFailed)
            } else if retryCount < maxRetries {
                if err := s.resetFailedTasksAndRedistribute(jobID); err != nil {
                    log.Printf("Failed to reset failed tasks for job %d: %v", jobID, err)
                    jobStatus = string(types.JobStatusFailed)
                } else {
                    // Tasks were reset; keep the job's current status and only
                    // refresh progress (failed tasks are pending again).
                    jobStatus = currentStatus
                    var newTotalTasks, newCompletedTasks int
                    s.db.With(func(conn *sql.DB) error {
                        conn.QueryRow(
                            `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status IN (?, ?, ?, ?)`,
                            jobID, types.TaskStatusPending, types.TaskStatusRunning, types.TaskStatusCompleted, types.TaskStatusFailed,
                        ).Scan(&newTotalTasks)
                        conn.QueryRow(
                            `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status = ?`,
                            jobID, types.TaskStatusCompleted,
                        ).Scan(&newCompletedTasks)
                        return nil
                    })
                    if newTotalTasks > 0 {
                        progress = float64(newCompletedTasks) / float64(newTotalTasks) * 100.0
                    }

                    err := s.db.With(func(conn *sql.DB) error {
                        _, err := conn.Exec(
                            `UPDATE jobs SET progress = ? WHERE id = ?`,
                            progress, jobID,
                        )
                        return err
                    })
                    if err != nil {
                        log.Printf("Failed to update job %d progress: %v", jobID, err)
                    } else {
                        s.broadcastJobUpdate(jobID, "job_update", map[string]interface{}{
                            "status":   jobStatus,
                            "progress": progress,
                        })
                    }
                    return // retry handled; skip the final-state logic below
                }
            } else {
                // Retry budget exhausted — fail the job and cancel leftovers.
                jobStatus = string(types.JobStatusFailed)
                if err := s.cancelActiveTasksForJob(jobID); err != nil {
                    log.Printf("Failed to cancel active tasks for job %d: %v", jobID, err)
                }
            }
        } else {
            // Every task completed successfully.
            jobStatus = string(types.JobStatusCompleted)
            progress = 100.0
        }

        if jobStatus != "" {
            err := s.db.With(func(conn *sql.DB) error {
                _, err := conn.Exec(
                    `UPDATE jobs SET status = ?, progress = ?, completed_at = ? WHERE id = ?`,
                    jobStatus, progress, now, jobID,
                )
                return err
            })
            if err != nil {
                log.Printf("Failed to update job %d status to %s: %v", jobID, jobStatus, err)
            } else {
                if currentStatus != jobStatus {
                    log.Printf("Updated job %d status from %s to %s (progress: %.1f%%, completed tasks: %d/%d)", jobID, currentStatus, jobStatus, progress, completedTasks, totalTasks)
                }
                s.broadcastJobUpdate(jobID, "job_update", map[string]interface{}{
                    "status":       jobStatus,
                    "progress":     progress,
                    "completed_at": now,
                })
                // Final states receive no further updates; drop the mutex.
                if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusFailed) {
                    s.cleanupJobStatusUpdateMutex(jobID)
                }
            }
        }
        // Encode tasks are created up front at job creation, gated by a
        // condition until all render tasks complete — nothing to create here.
    } else {
        // The job still has pending or running tasks.
        var runningTasks int
        s.db.With(func(conn *sql.DB) error {
            conn.QueryRow(
                `SELECT COUNT(*) FROM tasks WHERE job_id = ? AND status = ?`,
                jobID, types.TaskStatusRunning,
            ).Scan(&runningTasks)
            return nil
        })

        if runningTasks > 0 {
            jobStatus = string(types.JobStatusRunning)
            // Stamp started_at the first time the job starts running.
            var startedAt sql.NullTime
            s.db.With(func(conn *sql.DB) error {
                conn.QueryRow(`SELECT started_at FROM jobs WHERE id = ?`, jobID).Scan(&startedAt)
                if !startedAt.Valid {
                    conn.Exec(`UPDATE jobs SET started_at = ? WHERE id = ?`, now, jobID)
                }
                return nil
            })
        } else {
            jobStatus = string(types.JobStatusPending)
        }

        err := s.db.With(func(conn *sql.DB) error {
            _, err := conn.Exec(
                `UPDATE jobs SET status = ?, progress = ? WHERE id = ?`,
                jobStatus, progress, jobID,
            )
            return err
        })
        if err != nil {
            log.Printf("Failed to update job %d status to %s: %v", jobID, jobStatus, err)
        } else {
            if currentStatus != jobStatus {
                log.Printf("Updated job %d status from %s to %s (progress: %.1f%%, completed: %d/%d, pending: %d, running: %d)", jobID, currentStatus, jobStatus, progress, completedTasks, totalTasks, pendingOrRunningTasks-runningTasks, runningTasks)
            }
            // Broadcast during execution too, not only on completion.
            s.broadcastJobUpdate(jobID, "job_update", map[string]interface{}{
                "status":   jobStatus,
                "progress": progress,
            })
        }
    }
}

// broadcastLogToFrontend forwards a stored task log entry to connected
// frontend clients, both over the per-user channel ("logs:{jobId}:{taskId}")
// and over the legacy per-job/task WebSocket map kept for backwards
// compatibility during migration.
func (s *Manager) broadcastLogToFrontend(taskID int64, logEntry WSLogEntry) {
    // Resolve job, owning user and task state for routing/diagnostics.
    var jobID, userID int64
    var taskStatus string
    var taskRunnerID sql.NullInt64
    var taskStartedAt sql.NullTime
    err := s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(
            `SELECT t.job_id, j.user_id, t.status, t.runner_id, t.started_at FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.id = ?`,
            taskID,
        ).Scan(&jobID, &userID, &taskStatus, &taskRunnerID, &taskStartedAt)
    })
    if err != nil {
        return
    }

    // Re-read the just-inserted log row so the broadcast carries the DB ID and
    // timestamp. Matching on the newest row with the same message avoids races
    // with concurrent inserts.
    var taskLog types.TaskLog
    var runnerID sql.NullInt64
    var stepName sql.NullString
    err = s.db.With(func(conn *sql.DB) error {
        return conn.QueryRow(
            `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ? AND message = ?
             ORDER BY id DESC LIMIT 1`,
            taskID, logEntry.Message,
        ).Scan(&taskLog.ID, &taskLog.TaskID, &runnerID, &taskLog.LogLevel, &taskLog.Message, &stepName, &taskLog.CreatedAt)
    })
    if err != nil {
        return
    }
    if runnerID.Valid {
        taskLog.RunnerID = &runnerID.Int64
    }
    if stepName.Valid {
        taskLog.StepName = stepName.String
    }

    msg := map[string]interface{}{
        "type":      "log",
        "task_id":   taskID,
        "job_id":    jobID,
        "data":      taskLog,
        "timestamp": time.Now().Unix(),
    }

    if !s.isClientConnected(userID) {
        if s.verboseWSLogging {
            log.Printf("broadcastLogToFrontend: Client %d not connected, skipping log broadcast for task %d (job %d)", userID, taskID, jobID)
        }
        // The legacy broadcast below still runs for backwards compatibility.
    } else {
        channel := fmt.Sprintf("logs:%d:%d", jobID, taskID)
        if s.verboseWSLogging {
            runnerIDStr := "none"
            if taskRunnerID.Valid {
                runnerIDStr = fmt.Sprintf("%d", taskRunnerID.Int64)
            }
            log.Printf("broadcastLogToFrontend: Broadcasting log for task %d (job %d, user %d) on channel %s, log_id=%d, task_status=%s, runner_id=%s", taskID, jobID, userID, channel, taskLog.ID, taskStatus, runnerIDStr)
        }
        s.broadcastToClient(userID, channel, msg)
    }

    // Logs arriving for a still-pending task indicate the assignment update
    // was lost somewhere; surface it loudly.
    if taskStatus == string(types.TaskStatusPending) {
        log.Printf("broadcastLogToFrontend: ERROR - Task %d has logs but status is 'pending'. This indicates the initial task assignment failed or the task_update broadcast was missed.", taskID)
    }

    // Legacy per-(job,task) WebSocket fan-out, write-serialized per connection.
    key := fmt.Sprintf("%d:%d", jobID, taskID)
    s.frontendConnsMu.RLock()
    conn, exists := s.frontendConns[key]
    s.frontendConnsMu.RUnlock()
    if exists && conn != nil {
        s.frontendConnsWriteMuMu.RLock()
        writeMu, hasMu := s.frontendConnsWriteMu[key]
        s.frontendConnsWriteMuMu.RUnlock()
        if hasMu && writeMu != nil {
            writeMu.Lock()
            conn.WriteJSON(msg)
            writeMu.Unlock()
        } else {
            // Fallback if the mutex doesn't exist yet (shouldn't happen).
            conn.WriteJSON(msg)
        }
    }
}

// resetRunnerTasks resets running tasks that were assigned to a
// disconnected/dead runner so polling runners can pick them up again.
// Runner disconnects do not consume the task retry budget; only
// runner_failure_count is incremented, for tracking.
func (s *Manager) resetRunnerTasks(runnerID int64) {
    log.Printf("Resetting tasks for disconnected runner %d", runnerID)

    // Only running tasks qualify (completed/failed are excluded for safety),
    // and only if they have no fresh completion timestamp.
    var taskRows *sql.Rows
    err := s.db.With(func(conn *sql.DB) error {
        var err error
        taskRows, err = conn.Query(
            `SELECT id, job_id FROM tasks WHERE runner_id = ? AND status = ?
             AND (completed_at IS NULL OR completed_at < datetime('now', '-30 seconds'))`,
            runnerID, types.TaskStatusRunning,
        )
        return err
    })
    if err != nil {
        log.Printf("Failed to query tasks for runner %d: %v", runnerID, err)
        return
    }
    defer taskRows.Close()

    var tasksToReset []struct {
        ID    int64
        JobID int64
    }
    for taskRows.Next() {
        var t struct {
            ID    int64
            JobID int64
        }
        if err := taskRows.Scan(&t.ID, &t.JobID); err != nil {
            log.Printf("Failed to scan task for runner %d: %v", runnerID, err)
            continue
        }
        tasksToReset = append(tasksToReset, t)
    }

    if len(tasksToReset) == 0 {
        log.Printf("No running tasks found for runner %d to redistribute", runnerID)
        return
    }

    log.Printf("Redistributing %d running tasks from disconnected runner %d", len(tasksToReset), runnerID)

    // Runner disconnections always get retried — runner_failure_count is
    // bookkeeping only and does NOT count against the task's retry_count
    // (which is reserved for actual task failures like Blender crashes).
    resetCount := 0
    for _, task := range tasksToReset {
        err = s.db.With(func(conn *sql.DB) error {
            _, err := conn.Exec(
                `UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL, runner_failure_count = runner_failure_count + 1, started_at = NULL WHERE id = ?
AND runner_id = ?`, types.TaskStatusPending, task.ID, runnerID, ) if err != nil { return err } // Clear steps and logs for fresh retry _, err = conn.Exec(`DELETE FROM task_steps WHERE task_id = ?`, task.ID) if err != nil { return err } _, err = conn.Exec(`DELETE FROM task_logs WHERE task_id = ?`, task.ID) return err }) if err != nil { log.Printf("Failed to reset task %d: %v", task.ID, err) } else { resetCount++ // Broadcast task reset to clients (includes steps_cleared and logs_cleared flags) s.broadcastTaskUpdate(task.JobID, task.ID, "task_reset", map[string]interface{}{ "status": types.TaskStatusPending, "runner_id": nil, "current_step": nil, "started_at": nil, "steps_cleared": true, "logs_cleared": true, }) } } log.Printf("Task reset complete for runner %d: %d tasks reset for retry", runnerID, resetCount) // Update job statuses for affected jobs jobIDs := make(map[int64]bool) for _, task := range tasksToReset { jobIDs[task.JobID] = true } for jobID := range jobIDs { // Update job status based on remaining tasks go s.updateJobStatusFromTasks(jobID) } } // logTaskEvent logs an event to a task's log (manager-side logging) func (s *Manager) logTaskEvent(taskID int64, runnerID *int64, logLevel types.LogLevel, message, stepName string) { var runnerIDValue interface{} if runnerID != nil { runnerIDValue = *runnerID } err := s.db.With(func(conn *sql.DB) error { _, err := conn.Exec( `INSERT INTO task_logs (task_id, runner_id, log_level, message, step_name, created_at) VALUES (?, ?, ?, ?, ?, ?)`, taskID, runnerIDValue, logLevel, message, stepName, time.Now(), ) return err }) if err != nil { log.Printf("Failed to log task event for task %d: %v", taskID, err) return } // Broadcast to frontend if there are connected clients s.broadcastLogToFrontend(taskID, WSLogEntry{ TaskID: taskID, LogLevel: string(logLevel), Message: message, StepName: stepName, }) } // cleanupOldOfflineRunners periodically deletes runners that have been offline for more than 1 month func (s *Manager) 
cleanupOldOfflineRunners() { // Run cleanup every 24 hours ticker := time.NewTicker(24 * time.Hour) defer ticker.Stop() // Run once immediately on startup s.cleanupOldOfflineRunnersOnce() for range ticker.C { s.cleanupOldOfflineRunnersOnce() } } // cleanupOldOfflineRunnersOnce finds and deletes runners that have been offline for more than 1 month func (s *Manager) cleanupOldOfflineRunnersOnce() { defer func() { if r := recover(); r != nil { log.Printf("Panic in cleanupOldOfflineRunners: %v", r) } }() // Find runners that: // 1. Are offline // 2. Haven't had a heartbeat in over 1 month var rows *sql.Rows err := s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query( `SELECT id, name FROM runners WHERE status = ? AND last_heartbeat < datetime('now', '-1 month')`, types.RunnerStatusOffline, ) return err }) if err != nil { log.Printf("Failed to query old offline runners: %v", err) return } defer rows.Close() type runnerInfo struct { ID int64 Name string } var runnersToDelete []runnerInfo for rows.Next() { var info runnerInfo if err := rows.Scan(&info.ID, &info.Name); err == nil { runnersToDelete = append(runnersToDelete, info) } } rows.Close() if len(runnersToDelete) == 0 { return } log.Printf("Cleaning up %d old offline runners (offline for more than 1 month)", len(runnersToDelete)) // Delete each runner for _, runner := range runnersToDelete { // First, check if there are any tasks still assigned to this runner // If so, reset them to pending before deleting the runner var assignedTaskCount int err := s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT COUNT(*) FROM tasks WHERE runner_id = ? 
AND status IN (?, ?)`, runner.ID, types.TaskStatusRunning, types.TaskStatusPending, ).Scan(&assignedTaskCount) }) if err != nil { log.Printf("Failed to check assigned tasks for runner %d: %v", runner.ID, err) continue } if assignedTaskCount > 0 { // Reset any tasks assigned to this runner log.Printf("Resetting %d tasks assigned to runner %d before deletion", assignedTaskCount, runner.ID) err = s.db.With(func(conn *sql.DB) error { _, err := conn.Exec( `UPDATE tasks SET runner_id = NULL, status = ? WHERE runner_id = ? AND status IN (?, ?)`, types.TaskStatusPending, runner.ID, types.TaskStatusRunning, types.TaskStatusPending, ) return err }) if err != nil { log.Printf("Failed to reset tasks for runner %d: %v", runner.ID, err) continue } } // Delete the runner err = s.db.With(func(conn *sql.DB) error { _, err := conn.Exec("DELETE FROM runners WHERE id = ?", runner.ID) return err }) if err != nil { log.Printf("Failed to delete runner %d (%s): %v", runner.ID, runner.Name, err) continue } log.Printf("Deleted old offline runner: %d (%s)", runner.ID, runner.Name) } } // sendWebSocketMessage safely sends a message over a WebSocket connection with write locking func (s *Manager) sendWebSocketMessage(conn *websocket.Conn, msg interface{}) error { // For simplicity in the polling model, we'll use a global write mutex // since we typically have one connection per job/task s.runnerJobConnsMu.RLock() defer s.runnerJobConnsMu.RUnlock() // Set write deadline conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) // Write the message directly - the RWMutex read lock provides basic synchronization // For production, consider using a per-connection mutex pool if err := conn.WriteJSON(msg); err != nil { log.Printf("Failed to send WebSocket message: %v", err) return err } return nil }