Compare commits
7 Commits
0.0.1
...
1a8836e6aa
| Author | SHA1 | Date | |
|---|---|---|---|
| 1a8836e6aa | |||
| b51b96a618 | |||
| 8e561922c9 | |||
| 1c4bd78f56 | |||
| 3f2982ddb3 | |||
| 0b852c5087 | |||
| 5e56c7f0e8 |
@@ -10,6 +10,8 @@ jobs:
|
||||
- uses: actions/setup-go@main
|
||||
with:
|
||||
go-version-file: 'go.mod'
|
||||
- run: go mod tidy
|
||||
- uses: FedericoCarboni/setup-ffmpeg@v3
|
||||
- run: go mod tidy
|
||||
- run: cd web && npm install && npm run build
|
||||
- run: go build ./...
|
||||
- run: go test -race -v -shuffle=on ./...
|
||||
@@ -3024,9 +3024,6 @@ func (s *Manager) handleListJobTasks(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
if err != nil {
|
||||
total = -1
|
||||
}
|
||||
|
||||
tasks := []types.Task{}
|
||||
for rows.Next() {
|
||||
|
||||
@@ -89,6 +89,9 @@ type Manager struct {
|
||||
// Throttling for task status updates (per task)
|
||||
taskUpdateTimes map[int64]time.Time // key: taskID
|
||||
taskUpdateTimesMu sync.RWMutex
|
||||
// Per-job mutexes to serialize updateJobStatusFromTasks calls and prevent race conditions
|
||||
jobStatusUpdateMu map[int64]*sync.Mutex // key: jobID
|
||||
jobStatusUpdateMuMu sync.RWMutex
|
||||
|
||||
// Client WebSocket connections (new unified WebSocket)
|
||||
// Key is "userID:connID" to support multiple tabs per user
|
||||
@@ -162,6 +165,8 @@ func NewManager(db *database.DB, cfg *config.Config, auth *authpkg.Auth, storage
|
||||
runnerJobConns: make(map[string]*websocket.Conn),
|
||||
runnerJobConnsWriteMu: make(map[string]*sync.Mutex),
|
||||
runnerJobConnsWriteMuMu: sync.RWMutex{}, // Initialize the new field
|
||||
// Per-job mutexes for serializing status updates
|
||||
jobStatusUpdateMu: make(map[int64]*sync.Mutex),
|
||||
}
|
||||
|
||||
// Check for required external tools
|
||||
|
||||
@@ -1019,6 +1019,10 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
|
||||
&job.CreatedAt, &startedAt, &completedAt, &errorMessage,
|
||||
)
|
||||
})
|
||||
if err == sql.ErrNoRows {
|
||||
s.respondError(w, http.StatusNotFound, "Job not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
|
||||
return
|
||||
@@ -1037,15 +1041,6 @@ func (s *Manager) handleGetJobStatusForRunner(w http.ResponseWriter, r *http.Req
|
||||
job.OutputFormat = &outputFormat.String
|
||||
}
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
s.respondError(w, http.StatusNotFound, "Job not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
@@ -1920,8 +1915,37 @@ func (s *Manager) evaluateTaskCondition(taskID int64, jobID int64, conditionJSON
|
||||
}
|
||||
}
|
||||
|
||||
// getJobStatusUpdateMutex returns the mutex for a specific jobID, creating it if needed.
|
||||
// This ensures serialized execution of updateJobStatusFromTasks per job to prevent race conditions.
|
||||
func (s *Manager) getJobStatusUpdateMutex(jobID int64) *sync.Mutex {
|
||||
s.jobStatusUpdateMuMu.Lock()
|
||||
defer s.jobStatusUpdateMuMu.Unlock()
|
||||
|
||||
mu, exists := s.jobStatusUpdateMu[jobID]
|
||||
if !exists {
|
||||
mu = &sync.Mutex{}
|
||||
s.jobStatusUpdateMu[jobID] = mu
|
||||
}
|
||||
return mu
|
||||
}
|
||||
|
||||
// cleanupJobStatusUpdateMutex removes the mutex for a jobID after it's no longer needed.
|
||||
// Should only be called when the job is in a final state (completed/failed) and no more updates are expected.
|
||||
func (s *Manager) cleanupJobStatusUpdateMutex(jobID int64) {
|
||||
s.jobStatusUpdateMuMu.Lock()
|
||||
defer s.jobStatusUpdateMuMu.Unlock()
|
||||
delete(s.jobStatusUpdateMu, jobID)
|
||||
}
|
||||
|
||||
// updateJobStatusFromTasks updates job status and progress based on task states
|
||||
// This function is serialized per jobID to prevent race conditions when multiple tasks
|
||||
// complete concurrently and trigger status updates simultaneously.
|
||||
func (s *Manager) updateJobStatusFromTasks(jobID int64) {
|
||||
// Serialize updates per job to prevent race conditions
|
||||
mu := s.getJobStatusUpdateMutex(jobID)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// All jobs now use parallel runners (one task per frame), so we always use task-based progress
|
||||
@@ -2087,6 +2111,11 @@ func (s *Manager) updateJobStatusFromTasks(jobID int64) {
|
||||
"progress": progress,
|
||||
"completed_at": now,
|
||||
})
|
||||
// Clean up mutex for jobs in final states (completed or failed)
|
||||
// No more status updates will occur for these jobs
|
||||
if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusFailed) {
|
||||
s.cleanupJobStatusUpdateMutex(jobID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -289,6 +289,10 @@ func (r *Runner) uploadOutputs(ctx *tasks.Context, job *api.NextJobResponse) err
|
||||
log.Printf("Failed to upload %s: %v", filePath, err)
|
||||
} else {
|
||||
ctx.OutputUploaded(entry.Name())
|
||||
// Delete file after successful upload to prevent duplicate uploads
|
||||
if err := os.Remove(filePath); err != nil {
|
||||
log.Printf("Warning: Failed to delete file %s after upload: %v", filePath, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -373,6 +373,12 @@ func (p *EncodeProcessor) Process(ctx *Context) error {
|
||||
|
||||
ctx.Info(fmt.Sprintf("Successfully uploaded %s: %s", strings.ToUpper(outputExt), filepath.Base(outputVideo)))
|
||||
|
||||
// Delete file after successful upload to prevent duplicate uploads
|
||||
if err := os.Remove(outputVideo); err != nil {
|
||||
log.Printf("Warning: Failed to delete video file %s after upload: %v", outputVideo, err)
|
||||
ctx.Warn(fmt.Sprintf("Warning: Failed to delete video file after upload: %v", err))
|
||||
}
|
||||
|
||||
log.Printf("Successfully generated and uploaded %s for job %d: %s", strings.ToUpper(outputExt), ctx.JobID, filepath.Base(outputVideo))
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user