Compare commits
9 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | d3c5ee0dba |  |
|  | bb57ce8659 |  |
|  | 1a8836e6aa |  |
|  | b51b96a618 |  |
|  | 8e561922c9 |  |
|  | 1c4bd78f56 |  |
|  | 3f2982ddb3 |  |
|  | 0b852c5087 |  |
|  | 5e56c7f0e8 |  |
```diff
@@ -10,6 +10,8 @@ jobs:
       - uses: actions/setup-go@main
         with:
           go-version-file: 'go.mod'
-      - run: go mod tidy
+      - uses: FedericoCarboni/setup-ffmpeg@v3
+      - run: go mod tidy
+      - run: cd web && npm install && npm run build
       - run: go build ./...
       - run: go test -race -v -shuffle=on ./...
```
```diff
@@ -944,7 +944,7 @@ func (s *Manager) handleCancelJob(w http.ResponseWriter, r *http.Request) {
 
         // Cancel all pending tasks
         _, err = conn.Exec(
-            `UPDATE tasks SET status = ? WHERE job_id = ? AND status = ?`,
+            `UPDATE tasks SET status = ?, runner_id = NULL WHERE job_id = ? AND status = ?`,
             types.TaskStatusFailed, jobID, types.TaskStatusPending,
         )
         return err
```
```diff
@@ -3024,9 +3024,6 @@ func (s *Manager) handleListJobTasks(w http.ResponseWriter, r *http.Request) {
         return
     }
     defer rows.Close()
-    if err != nil {
-        total = -1
-    }
 
     tasks := []types.Task{}
     for rows.Next() {
```
```diff
@@ -89,6 +89,9 @@ type Manager struct {
     // Throttling for task status updates (per task)
     taskUpdateTimes   map[int64]time.Time // key: taskID
     taskUpdateTimesMu sync.RWMutex
+    // Per-job mutexes to serialize updateJobStatusFromTasks calls and prevent race conditions
+    jobStatusUpdateMu   map[int64]*sync.Mutex // key: jobID
+    jobStatusUpdateMuMu sync.RWMutex
 
     // Client WebSocket connections (new unified WebSocket)
     // Key is "userID:connID" to support multiple tabs per user
```
```diff
@@ -162,6 +165,8 @@ func NewManager(db *database.DB, cfg *config.Config, auth *authpkg.Auth, storage
         runnerJobConns:          make(map[string]*websocket.Conn),
         runnerJobConnsWriteMu:   make(map[string]*sync.Mutex),
         runnerJobConnsWriteMuMu: sync.RWMutex{}, // Initialize the new field
+        // Per-job mutexes for serializing status updates
+        jobStatusUpdateMu: make(map[int64]*sync.Mutex),
     }
 
     // Check for required external tools
```
```diff
@@ -390,7 +390,7 @@ func (s *Manager) handleNextJob(w http.ResponseWriter, r *http.Request) {
             t.condition
         FROM tasks t
         JOIN jobs j ON t.job_id = j.id
-        WHERE t.status = ? AND j.status != ?
+        WHERE t.status = ? AND t.runner_id IS NULL AND j.status != ?
         ORDER BY t.created_at ASC
         LIMIT 50`,
         types.TaskStatusPending, types.JobStatusCancelled,
```
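The new `t.runner_id IS NULL` filter above works together with the `runner_id = NULL` resets in the cancel and failure paths elsewhere in this compare: a task only becomes eligible for dispatch again once its runner assignment is cleared. The claim step that stamps a runner onto a task is not shown in this compare; the snippet below is only a minimal sketch of what such a claim could look like, assuming a `tasks.runner_id` column and literal status strings (`claimTask` and the hard-coded statuses are illustrative, not taken from the codebase, which presumably uses the `types.TaskStatus*` constants).

```go
// Hypothetical claim step (not part of this compare): atomically assign a
// pending, unclaimed task to a runner so the dispatch query's
// "runner_id IS NULL" filter skips it afterwards.
package sketch

import (
	"database/sql"
	"errors"
)

// claimTask marks a pending, unclaimed task as owned by runnerID.
// Because the UPDATE only matches rows where runner_id IS NULL, two runners
// racing for the same task cannot both succeed: the loser updates zero rows.
func claimTask(conn *sql.DB, taskID, runnerID int64) error {
	res, err := conn.Exec(
		`UPDATE tasks SET runner_id = ?, status = 'running'
		   WHERE id = ? AND status = 'pending' AND runner_id IS NULL`,
		runnerID, taskID,
	)
	if err != nil {
		return err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.New("task already claimed or no longer pending")
	}
	return nil
}
```

Under that assumption, resetting `runner_id` to NULL when a task fails, is cancelled, or loses its WebSocket connection is what makes the row visible to `handleNextJob` again.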
```diff
@@ -1019,6 +1019,10 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
             &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
         )
     })
+    if err == sql.ErrNoRows {
+        s.respondError(w, http.StatusNotFound, "Job not found")
+        return
+    }
     if err != nil {
         s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
         return
```
```diff
@@ -1037,15 +1041,6 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
         job.OutputFormat = &outputFormat.String
     }
 
-    if err == sql.ErrNoRows {
-        s.respondError(w, http.StatusNotFound, "Job not found")
-        return
-    }
-    if err != nil {
-        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
-        return
-    }
-
     if startedAt.Valid {
         job.StartedAt = &startedAt.Time
     }
```
```diff
@@ -1368,7 +1363,7 @@ func (s *Manager) handleRunnerJobWebSocket(w http.ResponseWriter, r *http.Reques
         log.Printf("Job WebSocket disconnected unexpectedly for task %d, marking as failed", taskID)
         s.db.With(func(conn *sql.DB) error {
             _, err := conn.Exec(
-                `UPDATE tasks SET status = ?, error_message = ?, completed_at = ? WHERE id = ?`,
+                `UPDATE tasks SET status = ?, runner_id = NULL, error_message = ?, completed_at = ? WHERE id = ?`,
                 types.TaskStatusFailed, "WebSocket connection lost", time.Now(), taskID,
             )
             return err
```
```diff
@@ -1683,11 +1678,10 @@ func (s *Manager) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskU
     } else {
         // No retries remaining - mark as failed
         err = s.db.WithTx(func(tx *sql.Tx) error {
-            _, err := tx.Exec(`UPDATE tasks SET status = ? WHERE id = ?`, types.TaskStatusFailed, taskUpdate.TaskID)
-            if err != nil {
-                return err
-            }
-            _, err = tx.Exec(`UPDATE tasks SET completed_at = ? WHERE id = ?`, now, taskUpdate.TaskID)
+            _, err := tx.Exec(
+                `UPDATE tasks SET status = ?, runner_id = NULL, completed_at = ? WHERE id = ?`,
+                types.TaskStatusFailed, now, taskUpdate.TaskID,
+            )
             if err != nil {
                 return err
             }
```
```diff
@@ -1854,7 +1848,7 @@ func (s *Manager) cancelActiveTasksForJob(jobID int64) error {
     // Tasks don't have a cancelled status - mark them as failed instead
     err := s.db.With(func(conn *sql.DB) error {
         _, err := conn.Exec(
-            `UPDATE tasks SET status = ?, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
+            `UPDATE tasks SET status = ?, runner_id = NULL, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
             types.TaskStatusFailed, "Job cancelled", jobID, types.TaskStatusPending, types.TaskStatusRunning,
         )
         if err != nil {
```
```diff
@@ -1920,8 +1914,37 @@ func (s *Manager) evaluateTaskCondition(taskID int64, jobID int64, conditionJSON
     }
 }
 
+// getJobStatusUpdateMutex returns the mutex for a specific jobID, creating it if needed.
+// This ensures serialized execution of updateJobStatusFromTasks per job to prevent race conditions.
+func (s *Manager) getJobStatusUpdateMutex(jobID int64) *sync.Mutex {
+    s.jobStatusUpdateMuMu.Lock()
+    defer s.jobStatusUpdateMuMu.Unlock()
+
+    mu, exists := s.jobStatusUpdateMu[jobID]
+    if !exists {
+        mu = &sync.Mutex{}
+        s.jobStatusUpdateMu[jobID] = mu
+    }
+    return mu
+}
+
+// cleanupJobStatusUpdateMutex removes the mutex for a jobID after it's no longer needed.
+// Should only be called when the job is in a final state (completed/failed) and no more updates are expected.
+func (s *Manager) cleanupJobStatusUpdateMutex(jobID int64) {
+    s.jobStatusUpdateMuMu.Lock()
+    defer s.jobStatusUpdateMuMu.Unlock()
+    delete(s.jobStatusUpdateMu, jobID)
+}
+
 // updateJobStatusFromTasks updates job status and progress based on task states
+// This function is serialized per jobID to prevent race conditions when multiple tasks
+// complete concurrently and trigger status updates simultaneously.
 func (s *Manager) updateJobStatusFromTasks(jobID int64) {
+    // Serialize updates per job to prevent race conditions
+    mu := s.getJobStatusUpdateMutex(jobID)
+    mu.Lock()
+    defer mu.Unlock()
+
     now := time.Now()
 
     // All jobs now use parallel runners (one task per frame), so we always use task-based progress
```
```diff
@@ -2087,6 +2110,11 @@ func (s *Manager) updateJobStatusFromTasks(jobID int64) {
             "progress": progress,
             "completed_at": now,
         })
+        // Clean up mutex for jobs in final states (completed or failed)
+        // No more status updates will occur for these jobs
+        if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusFailed) {
+            s.cleanupJobStatusUpdateMutex(jobID)
+        }
     }
 }
 
```
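Taken together, the Manager changes above (the new struct fields, their initialization in `NewManager`, the two helpers, and the locking added to `updateJobStatusFromTasks`) amount to a per-key mutex pattern: one lock per job ID, created on demand and dropped once the job reaches a final state. The standalone sketch below shows the same idea in isolation; the names (`keyedMutex`, `get`, `cleanup`) are illustrative only and do not come from the codebase.

```go
// Minimal, self-contained illustration of the per-job locking pattern:
// concurrent status recalculations for the same job run one at a time,
// while different jobs remain independent.
package main

import (
	"fmt"
	"sync"
)

type keyedMutex struct {
	mu    sync.Mutex
	locks map[int64]*sync.Mutex
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: make(map[int64]*sync.Mutex)}
}

// get returns the mutex for id, creating it if needed.
func (k *keyedMutex) get(id int64) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	m, ok := k.locks[id]
	if !ok {
		m = &sync.Mutex{}
		k.locks[id] = m
	}
	return m
}

// cleanup drops the per-id mutex once no more updates are expected for that id.
func (k *keyedMutex) cleanup(id int64) {
	k.mu.Lock()
	defer k.mu.Unlock()
	delete(k.locks, id)
}

func main() {
	km := newKeyedMutex()
	var wg sync.WaitGroup
	updates := 0
	// Two "task completed" events for the same job: the per-job lock makes the
	// read-modify-write of the job's derived state sequential instead of racy.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m := km.get(42)
			m.Lock()
			defer m.Unlock()
			updates++ // stands in for recomputing job status from task rows
		}()
	}
	wg.Wait()
	fmt.Println("updates applied:", updates) // 2
	km.cleanup(42) // analogous to cleanupJobStatusUpdateMutex after a final state
}
```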
```diff
@@ -289,6 +289,10 @@ func (r *Runner) uploadOutputs(ctx *tasks.Context, job *api.NextJobResponse) err
             log.Printf("Failed to upload %s: %v", filePath, err)
         } else {
             ctx.OutputUploaded(entry.Name())
+            // Delete file after successful upload to prevent duplicate uploads
+            if err := os.Remove(filePath); err != nil {
+                log.Printf("Warning: Failed to delete file %s after upload: %v", filePath, err)
+            }
         }
     }
 
```
```diff
@@ -373,6 +373,12 @@ func (p *EncodeProcessor) Process(ctx *Context) error {
 
     ctx.Info(fmt.Sprintf("Successfully uploaded %s: %s", strings.ToUpper(outputExt), filepath.Base(outputVideo)))
 
+    // Delete file after successful upload to prevent duplicate uploads
+    if err := os.Remove(outputVideo); err != nil {
+        log.Printf("Warning: Failed to delete video file %s after upload: %v", outputVideo, err)
+        ctx.Warn(fmt.Sprintf("Warning: Failed to delete video file after upload: %v", err))
+    }
+
     log.Printf("Successfully generated and uploaded %s for job %d: %s", strings.ToUpper(outputExt), ctx.JobID, filepath.Base(outputVideo))
     return nil
 }
```
```diff
@@ -111,6 +111,41 @@ export default function JobDetails({ job, onClose, onUpdate }) {
     });
   }, [taskData]);
 
+  // Helper function to load all files with pagination
+  const loadAllFiles = async (jobId, signal) => {
+    const allFiles = [];
+    let offset = 0;
+    const limit = 100; // Load 100 files per page
+    let hasMore = true;
+
+    while (hasMore && !signal?.aborted) {
+      const fileList = await jobs.getFiles(jobId, { limit, offset, signal });
+
+      // Check for superseded sentinel
+      if (fileList === REQUEST_SUPERSEDED) {
+        return REQUEST_SUPERSEDED;
+      }
+
+      const fileData = fileList?.data || fileList;
+      const files = Array.isArray(fileData) ? fileData : [];
+      allFiles.push(...files);
+
+      // Check if there are more files to load
+      const total = fileList?.total;
+      if (total !== undefined) {
+        hasMore = offset + files.length < total;
+      } else {
+        // If total is not provided, check if we got a full page (or more)
+        // Use >= to safely handle edge cases where API returns different amounts
+        hasMore = files.length >= limit;
+      }
+
+      offset += files.length;
+    }
+
+    return allFiles;
+  };
+
   const loadDetails = async () => {
     // Guard against undefined job or job.id
     if (!job || !job.id) {
```
```diff
@@ -122,9 +157,9 @@ export default function JobDetails({ job, onClose, onUpdate }) {
       setLoading(true);
       // Use summary endpoint for tasks initially - much faster
       const signal = abortControllerRef.current?.signal;
-      const [details, fileList, taskListResult] = await Promise.all([
+      const [details, allFilesResult, taskListResult] = await Promise.all([
        jobs.get(job.id, { signal }),
-        jobs.getFiles(job.id, { limit: 50, signal }), // Only load first page of files
+        loadAllFiles(job.id, signal), // Load all files with pagination
        jobs.getTasksSummary(job.id, { sort: 'frame:asc', signal }), // Get all tasks
       ]);
 
```
```diff
@@ -135,11 +170,10 @@ export default function JobDetails({ job, onClose, onUpdate }) {
       setJobDetails(details);
 
       // Handle paginated file response - check for superseded sentinel
-      if (fileList === REQUEST_SUPERSEDED) {
+      if (allFilesResult === REQUEST_SUPERSEDED) {
         return; // Request was superseded, skip this update
       }
-      const fileData = fileList?.data || fileList;
-      setFiles(Array.isArray(fileData) ? fileData : []);
+      setFiles(Array.isArray(allFilesResult) ? allFilesResult : []);
 
       // Handle paginated task summary response - check for superseded sentinel
       if (taskListResult === REQUEST_SUPERSEDED) {
```
```diff
@@ -617,16 +651,22 @@ export default function JobDetails({ job, onClose, onUpdate }) {
         };
         reloadTasks();
       } else if (data.type === 'file_added' && data.data) {
-        // New file was added - reload file list
+        // New file was added - reload all files
         const reloadFiles = async () => {
           try {
-            const fileList = await jobs.getFiles(job.id, { limit: 50 });
+            const signal = abortControllerRef.current?.signal;
+            const allFilesResult = await loadAllFiles(job.id, signal);
+
+            // Check if request was aborted
+            if (signal?.aborted) {
+              return;
+            }
+
             // Check for superseded sentinel
-            if (fileList === REQUEST_SUPERSEDED) {
+            if (allFilesResult === REQUEST_SUPERSEDED) {
               return; // Request was superseded, skip this update
             }
-            const fileData = fileList.data || fileList;
-            setFiles(Array.isArray(fileData) ? fileData : []);
+            setFiles(Array.isArray(allFilesResult) ? allFilesResult : []);
           } catch (error) {
             console.error('Failed to reload files:', error);
           }
```