Compare commits

1a8836e6aa...master

2 Commits

| Author | SHA1 | Date |
|---|---|---|
| | d3c5ee0dba | |
| | bb57ce8659 | |
@@ -944,7 +944,7 @@ func (s *Manager) handleCancelJob(w http.ResponseWriter, r *http.Request) {
 		// Cancel all pending tasks
 		_, err = conn.Exec(
-			`UPDATE tasks SET status = ? WHERE job_id = ? AND status = ?`,
+			`UPDATE tasks SET status = ?, runner_id = NULL WHERE job_id = ? AND status = ?`,
 			types.TaskStatusFailed, jobID, types.TaskStatusPending,
 		)
 		return err

@@ -390,7 +390,7 @@ func (s *Manager) handleNextJob(w http.ResponseWriter, r *http.Request) {
 			t.condition
 		FROM tasks t
 		JOIN jobs j ON t.job_id = j.id
-		WHERE t.status = ? AND j.status != ?
+		WHERE t.status = ? AND t.runner_id IS NULL AND j.status != ?
 		ORDER BY t.created_at ASC
 		LIMIT 50`,
 		types.TaskStatusPending, types.JobStatusCancelled,

@@ -1363,7 +1363,7 @@ func (s *Manager) handleRunnerJobWebSocket(w http.ResponseWriter, r *http.Reques
 	log.Printf("Job WebSocket disconnected unexpectedly for task %d, marking as failed", taskID)
 	s.db.With(func(conn *sql.DB) error {
 		_, err := conn.Exec(
-			`UPDATE tasks SET status = ?, error_message = ?, completed_at = ? WHERE id = ?`,
+			`UPDATE tasks SET status = ?, runner_id = NULL, error_message = ?, completed_at = ? WHERE id = ?`,
 			types.TaskStatusFailed, "WebSocket connection lost", time.Now(), taskID,
 		)
 		return err

@@ -1678,11 +1678,10 @@ func (s *Manager) handleWebSocketTaskComplete(runnerID int64, taskUpdate WSTaskU
 		} else {
 			// No retries remaining - mark as failed
 			err = s.db.WithTx(func(tx *sql.Tx) error {
-				_, err := tx.Exec(`UPDATE tasks SET status = ? WHERE id = ?`, types.TaskStatusFailed, taskUpdate.TaskID)
-				if err != nil {
-					return err
-				}
-				_, err = tx.Exec(`UPDATE tasks SET completed_at = ? WHERE id = ?`, now, taskUpdate.TaskID)
+				_, err := tx.Exec(
+					`UPDATE tasks SET status = ?, runner_id = NULL, completed_at = ? WHERE id = ?`,
+					types.TaskStatusFailed, now, taskUpdate.TaskID,
+				)
 				if err != nil {
 					return err
 				}

@@ -1849,7 +1848,7 @@ func (s *Manager) cancelActiveTasksForJob(jobID int64) error {
 	// Tasks don't have a cancelled status - mark them as failed instead
 	err := s.db.With(func(conn *sql.DB) error {
 		_, err := conn.Exec(
-			`UPDATE tasks SET status = ?, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
+			`UPDATE tasks SET status = ?, runner_id = NULL, error_message = ? WHERE job_id = ? AND status IN (?, ?)`,
 			types.TaskStatusFailed, "Job cancelled", jobID, types.TaskStatusPending, types.TaskStatusRunning,
 		)
 		if err != nil {

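The "mark failed and clear runner_id" UPDATE now appears, with small variations, in handleCancelJob, handleRunnerJobWebSocket, handleWebSocketTaskComplete, and cancelActiveTasksForJob. A possible follow-up, outside the scope of this diff, would be a shared helper so the reset cannot be missed again; a rough sketch under the same assumptions as above (hypothetical name, `time` import required):

```go
// Hypothetical helper, not in this diff: fail one task and release its
// runner in a single statement. Mirrors the UPDATE used in the
// WebSocket-disconnect hunk above.
func failTask(conn *sql.DB, taskID int64, errMsg string, completedAt time.Time) error {
	_, err := conn.Exec(
		`UPDATE tasks SET status = ?, runner_id = NULL, error_message = ?, completed_at = ? WHERE id = ?`,
		types.TaskStatusFailed, errMsg, completedAt, taskID,
	)
	return err
}
```
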
@@ -111,6 +111,41 @@ export default function JobDetails({ job, onClose, onUpdate }) {
     });
   }, [taskData]);
 
+  // Helper function to load all files with pagination
+  const loadAllFiles = async (jobId, signal) => {
+    const allFiles = [];
+    let offset = 0;
+    const limit = 100; // Load 100 files per page
+    let hasMore = true;
+
+    while (hasMore && !signal?.aborted) {
+      const fileList = await jobs.getFiles(jobId, { limit, offset, signal });
+
+      // Check for superseded sentinel
+      if (fileList === REQUEST_SUPERSEDED) {
+        return REQUEST_SUPERSEDED;
+      }
+
+      const fileData = fileList?.data || fileList;
+      const files = Array.isArray(fileData) ? fileData : [];
+      allFiles.push(...files);
+
+      // Check if there are more files to load
+      const total = fileList?.total;
+      if (total !== undefined) {
+        hasMore = offset + files.length < total;
+      } else {
+        // If total is not provided, check if we got a full page (or more)
+        // Use >= to safely handle edge cases where the API returns different amounts
+        hasMore = files.length >= limit;
+      }
+
+      offset += files.length;
+    }
+
+    return allFiles;
+  };
+
   const loadDetails = async () => {
     // Guard against undefined job or job.id
     if (!job || !job.id) {

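The termination logic in loadAllFiles depends on the files endpoint honouring limit/offset and, when possible, returning a total count alongside the page data. That server side is not part of this diff; the sketch below only illustrates the response shape the loop assumes, written in Go like the handlers above, with a hypothetical handler name and hypothetical listJobFiles/jobIDFromRequest helpers.

```go
// Hypothetical sketch of the paginated files endpoint that loadAllFiles
// assumes: limit/offset query parameters and a JSON body of the form
// {"data": [...], "total": N}. Route, helper names, and defaults are guesses.
func (s *Manager) handleJobFiles(w http.ResponseWriter, r *http.Request) {
	limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
	offset, _ := strconv.Atoi(r.URL.Query().Get("offset"))
	if limit <= 0 || limit > 500 {
		limit = 100
	}
	if offset < 0 {
		offset = 0
	}

	// listJobFiles and jobIDFromRequest are hypothetical project helpers.
	files, total, err := s.listJobFiles(jobIDFromRequest(r), limit, offset)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]any{
		"data":  files, // current page
		"total": total, // lets the client stop once offset+len(data) >= total
	})
}
```
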
@@ -122,9 +157,9 @@ export default function JobDetails({ job, onClose, onUpdate }) {
       setLoading(true);
       // Use summary endpoint for tasks initially - much faster
       const signal = abortControllerRef.current?.signal;
-      const [details, fileList, taskListResult] = await Promise.all([
+      const [details, allFilesResult, taskListResult] = await Promise.all([
        jobs.get(job.id, { signal }),
-        jobs.getFiles(job.id, { limit: 50, signal }), // Only load first page of files
+        loadAllFiles(job.id, signal), // Load all files with pagination
        jobs.getTasksSummary(job.id, { sort: 'frame:asc', signal }), // Get all tasks
       ]);
 
@@ -135,11 +170,10 @@ export default function JobDetails({ job, onClose, onUpdate }) {
       setJobDetails(details);
 
       // Handle paginated file response - check for superseded sentinel
-      if (fileList === REQUEST_SUPERSEDED) {
+      if (allFilesResult === REQUEST_SUPERSEDED) {
         return; // Request was superseded, skip this update
       }
-      const fileData = fileList?.data || fileList;
-      setFiles(Array.isArray(fileData) ? fileData : []);
+      setFiles(Array.isArray(allFilesResult) ? allFilesResult : []);
 
       // Handle paginated task summary response - check for superseded sentinel
       if (taskListResult === REQUEST_SUPERSEDED) {

@@ -617,16 +651,22 @@ export default function JobDetails({ job, onClose, onUpdate }) {
         };
         reloadTasks();
       } else if (data.type === 'file_added' && data.data) {
-        // New file was added - reload file list
+        // New file was added - reload all files
         const reloadFiles = async () => {
           try {
-            const fileList = await jobs.getFiles(job.id, { limit: 50 });
+            const signal = abortControllerRef.current?.signal;
+            const allFilesResult = await loadAllFiles(job.id, signal);
+
+            // Check if request was aborted
+            if (signal?.aborted) {
+              return;
+            }
+
             // Check for superseded sentinel
-            if (fileList === REQUEST_SUPERSEDED) {
+            if (allFilesResult === REQUEST_SUPERSEDED) {
               return; // Request was superseded, skip this update
             }
-            const fileData = fileList.data || fileList;
-            setFiles(Array.isArray(fileData) ? fileData : []);
+            setFiles(Array.isArray(allFilesResult) ? allFilesResult : []);
           } catch (error) {
             console.error('Failed to reload files:', error);
           }