9 Commits

d3c5ee0dba Add pagination support for file loading in JobDetails component
All checks were successful: Release Tag / release (push) in 20s
- Introduced a helper function to load all files associated with a job using pagination, improving performance by fetching files in batches.
- Updated the loadDetails function to utilize the new pagination method for retrieving all files instead of just the first page.
- Adjusted file handling logic to ensure proper updates when new files are added, maintaining consistency with the paginated approach.
2026-01-03 10:58:36 -06:00
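
The pattern behind this commit, sketched in Go rather than the component's JavaScript (the real helper is the loadAllFiles function in the JobDetails diff at the bottom of this page; filePage, getFiles, and the frame file names below are hypothetical stand-ins): request a fixed-size page, append it, and stop once the reported total is reached or a page comes back empty.

package main

import "fmt"

// filePage is a hypothetical stand-in for the paginated file response
// (a data slice plus the server-reported total).
type filePage struct {
    Data  []string
    Total int
}

// fetchAllFiles pulls every file for a job in fixed-size batches,
// mirroring the pagination loop added to JobDetails in this commit.
func fetchAllFiles(getFiles func(limit, offset int) filePage) []string {
    const limit = 100 // page size, matching the frontend helper
    var all []string
    offset := 0
    for {
        page := getFiles(limit, offset)
        all = append(all, page.Data...)
        offset += len(page.Data)
        // Stop at the reported total, or on an empty page as a fallback.
        if offset >= page.Total || len(page.Data) == 0 {
            break
        }
    }
    return all
}

func main() {
    // Fake server holding 250 files, served 100 at a time.
    files := make([]string, 250)
    for i := range files {
        files[i] = fmt.Sprintf("frame_%04d.png", i)
    }
    got := fetchAllFiles(func(limit, offset int) filePage {
        end := offset + limit
        if end > len(files) {
            end = len(files)
        }
        return filePage{Data: files[offset:end], Total: len(files)}
    })
    fmt.Println(len(got)) // 250
}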
bb57ce8659 Update task status handling to reset runner_id on job cancellation and failure
All checks were successful: Release Tag / release (push) in 20s
- Modified SQL queries in multiple functions to set runner_id to NULL when updating task statuses for cancelled jobs and failed tasks.
- Ensured that tasks are properly marked as failed with the correct error messages and updated completion timestamps.
- Prevents failed and cancelled tasks from keeping a stale runner assignment; the dispatch query only claims tasks whose runner_id is NULL, so a stale assignment would block re-dispatch.
2026-01-03 09:01:08 -06:00
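
A minimal sketch of the idea, under assumed status strings and a simplified schema (the real queries are in the manager diffs below): clear runner_id in the same UPDATE that marks a task failed, because the dispatch query only claims unowned tasks.

package manager

import (
    "database/sql"
    "time"
)

// failTask marks a task failed and releases its runner in one statement,
// so the row is never left failed while still "owned" by a runner.
// Status values are plain strings here; the project uses its types package.
func failTask(conn *sql.DB, taskID int64, msg string) error {
    _, err := conn.Exec(
        `UPDATE tasks
            SET status = 'failed', runner_id = NULL,
                error_message = ?, completed_at = ?
          WHERE id = ?`,
        msg, time.Now(), taskID,
    )
    return err
}

// nextPendingTasks only claims tasks nobody owns. Without the runner_id
// reset above, a task retried after failure would still carry its old
// runner_id and never match this filter again.
func nextPendingTasks(conn *sql.DB) (*sql.Rows, error) {
    return conn.Query(
        `SELECT t.id FROM tasks t
           JOIN jobs j ON t.job_id = j.id
          WHERE t.status = 'pending' AND t.runner_id IS NULL
            AND j.status != 'cancelled'
          ORDER BY t.created_at ASC LIMIT 50`)
}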
1a8836e6aa Merge pull request 'Refactor job status handling to prevent race conditions' (#4) from fix-race into master
All checks were successful: Release Tag / release (push) in 18s
Reviewed-on: #4
2026-01-02 18:25:17 -06:00
b51b96a618 Refactor job status handling to prevent race conditions
All checks were successful: PR Check / check-and-test (pull_request) in 26s
- Removed redundant error handling in handleListJobTasks.
- Introduced per-job mutexes in Manager to serialize updateJobStatusFromTasks calls, ensuring thread safety during concurrent task completions.
- Added methods to manage job status update mutexes, including creation and cleanup after job completion or failure.
- Improved error handling in handleGetJobStatusForRunner by consolidating error checks.
2026-01-02 18:22:55 -06:00
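
The per-key mutex pattern this commit introduces, sketched in isolation (the real methods are getJobStatusUpdateMutex and cleanupJobStatusUpdateMutex in the Manager diff below; the names here are shortened):

package manager

import "sync"

// keyedMutex hands out one mutex per job ID, so concurrent task completions
// for the same job serialize their status recomputation while different
// jobs stay fully parallel.
type keyedMutex struct {
    mu    sync.Mutex // guards the map itself
    locks map[int64]*sync.Mutex
}

func newKeyedMutex() *keyedMutex {
    return &keyedMutex{locks: make(map[int64]*sync.Mutex)}
}

// get returns the mutex for jobID, creating it on first use.
func (k *keyedMutex) get(jobID int64) *sync.Mutex {
    k.mu.Lock()
    defer k.mu.Unlock()
    m, ok := k.locks[jobID]
    if !ok {
        m = &sync.Mutex{}
        k.locks[jobID] = m
    }
    return m
}

// cleanup drops the entry so the map doesn't grow with every job ever run.
// Safe only once the job is terminal: a late caller could otherwise be
// handed a fresh mutex while another goroutine still holds the old one.
func (k *keyedMutex) cleanup(jobID int64) {
    k.mu.Lock()
    defer k.mu.Unlock()
    delete(k.locks, jobID)
}

updateJobStatusFromTasks then brackets its body with get(jobID).Lock() / defer Unlock(), which is exactly what the diff below adds.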
8e561922c9 Merge pull request 'Implement file deletion after successful uploads in runner and encoding processes' (#3) from fix-uploads into master
Reviewed-on: #3
2026-01-02 17:51:19 -06:00
1c4bd78f56 Add FFmpeg setup step to Gitea workflow for enhanced media processing
All checks were successful: PR Check / check-and-test (pull_request) in 1m6s
- Added a step to the test-pr.yaml workflow that installs FFmpeg, making it available to tests that exercise media processing.
- Complements the existing Go and frontend build steps, so CI runs with the same external tools the project expects at runtime.
2026-01-02 17:48:46 -06:00
3f2982ddb3 Update Gitea workflow to include frontend build step and adjust Go build command
Some checks failed: PR Check / check-and-test (pull_request) failing after 42s
- Added a step to install and build the frontend using npm in the test-pr.yaml workflow.
- Modified the Go build command to compile all packages instead of specifying the output binary location.
- This change improves the build process by integrating frontend assets with the backend build.
2026-01-02 17:46:03 -06:00
0b852c5087 Update Gitea workflow to specify output binary location for jiggablend build
Some checks failed: PR Check / check-and-test (pull_request) failing after 8s
- Changed the build command in the test-pr.yaml workflow to output the jiggablend binary to the bin directory.
- This keeps build artifacts in a dedicated bin directory, matching the project's layout.
2026-01-02 17:40:34 -06:00
5e56c7f0e8 Implement file deletion after successful uploads in runner and encoding processes
Some checks failed: PR Check / check-and-test (pull_request) failing after 9s
- Added logic to delete files after successful uploads in both runner and encode tasks to prevent duplicate uploads.
- Included logging for any errors encountered during file deletion to ensure visibility of issues.
2026-01-02 17:34:41 -06:00
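
The shape of the change, sketched with a hypothetical upload function (the real code in the runner and encode diffs below inlines this at each call site): delete only on the success branch, and log a failed delete as a warning instead of failing the task, since the upload itself already succeeded.

package main

import (
    "fmt"
    "log"
    "os"
)

// uploadAndRemove is a sketch; upload stands in for the runner's real
// upload call. The file is removed only after a successful upload, so a
// later rescan of the output directory cannot upload it a second time.
func uploadAndRemove(upload func(path string) error, path string) {
    if err := upload(path); err != nil {
        log.Printf("Failed to upload %s: %v", path, err)
        return // keep the file so a retry can pick it up
    }
    if err := os.Remove(path); err != nil {
        log.Printf("Warning: failed to delete %s after upload: %v", path, err)
    }
}

func main() {
    f, err := os.CreateTemp("", "output-*.png")
    if err != nil {
        log.Fatal(err)
    }
    f.Close()
    uploadAndRemove(func(path string) error {
        fmt.Println("uploaded", path) // pretend upload succeeded
        return nil
    }, f.Name())
}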
7 changed files with 114 additions and 32 deletions

View File

@@ -10,6 +10,8 @@ jobs:
       - uses: actions/setup-go@main
         with:
           go-version-file: 'go.mod'
+      - uses: FedericoCarboni/setup-ffmpeg@v3
       - run: go mod tidy
+      - run: cd web && npm install && npm run build
       - run: go build ./...
       - run: go test -race -v -shuffle=on ./...

View File

@@ -944,7 +944,7 @@ func (s *Manager) handleCancelJob(w http.ResponseWriter, r *http.Request) {
         // Cancel all pending tasks
         _, err = conn.Exec(
-            `UPDATE tasks SET status = ? WHERE job_id = ? AND status = ?`,
+            `UPDATE tasks SET status = ?, runner_id = NULL WHERE job_id = ? AND status = ?`,
             types.TaskStatusFailed, jobID, types.TaskStatusPending,
         )
         return err
@@ -3024,9 +3024,6 @@ func (s *Manager) handleListJobTasks(w http.ResponseWriter, r *http.Request) {
         return
     }
     defer rows.Close()
-    if err != nil {
-        total = -1
-    }
     tasks := []types.Task{}
     for rows.Next() {

View File

@@ -89,6 +89,9 @@ type Manager struct {
     // Throttling for task status updates (per task)
     taskUpdateTimes   map[int64]time.Time // key: taskID
     taskUpdateTimesMu sync.RWMutex
+    // Per-job mutexes to serialize updateJobStatusFromTasks calls and prevent race conditions
+    jobStatusUpdateMu   map[int64]*sync.Mutex // key: jobID
+    jobStatusUpdateMuMu sync.RWMutex

     // Client WebSocket connections (new unified WebSocket)
     // Key is "userID:connID" to support multiple tabs per user
@@ -162,6 +165,8 @@ func NewManager(db *database.DB, cfg *config.Config, auth *authpkg.Auth, storage
         runnerJobConns:          make(map[string]*websocket.Conn),
         runnerJobConnsWriteMu:   make(map[string]*sync.Mutex),
         runnerJobConnsWriteMuMu: sync.RWMutex{}, // Initialize the new field
+        // Per-job mutexes for serializing status updates
+        jobStatusUpdateMu: make(map[int64]*sync.Mutex),
     }
     // Check for required external tools

View File

@@ -390,7 +390,7 @@ func (s *Manager) handleNextJob(w http.ResponseWriter, r *http.Request) {
             t.condition
         FROM tasks t
         JOIN jobs j ON t.job_id = j.id
-        WHERE t.status = ? AND j.status != ?
+        WHERE t.status = ? AND t.runner_id IS NULL AND j.status != ?
         ORDER BY t.created_at ASC
         LIMIT 50`,
         types.TaskStatusPending, types.JobStatusCancelled,
@@ -1019,6 +1019,10 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
             &job.CreatedAt, &startedAt, &completedAt, &errorMessage,
         )
     })
+    if err == sql.ErrNoRows {
+        s.respondError(w, http.StatusNotFound, "Job not found")
+        return
+    }
     if err != nil {
         s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
         return
@@ -1037,15 +1041,6 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
         job.OutputFormat = &outputFormat.String
     }
-    if err == sql.ErrNoRows {
-        s.respondError(w, http.StatusNotFound, "Job not found")
-        return
-    }
-    if err != nil {
-        s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
-        return
-    }
     if startedAt.Valid {
         job.StartedAt = &startedAt.Time
     }
@@ -1368,7 +1363,7 @@ func (s *Manager) handleRunnerJobWebSocket(w http.ResponseWriter, r *http.Reques
         log.Printf("Job WebSocket disconnected unexpectedly for task %d, marking as failed", taskID)
         s.db.With(func(conn *sql.DB) error {
             _, err := conn.Exec(
-                `UPDATE tasks SET status = ?, error_message = ?, completed_at = ? WHERE id = ?`,
+                `UPDATE tasks SET status = ?, runner_id = NULL, error_message = ?, completed_at = ? WHERE id = ?`,
                 types.TaskStatusFailed, "WebSocket connection lost", time.Now(), taskID,
             )
             return err
@@ -1683,11 +1678,10 @@ func (s *Manager) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskU
     } else {
         // No retries remaining - mark as failed
         err = s.db.WithTx(func(tx *sql.Tx) error {
-            _, err := tx.Exec(`UPDATE tasks SET status = ? WHERE id = ?`, types.TaskStatusFailed, taskUpdate.TaskID)
-            if err != nil {
-                return err
-            }
-            _, err = tx.Exec(`UPDATE tasks SET completed_at = ? WHERE id = ?`, now, taskUpdate.TaskID)
+            _, err := tx.Exec(
+                `UPDATE tasks SET status = ?, runner_id = NULL, completed_at = ? WHERE id = ?`,
+                types.TaskStatusFailed, now, taskUpdate.TaskID,
+            )
             if err != nil {
                 return err
             }
@@ -1854,7 +1848,7 @@ func (s *Manager) cancelActiveTasksForJob(jobID int64) error {
     // Tasks don't have a cancelled status - mark them as failed instead
     err := s.db.With(func(conn *sql.DB) error {
         _, err := conn.Exec(
-            `UPDATE tasks SET status = ?, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
+            `UPDATE tasks SET status = ?, runner_id = NULL, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
             types.TaskStatusFailed, "Job cancelled", jobID, types.TaskStatusPending, types.TaskStatusRunning,
         )
         if err != nil {
@@ -1920,8 +1914,37 @@ func (s *Manager) evaluateTaskCondition(taskID int64, jobID int64, conditionJSON
     }
 }

+// getJobStatusUpdateMutex returns the mutex for a specific jobID, creating it if needed.
+// This ensures serialized execution of updateJobStatusFromTasks per job to prevent race conditions.
+func (s *Manager) getJobStatusUpdateMutex(jobID int64) *sync.Mutex {
+    s.jobStatusUpdateMuMu.Lock()
+    defer s.jobStatusUpdateMuMu.Unlock()
+    mu, exists := s.jobStatusUpdateMu[jobID]
+    if !exists {
+        mu = &sync.Mutex{}
+        s.jobStatusUpdateMu[jobID] = mu
+    }
+    return mu
+}
+
+// cleanupJobStatusUpdateMutex removes the mutex for a jobID after it's no longer needed.
+// Should only be called when the job is in a final state (completed/failed) and no more updates are expected.
+func (s *Manager) cleanupJobStatusUpdateMutex(jobID int64) {
+    s.jobStatusUpdateMuMu.Lock()
+    defer s.jobStatusUpdateMuMu.Unlock()
+    delete(s.jobStatusUpdateMu, jobID)
+}
+
 // updateJobStatusFromTasks updates job status and progress based on task states
+// This function is serialized per jobID to prevent race conditions when multiple tasks
+// complete concurrently and trigger status updates simultaneously.
 func (s *Manager) updateJobStatusFromTasks(jobID int64) {
+    // Serialize updates per job to prevent race conditions
+    mu := s.getJobStatusUpdateMutex(jobID)
+    mu.Lock()
+    defer mu.Unlock()
+
     now := time.Now()
     // All jobs now use parallel runners (one task per frame), so we always use task-based progress
@@ -2087,6 +2110,11 @@ func (s *Manager) updateJobStatusFromTasks(jobID int64) {
             "progress":     progress,
             "completed_at": now,
         })
+        // Clean up mutex for jobs in final states (completed or failed)
+        // No more status updates will occur for these jobs
+        if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusFailed) {
+            s.cleanupJobStatusUpdateMutex(jobID)
+        }
     }
 }

View File

@@ -289,6 +289,10 @@ func (r *Runner) uploadOutputs(ctx *tasks.Context, job *api.NextJobResponse) err
             log.Printf("Failed to upload %s: %v", filePath, err)
         } else {
             ctx.OutputUploaded(entry.Name())
+            // Delete file after successful upload to prevent duplicate uploads
+            if err := os.Remove(filePath); err != nil {
+                log.Printf("Warning: Failed to delete file %s after upload: %v", filePath, err)
+            }
         }
     }

View File

@@ -373,6 +373,12 @@ func (p *EncodeProcessor) Process(ctx *Context) error {
     ctx.Info(fmt.Sprintf("Successfully uploaded %s: %s", strings.ToUpper(outputExt), filepath.Base(outputVideo)))

+    // Delete file after successful upload to prevent duplicate uploads
+    if err := os.Remove(outputVideo); err != nil {
+        log.Printf("Warning: Failed to delete video file %s after upload: %v", outputVideo, err)
+        ctx.Warn(fmt.Sprintf("Warning: Failed to delete video file after upload: %v", err))
+    }
+
     log.Printf("Successfully generated and uploaded %s for job %d: %s", strings.ToUpper(outputExt), ctx.JobID, filepath.Base(outputVideo))
     return nil
 }

View File

@@ -111,6 +111,41 @@ export default function JobDetails({ job, onClose, onUpdate }) {
         });
     }, [taskData]);

+    // Helper function to load all files with pagination
+    const loadAllFiles = async (jobId, signal) => {
+        const allFiles = [];
+        let offset = 0;
+        const limit = 100; // Load 100 files per page
+        let hasMore = true;
+
+        while (hasMore && !signal?.aborted) {
+            const fileList = await jobs.getFiles(jobId, { limit, offset, signal });
+
+            // Check for superseded sentinel
+            if (fileList === REQUEST_SUPERSEDED) {
+                return REQUEST_SUPERSEDED;
+            }
+
+            const fileData = fileList?.data || fileList;
+            const files = Array.isArray(fileData) ? fileData : [];
+            allFiles.push(...files);
+
+            // Check if there are more files to load
+            const total = fileList?.total;
+            if (total !== undefined) {
+                hasMore = offset + files.length < total;
+            } else {
+                // If total is not provided, check if we got a full page (or more)
+                // Use >= to safely handle edge cases where API returns different amounts
+                hasMore = files.length >= limit;
+            }
+            offset += files.length;
+        }
+
+        return allFiles;
+    };
+
     const loadDetails = async () => {
         // Guard against undefined job or job.id
         if (!job || !job.id) {
@@ -122,9 +157,9 @@ export default function JobDetails({ job, onClose, onUpdate }) {
             setLoading(true);
             // Use summary endpoint for tasks initially - much faster
             const signal = abortControllerRef.current?.signal;
-            const [details, fileList, taskListResult] = await Promise.all([
+            const [details, allFilesResult, taskListResult] = await Promise.all([
                 jobs.get(job.id, { signal }),
-                jobs.getFiles(job.id, { limit: 50, signal }), // Only load first page of files
+                loadAllFiles(job.id, signal), // Load all files with pagination
                 jobs.getTasksSummary(job.id, { sort: 'frame:asc', signal }), // Get all tasks
             ]);
@@ -135,11 +170,10 @@ export default function JobDetails({ job, onClose, onUpdate }) {
             setJobDetails(details);

             // Handle paginated file response - check for superseded sentinel
-            if (fileList === REQUEST_SUPERSEDED) {
+            if (allFilesResult === REQUEST_SUPERSEDED) {
                 return; // Request was superseded, skip this update
             }
-            const fileData = fileList?.data || fileList;
-            setFiles(Array.isArray(fileData) ? fileData : []);
+            setFiles(Array.isArray(allFilesResult) ? allFilesResult : []);

             // Handle paginated task summary response - check for superseded sentinel
             if (taskListResult === REQUEST_SUPERSEDED) {
@@ -617,16 +651,22 @@ export default function JobDetails({ job, onClose, onUpdate }) {
                 };
                 reloadTasks();
             } else if (data.type === 'file_added' && data.data) {
-                // New file was added - reload file list
+                // New file was added - reload all files
                 const reloadFiles = async () => {
                     try {
-                        const fileList = await jobs.getFiles(job.id, { limit: 50 });
+                        const signal = abortControllerRef.current?.signal;
+                        const allFilesResult = await loadAllFiles(job.id, signal);
+                        // Check if request was aborted
+                        if (signal?.aborted) {
+                            return;
+                        }
                         // Check for superseded sentinel
-                        if (fileList === REQUEST_SUPERSEDED) {
+                        if (allFilesResult === REQUEST_SUPERSEDED) {
                             return; // Request was superseded, skip this update
                         }
-                        const fileData = fileList.data || fileList;
-                        setFiles(Array.isArray(fileData) ? fileData : []);
+                        setFiles(Array.isArray(allFilesResult) ? allFilesResult : []);
                     } catch (error) {
                         console.error('Failed to reload files:', error);
                     }