From b51b96a61801a7d958b80759d0e8bc1c7c216c42 Mon Sep 17 00:00:00 2001 From: Justin Harms Date: Fri, 2 Jan 2026 18:22:55 -0600 Subject: [PATCH] Refactor job status handling to prevent race conditions - Removed redundant error handling in handleListJobTasks. - Introduced per-job mutexes in Manager to serialize updateJobStatusFromTasks calls, ensuring thread safety during concurrent task completions. - Added methods to manage job status update mutexes, including creation and cleanup after job completion or failure. - Improved error handling in handleGetJobStatusForRunner by consolidating error checks. --- internal/manager/jobs.go | 3 --- internal/manager/manager.go | 5 ++++ internal/manager/runners.go | 47 ++++++++++++++++++++++++++++++------- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/internal/manager/jobs.go b/internal/manager/jobs.go index 9bd8e8e..8ab6157 100644 --- a/internal/manager/jobs.go +++ b/internal/manager/jobs.go @@ -3024,9 +3024,6 @@ func (s *Manager) handleListJobTasks(w http.ResponseWriter, r *http.Request) { return } defer rows.Close() - if err != nil { - total = -1 - } tasks := []types.Task{} for rows.Next() { diff --git a/internal/manager/manager.go b/internal/manager/manager.go index f9658ea..ab77f11 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -89,6 +89,9 @@ type Manager struct { // Throttling for task status updates (per task) taskUpdateTimes map[int64]time.Time // key: taskID taskUpdateTimesMu sync.RWMutex + // Per-job mutexes to serialize updateJobStatusFromTasks calls and prevent race conditions + jobStatusUpdateMu map[int64]*sync.Mutex // key: jobID + jobStatusUpdateMuMu sync.RWMutex // Client WebSocket connections (new unified WebSocket) // Key is "userID:connID" to support multiple tabs per user @@ -162,6 +165,8 @@ func NewManager(db *database.DB, cfg *config.Config, auth *authpkg.Auth, storage runnerJobConns: make(map[string]*websocket.Conn), runnerJobConnsWriteMu: make(map[string]*sync.Mutex), runnerJobConnsWriteMuMu: sync.RWMutex{}, // Initialize the new field + // Per-job mutexes for serializing status updates + jobStatusUpdateMu: make(map[int64]*sync.Mutex), } // Check for required external tools diff --git a/internal/manager/runners.go b/internal/manager/runners.go index bc0d85a..07e75d9 100644 --- a/internal/manager/runners.go +++ b/internal/manager/runners.go @@ -1019,6 +1019,10 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) }) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "Job not found") + return + } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err)) return @@ -1037,15 +1041,6 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req job.OutputFormat = &outputFormat.String } - if err == sql.ErrNoRows { - s.respondError(w, http.StatusNotFound, "Job not found") - return - } - if err != nil { - s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err)) - return - } - if startedAt.Valid { job.StartedAt = &startedAt.Time } @@ -1920,8 +1915,37 @@ func (s *Manager) evaluateTaskCondition(taskID int64, jobID int64, conditionJSON } } +// getJobStatusUpdateMutex returns the mutex for a specific jobID, creating it if needed. +// This ensures serialized execution of updateJobStatusFromTasks per job to prevent race conditions. +func (s *Manager) getJobStatusUpdateMutex(jobID int64) *sync.Mutex { + s.jobStatusUpdateMuMu.Lock() + defer s.jobStatusUpdateMuMu.Unlock() + + mu, exists := s.jobStatusUpdateMu[jobID] + if !exists { + mu = &sync.Mutex{} + s.jobStatusUpdateMu[jobID] = mu + } + return mu +} + +// cleanupJobStatusUpdateMutex removes the mutex for a jobID after it's no longer needed. +// Should only be called when the job is in a final state (completed/failed) and no more updates are expected. +func (s *Manager) cleanupJobStatusUpdateMutex(jobID int64) { + s.jobStatusUpdateMuMu.Lock() + defer s.jobStatusUpdateMuMu.Unlock() + delete(s.jobStatusUpdateMu, jobID) +} + // updateJobStatusFromTasks updates job status and progress based on task states +// This function is serialized per jobID to prevent race conditions when multiple tasks +// complete concurrently and trigger status updates simultaneously. func (s *Manager) updateJobStatusFromTasks(jobID int64) { + // Serialize updates per job to prevent race conditions + mu := s.getJobStatusUpdateMutex(jobID) + mu.Lock() + defer mu.Unlock() + now := time.Now() // All jobs now use parallel runners (one task per frame), so we always use task-based progress @@ -2087,6 +2111,11 @@ func (s *Manager) updateJobStatusFromTasks(jobID int64) { "progress": progress, "completed_at": now, }) + // Clean up mutex for jobs in final states (completed or failed) + // No more status updates will occur for these jobs + if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusFailed) { + s.cleanupJobStatusUpdateMutex(jobID) + } } }