|
|
|
|
@@ -390,7 +390,7 @@ func (s *Manager) handleNextJob(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
t.condition
|
|
|
|
|
FROM tasks t
|
|
|
|
|
JOIN jobs j ON t.job_id = j.id
|
|
|
|
|
WHERE t.status = ? AND j.status != ?
|
|
|
|
|
WHERE t.status = ? AND t.runner_id IS NULL AND j.status != ?
|
|
|
|
|
ORDER BY t.created_at ASC
|
|
|
|
|
LIMIT 50`,
|
|
|
|
|
types.TaskStatusPending, types.JobStatusCancelled,
|
|
|
|
|
@@ -1019,6 +1019,10 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
|
|
|
|
|
&job.CreatedAt, &startedAt, &completedAt, &errorMessage,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
|
s.respondError(w, http.StatusNotFound, "Job not found")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
|
|
|
|
|
return
|
|
|
|
|
@@ -1037,15 +1041,6 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
|
|
|
|
|
job.OutputFormat = &outputFormat.String
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
|
s.respondError(w, http.StatusNotFound, "Job not found")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if startedAt.Valid {
|
|
|
|
|
job.StartedAt = &startedAt.Time
|
|
|
|
|
}
|
|
|
|
|
@@ -1368,7 +1363,7 @@ func (s *Manager) handleRunnerJobWebSocket(w http.ResponseWriter, r *http.Reques
|
|
|
|
|
log.Printf("Job WebSocket disconnected unexpectedly for task %d, marking as failed", taskID)
|
|
|
|
|
s.db.With(func(conn *sql.DB) error {
|
|
|
|
|
_, err := conn.Exec(
|
|
|
|
|
`UPDATE tasks SET status = ?, error_message = ?, completed_at = ? WHERE id = ?`,
|
|
|
|
|
`UPDATE tasks SET status = ?, runner_id = NULL, error_message = ?, completed_at = ? WHERE id = ?`,
|
|
|
|
|
types.TaskStatusFailed, "WebSocket connection lost", time.Now(), taskID,
|
|
|
|
|
)
|
|
|
|
|
return err
|
|
|
|
|
@@ -1683,11 +1678,10 @@ func (s *Manager) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskU
|
|
|
|
|
} else {
|
|
|
|
|
// No retries remaining - mark as failed
|
|
|
|
|
err = s.db.WithTx(func(tx *sql.Tx) error {
|
|
|
|
|
_, err := tx.Exec(`UPDATE tasks SET status = ? WHERE id = ?`, types.TaskStatusFailed, taskUpdate.TaskID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
_, err = tx.Exec(`UPDATE tasks SET completed_at = ? WHERE id = ?`, now, taskUpdate.TaskID)
|
|
|
|
|
_, err := tx.Exec(
|
|
|
|
|
`UPDATE tasks SET status = ?, runner_id = NULL, completed_at = ? WHERE id = ?`,
|
|
|
|
|
types.TaskStatusFailed, now, taskUpdate.TaskID,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
@@ -1854,7 +1848,7 @@ func (s *Manager) cancelActiveTasksForJob(jobID int64) error {
|
|
|
|
|
// Tasks don't have a cancelled status - mark them as failed instead
|
|
|
|
|
err := s.db.With(func(conn *sql.DB) error {
|
|
|
|
|
_, err := conn.Exec(
|
|
|
|
|
`UPDATE tasks SET status = ?, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
|
|
|
|
|
`UPDATE tasks SET status = ?, runner_id = NULL, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
|
|
|
|
|
types.TaskStatusFailed, "Job cancelled", jobID, types.TaskStatusPending, types.TaskStatusRunning,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
@@ -1920,8 +1914,37 @@ func (s *Manager) evaluateTaskCondition(taskID int64, jobID int64, conditionJSON
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getJobStatusUpdateMutex returns the mutex for a specific jobID, creating it if needed.
|
|
|
|
|
// This ensures serialized execution of updateJobStatusFromTasks per job to prevent race conditions.
|
|
|
|
|
func (s *Manager) getJobStatusUpdateMutex(jobID int64) *sync.Mutex {
|
|
|
|
|
s.jobStatusUpdateMuMu.Lock()
|
|
|
|
|
defer s.jobStatusUpdateMuMu.Unlock()
|
|
|
|
|
|
|
|
|
|
mu, exists := s.jobStatusUpdateMu[jobID]
|
|
|
|
|
if !exists {
|
|
|
|
|
mu = &sync.Mutex{}
|
|
|
|
|
s.jobStatusUpdateMu[jobID] = mu
|
|
|
|
|
}
|
|
|
|
|
return mu
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cleanupJobStatusUpdateMutex removes the mutex for a jobID after it's no longer needed.
|
|
|
|
|
// Should only be called when the job is in a final state (completed/failed) and no more updates are expected.
|
|
|
|
|
func (s *Manager) cleanupJobStatusUpdateMutex(jobID int64) {
|
|
|
|
|
s.jobStatusUpdateMuMu.Lock()
|
|
|
|
|
defer s.jobStatusUpdateMuMu.Unlock()
|
|
|
|
|
delete(s.jobStatusUpdateMu, jobID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// updateJobStatusFromTasks updates job status and progress based on task states
|
|
|
|
|
// This function is serialized per jobID to prevent race conditions when multiple tasks
|
|
|
|
|
// complete concurrently and trigger status updates simultaneously.
|
|
|
|
|
func (s *Manager) updateJobStatusFromTasks(jobID int64) {
|
|
|
|
|
// Serialize updates per job to prevent race conditions
|
|
|
|
|
mu := s.getJobStatusUpdateMutex(jobID)
|
|
|
|
|
mu.Lock()
|
|
|
|
|
defer mu.Unlock()
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
|
|
|
|
|
// All jobs now use parallel runners (one task per frame), so we always use task-based progress
|
|
|
|
|
@@ -2087,6 +2110,11 @@ func (s *Manager) updateJobStatusFromTasks(jobID int64) {
|
|
|
|
|
"progress": progress,
|
|
|
|
|
"completed_at": now,
|
|
|
|
|
})
|
|
|
|
|
// Clean up mutex for jobs in final states (completed or failed)
|
|
|
|
|
// No more status updates will occur for these jobs
|
|
|
|
|
if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusFailed) {
|
|
|
|
|
s.cleanupJobStatusUpdateMutex(jobID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|