package api import ( "archive/tar" "crypto/md5" "database/sql" _ "embed" "encoding/hex" "encoding/json" "errors" "fmt" "io" "log" "mime/multipart" "net/http" "os" "path/filepath" "strconv" "strings" "sync" "sync/atomic" "time" authpkg "jiggablend/internal/auth" "jiggablend/pkg/executils" "jiggablend/pkg/scripts" "jiggablend/pkg/types" "github.com/gorilla/websocket" ) // generateETag generates an ETag from data hash func generateETag(data interface{}) string { jsonData, err := json.Marshal(data) if err != nil { return "" } hash := md5.Sum(jsonData) return fmt.Sprintf(`"%s"`, hex.EncodeToString(hash[:])) } // checkETag checks if the request has If-None-Match header matching the ETag func checkETag(r *http.Request, etag string) bool { ifNoneMatch := r.Header.Get("If-None-Match") return ifNoneMatch != "" && ifNoneMatch == etag } // isAdminUser checks if the current user is an admin func isAdminUser(r *http.Request) bool { return authpkg.IsAdmin(r.Context()) } // handleCreateJob creates a new job func (s *Manager) handleCreateJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } var req types.CreateJobRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } // Validate job type - only render jobs are supported now if req.JobType != types.JobTypeRender { s.respondError(w, http.StatusBadRequest, "Invalid job_type: only 'render' jobs are supported") return } if req.Name == "" { s.respondError(w, http.StatusBadRequest, "Job name is required") return } // Validate render job requirements if req.JobType == types.JobTypeRender { if req.FrameStart == nil || req.FrameEnd == nil { s.respondError(w, http.StatusBadRequest, "frame_start and frame_end are required for render jobs") return } if *req.FrameStart < 0 { s.respondError(w, http.StatusBadRequest, "frame_start must be 0 or greater. Negative starting frames are not supported.") return } if *req.FrameEnd < 0 { s.respondError(w, http.StatusBadRequest, "frame_end must be 0 or greater. Negative frame numbers are not supported.") return } if *req.FrameEnd < *req.FrameStart { s.respondError(w, http.StatusBadRequest, "Invalid frame range") return } // Validate frame range limits (prevent abuse) const maxFrameRange = 10000 if *req.FrameEnd-*req.FrameStart+1 > maxFrameRange { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Frame range too large. Maximum allowed: %d frames", maxFrameRange)) return } if req.OutputFormat == nil || *req.OutputFormat == "" { defaultFormat := "PNG" req.OutputFormat = &defaultFormat } } // Store render settings, unhide_objects, enable_execution, blender_version, preserve_hdr, and preserve_alpha flags in blend_metadata if provided // Always include output_format in metadata so tasks can access it var blendMetadataJSON *string if req.RenderSettings != nil || req.UnhideObjects != nil || req.EnableExecution != nil || req.BlenderVersion != nil || req.OutputFormat != nil || req.PreserveHDR != nil || req.PreserveAlpha != nil { metadata := types.BlendMetadata{ FrameStart: *req.FrameStart, FrameEnd: *req.FrameEnd, RenderSettings: types.RenderSettings{}, UnhideObjects: req.UnhideObjects, EnableExecution: req.EnableExecution, PreserveHDR: req.PreserveHDR, PreserveAlpha: req.PreserveAlpha, } if req.RenderSettings != nil { metadata.RenderSettings = *req.RenderSettings } // Always set output_format in metadata from job's output_format field if req.OutputFormat != nil { metadata.RenderSettings.OutputFormat = *req.OutputFormat } if req.BlenderVersion != nil { metadata.BlenderVersion = *req.BlenderVersion } metadataBytes, err := json.Marshal(metadata) if err == nil { metadataStr := string(metadataBytes) blendMetadataJSON = &metadataStr } } log.Printf("Creating render job with output_format: '%s' (from user selection)", *req.OutputFormat) var jobID int64 err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO jobs (user_id, job_type, name, status, progress, frame_start, frame_end, output_format, blend_metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, userID, req.JobType, req.Name, types.JobStatusPending, 0.0, *req.FrameStart, *req.FrameEnd, *req.OutputFormat, blendMetadataJSON, ) if err != nil { return err } jobID, err = result.LastInsertId() return err }) if err == nil { log.Printf("Created render job %d with output_format: '%s'", jobID, *req.OutputFormat) } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job: %v", err)) return } // If upload session ID is provided, move the context archive from temp to job directory if req.UploadSessionID != nil && *req.UploadSessionID != "" { log.Printf("Processing upload session for job %d: %s", jobID, *req.UploadSessionID) // Session ID is the full temp directory path tempDir := *req.UploadSessionID tempContextPath := filepath.Join(tempDir, "context.tar") if _, err := os.Stat(tempContextPath); err == nil { log.Printf("Found context archive at %s, moving to job %d directory", tempContextPath, jobID) // Move context to job directory jobPath := s.storage.JobPath(jobID) if err := os.MkdirAll(jobPath, 0755); err != nil { log.Printf("ERROR: Failed to create job directory for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job directory: %v", err)) return } jobContextPath := filepath.Join(jobPath, "context.tar") // Copy file instead of rename (works across filesystems) srcFile, err := os.Open(tempContextPath) if err != nil { log.Printf("ERROR: Failed to open source context archive %s: %v", tempContextPath, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to open context archive: %v", err)) return } defer srcFile.Close() dstFile, err := os.Create(jobContextPath) if err != nil { log.Printf("ERROR: Failed to create destination context archive %s: %v", jobContextPath, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create context archive: %v", err)) return } defer dstFile.Close() _, err = io.Copy(dstFile, srcFile) if err != nil { dstFile.Close() os.Remove(jobContextPath) // Clean up partial file log.Printf("ERROR: Failed to copy context archive from %s to %s: %v", tempContextPath, jobContextPath, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to copy context archive: %v", err)) return } // Close files before deleting source srcFile.Close() if err := dstFile.Close(); err != nil { log.Printf("ERROR: Failed to close destination file: %v", err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to finalize context archive: %v", err)) return } // Delete source file after successful copy if err := os.Remove(tempContextPath); err != nil { log.Printf("Warning: Failed to remove source context archive %s: %v", tempContextPath, err) // Don't fail the operation if cleanup fails } log.Printf("Successfully copied context archive to %s", jobContextPath) // Record context archive in database contextInfo, err := os.Stat(jobContextPath) if err != nil { log.Printf("ERROR: Failed to stat context archive after move: %v", err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify context archive: %v", err)) return } var fileID int64 err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?)`, jobID, types.JobFileTypeInput, jobContextPath, filepath.Base(jobContextPath), contextInfo.Size(), ) if err != nil { return err } fileID, err = result.LastInsertId() return err }) if err != nil { log.Printf("ERROR: Failed to record context archive in database for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record context archive: %v", err)) return } log.Printf("Successfully recorded context archive in database for job %d (file ID: %d, size: %d bytes)", jobID, fileID, contextInfo.Size()) // Broadcast file addition s.broadcastJobUpdate(jobID, "file_added", map[string]interface{}{ "file_id": fileID, "file_type": types.JobFileTypeInput, "file_name": filepath.Base(jobContextPath), "file_size": contextInfo.Size(), }) // Clean up temp directory if err := os.RemoveAll(tempDir); err != nil { log.Printf("Warning: Failed to clean up temp directory %s: %v", tempDir, err) } } else { log.Printf("ERROR: Context archive not found at %s for session %s: %v", tempContextPath, *req.UploadSessionID, err) s.respondError(w, http.StatusBadRequest, "Context archive not found for upload session. Please upload the file again.") return } } else { log.Printf("Warning: No upload session ID provided for job %d - job created without input files", jobID) } // Only create render tasks for render jobs if req.JobType == types.JobTypeRender { // Determine task timeout based on output format taskTimeout := RenderTimeout // 1 hour for render jobs if *req.OutputFormat == "EXR_264_MP4" || *req.OutputFormat == "EXR_AV1_MP4" || *req.OutputFormat == "EXR_VP9_WEBM" { taskTimeout = VideoEncodeTimeout // 24 hours for encoding } // Create tasks for the job // Create one task per frame (all tasks are single-frame) var createdTaskIDs []int64 for frame := *req.FrameStart; frame <= *req.FrameEnd; frame++ { var taskID int64 err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO tasks (job_id, frame, task_type, status, timeout_seconds, max_retries) VALUES (?, ?, ?, ?, ?, ?)`, jobID, frame, types.TaskTypeRender, types.TaskStatusPending, taskTimeout, 3, ) if err != nil { return err } taskID, err = result.LastInsertId() return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create tasks: %v", err)) return } createdTaskIDs = append(createdTaskIDs, taskID) } log.Printf("Created %d render tasks for job %d (frames %d-%d)", *req.FrameEnd-*req.FrameStart+1, jobID, *req.FrameStart, *req.FrameEnd) // Create encode task immediately if output format requires it // The task will have a condition that prevents it from being assigned until all render tasks are completed if *req.OutputFormat == "EXR_264_MP4" || *req.OutputFormat == "EXR_AV1_MP4" || *req.OutputFormat == "EXR_VP9_WEBM" { encodeTaskTimeout := VideoEncodeTimeout // 24 hours for encoding conditionJSON := `{"type": "all_render_tasks_completed"}` var encodeTaskID int64 err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO tasks (job_id, frame, task_type, status, timeout_seconds, max_retries, condition) VALUES (?, ?, ?, ?, ?, ?, ?)`, jobID, 0, types.TaskTypeEncode, types.TaskStatusPending, encodeTaskTimeout, 1, conditionJSON, ) if err != nil { return err } encodeTaskID, err = result.LastInsertId() return err }) if err != nil { log.Printf("Failed to create encode task for job %d: %v", jobID, err) // Don't fail the job creation if encode task creation fails } else { createdTaskIDs = append(createdTaskIDs, encodeTaskID) log.Printf("Created encode task %d for job %d (with condition: all render tasks must be completed)", encodeTaskID, jobID) } } // Update job status (should be pending since tasks are pending) s.updateJobStatusFromTasks(jobID) // Broadcast that new tasks were added if len(createdTaskIDs) > 0 { log.Printf("Broadcasting tasks_added for job %d: %d tasks", jobID, len(createdTaskIDs)) s.broadcastTaskUpdate(jobID, 0, "tasks_added", map[string]interface{}{ "task_ids": createdTaskIDs, "count": len(createdTaskIDs), }) } } // Build response job object job := types.Job{ ID: jobID, UserID: userID, JobType: req.JobType, Name: req.Name, Status: types.JobStatusPending, Progress: 0.0, CreatedAt: time.Now(), } if req.JobType == types.JobTypeRender { job.FrameStart = req.FrameStart job.FrameEnd = req.FrameEnd job.OutputFormat = req.OutputFormat } // Broadcast job_created to all clients via jobs channel s.broadcastToAllClients("jobs", map[string]interface{}{ "type": "job_created", "job_id": jobID, "data": map[string]interface{}{ "id": jobID, "name": req.Name, "status": types.JobStatusPending, "progress": 0.0, "frame_start": *req.FrameStart, "frame_end": *req.FrameEnd, "output_format": *req.OutputFormat, "created_at": time.Now(), }, "timestamp": time.Now().Unix(), }) s.respondJSON(w, http.StatusCreated, job) } // handleListJobs lists jobs for the current user with pagination and filtering func (s *Manager) handleListJobs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } // Parse query parameters limit := 50 // default if limitStr := r.URL.Query().Get("limit"); limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { limit = l } } offset := 0 if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { offset = o } } statusFilter := r.URL.Query().Get("status") sortBy := r.URL.Query().Get("sort") if sortBy == "" { sortBy = "created_at:desc" } // Parse sort parameter (format: "field:direction") sortParts := strings.Split(sortBy, ":") sortField := "created_at" sortDir := "DESC" if len(sortParts) == 2 { sortField = sortParts[0] sortDir = strings.ToUpper(sortParts[1]) if sortDir != "ASC" && sortDir != "DESC" { sortDir = "DESC" } // Validate sort field validFields := map[string]bool{ "created_at": true, "started_at": true, "completed_at": true, "status": true, "progress": true, "name": true, } if !validFields[sortField] { sortField = "created_at" } } // Build query with filters query := `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE user_id = ?` args := []interface{}{userID} if statusFilter != "" { // Support multiple statuses: "running,pending" or single "running" statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" args = append(args, strings.TrimSpace(status)) } query += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } query += fmt.Sprintf(" ORDER BY %s %s LIMIT ? OFFSET ?", sortField, sortDir) args = append(args, limit, offset) var rows *sql.Rows var total int err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) if err != nil { return err } // Get total count for pagination metadata countQuery := `SELECT COUNT(*) FROM jobs WHERE user_id = ?` countArgs := []interface{}{userID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" countArgs = append(countArgs, strings.TrimSpace(status)) } countQuery += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } err = conn.QueryRow(countQuery, countArgs...).Scan(&total) if err != nil { // If count fails, continue without it total = -1 } return nil }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query jobs: %v", err)) return } defer rows.Close() jobs := []types.Job{} for rows.Next() { var job types.Job var jobType string var startedAt, completedAt sql.NullTime var blendMetadataJSON sql.NullString var errorMessage sql.NullString var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString err := rows.Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err)) return } job.JobType = types.JobType(jobType) if frameStart.Valid { fs := int(frameStart.Int64) job.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) job.FrameEnd = &fe } if outputFormat.Valid { job.OutputFormat = &outputFormat.String } if startedAt.Valid { job.StartedAt = &startedAt.Time } if completedAt.Valid { job.CompletedAt = &completedAt.Time } if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { var metadata types.BlendMetadata if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { job.BlendMetadata = &metadata } } if errorMessage.Valid { job.ErrorMessage = errorMessage.String } jobs = append(jobs, job) } // Generate ETag and check If-None-Match response := map[string]interface{}{ "data": jobs, "total": total, "limit": limit, "offset": offset, } etag := generateETag(response) w.Header().Set("ETag", etag) if checkETag(r, etag) { w.WriteHeader(http.StatusNotModified) return } s.respondJSON(w, http.StatusOK, response) } // handleListJobsSummary lists lightweight job summaries for the current user func (s *Manager) handleListJobsSummary(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } // Parse query parameters (same as handleListJobs) limit := 50 if limitStr := r.URL.Query().Get("limit"); limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { limit = l } } offset := 0 if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { offset = o } } statusFilter := r.URL.Query().Get("status") sortBy := r.URL.Query().Get("sort") if sortBy == "" { sortBy = "created_at:desc" } sortParts := strings.Split(sortBy, ":") sortField := "created_at" sortDir := "DESC" if len(sortParts) == 2 { sortField = sortParts[0] sortDir = strings.ToUpper(sortParts[1]) if sortDir != "ASC" && sortDir != "DESC" { sortDir = "DESC" } validFields := map[string]bool{ "created_at": true, "started_at": true, "completed_at": true, "status": true, "progress": true, "name": true, } if !validFields[sortField] { sortField = "created_at" } } // Build query - only select summary fields query := `SELECT id, name, status, progress, frame_start, frame_end, output_format, created_at FROM jobs WHERE user_id = ?` args := []interface{}{userID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" args = append(args, strings.TrimSpace(status)) } query += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } query += fmt.Sprintf(" ORDER BY %s %s LIMIT ? OFFSET ?", sortField, sortDir) args = append(args, limit, offset) var rows *sql.Rows var total int err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) if err != nil { return err } // Get total count countQuery := `SELECT COUNT(*) FROM jobs WHERE user_id = ?` countArgs := []interface{}{userID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" countArgs = append(countArgs, strings.TrimSpace(status)) } countQuery += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } err = conn.QueryRow(countQuery, countArgs...).Scan(&total) return err }) if err != nil { total = -1 } type JobSummary struct { ID int64 `json:"id"` Name string `json:"name"` Status string `json:"status"` Progress float64 `json:"progress"` FrameStart *int `json:"frame_start,omitempty"` FrameEnd *int `json:"frame_end,omitempty"` OutputFormat *string `json:"output_format,omitempty"` CreatedAt time.Time `json:"created_at"` } summaries := []JobSummary{} for rows.Next() { var summary JobSummary var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString err := rows.Scan( &summary.ID, &summary.Name, &summary.Status, &summary.Progress, &frameStart, &frameEnd, &outputFormat, &summary.CreatedAt, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err)) return } if frameStart.Valid { fs := int(frameStart.Int64) summary.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) summary.FrameEnd = &fe } if outputFormat.Valid { summary.OutputFormat = &outputFormat.String } summaries = append(summaries, summary) } response := map[string]interface{}{ "data": summaries, "total": total, "limit": limit, "offset": offset, } s.respondJSON(w, http.StatusOK, response) } // handleBatchGetJobs fetches multiple jobs by IDs func (s *Manager) handleBatchGetJobs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } var req struct { JobIDs []int64 `json:"job_ids"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } if len(req.JobIDs) == 0 { s.respondJSON(w, http.StatusOK, []types.Job{}) return } if len(req.JobIDs) > 100 { s.respondError(w, http.StatusBadRequest, "Maximum 100 job IDs allowed per batch") return } // Build query with IN clause placeholders := make([]string, len(req.JobIDs)) args := make([]interface{}, len(req.JobIDs)+1) args[0] = userID for i, jobID := range req.JobIDs { placeholders[i] = "?" args[i+1] = jobID } query := fmt.Sprintf(`SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE user_id = ? AND id IN (%s) ORDER BY created_at DESC`, strings.Join(placeholders, ",")) var rows *sql.Rows err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query jobs: %v", err)) return } defer rows.Close() jobs := []types.Job{} for rows.Next() { var job types.Job var jobType string var startedAt, completedAt sql.NullTime var blendMetadataJSON sql.NullString var errorMessage sql.NullString var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString err := rows.Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan job: %v", err)) return } job.JobType = types.JobType(jobType) if frameStart.Valid { fs := int(frameStart.Int64) job.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) job.FrameEnd = &fe } if outputFormat.Valid { job.OutputFormat = &outputFormat.String } if startedAt.Valid { job.StartedAt = &startedAt.Time } if completedAt.Valid { job.CompletedAt = &completedAt.Time } if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { var metadata types.BlendMetadata if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { job.BlendMetadata = &metadata } } if errorMessage.Valid { job.ErrorMessage = errorMessage.String } jobs = append(jobs, job) } s.respondJSON(w, http.StatusOK, jobs) } // handleGetJob gets a specific job func (s *Manager) handleGetJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } var job types.Job var jobType string var startedAt, completedAt sql.NullTime var blendMetadataJSON sql.NullString var errorMessage sql.NullString var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString // Allow admins to view any job, regular users can only view their own isAdmin := isAdminUser(r) var err2 error err2 = s.db.With(func(conn *sql.DB) error { if isAdmin { return conn.QueryRow( `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ?`, jobID, ).Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) } else { return conn.QueryRow( `SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ? AND user_id = ?`, jobID, userID, ).Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) } }) if err2 == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err2 != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err2)) return } job.JobType = types.JobType(jobType) if frameStart.Valid { fs := int(frameStart.Int64) job.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) job.FrameEnd = &fe } if outputFormat.Valid { job.OutputFormat = &outputFormat.String } if startedAt.Valid { job.StartedAt = &startedAt.Time } if completedAt.Valid { job.CompletedAt = &completedAt.Time } if blendMetadataJSON.Valid && blendMetadataJSON.String != "" { var metadata types.BlendMetadata if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err == nil { job.BlendMetadata = &metadata } } if errorMessage.Valid { job.ErrorMessage = errorMessage.String } // Generate ETag and check If-None-Match etag := generateETag(job) w.Header().Set("ETag", etag) if checkETag(r, etag) { w.WriteHeader(http.StatusNotModified) return } s.respondJSON(w, http.StatusOK, job) } // handleCancelJob cancels a job func (s *Manager) handleCancelJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Check if this is a metadata extraction job - if so, don't cancel running metadata tasks var jobType string var jobStatus string err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT job_type, status FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobType, &jobStatus) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } // Don't allow cancelling already completed or cancelled jobs if jobStatus == string(types.JobStatusCompleted) || jobStatus == string(types.JobStatusCancelled) { s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job already " + jobStatus}) return } var rowsAffected int64 err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `UPDATE jobs SET status = ? WHERE id = ? AND user_id = ?`, types.JobStatusCancelled, jobID, userID, ) if err != nil { return err } rowsAffected, _ = result.RowsAffected() if rowsAffected == 0 { return sql.ErrNoRows } // Cancel all pending tasks _, err = conn.Exec( `UPDATE tasks SET status = ?, runner_id = NULL WHERE job_id = ? AND status = ?`, types.TaskStatusFailed, jobID, types.TaskStatusPending, ) return err }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to cancel job: %v", err)) return } log.Printf("Cancelling job %d (type: %s)", jobID, jobType) s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job cancelled"}) } // handleDeleteJob permanently deletes a job and all its associated data func (s *Manager) handleDeleteJob(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) and check status isAdmin := isAdminUser(r) var jobUserID int64 var jobStatus string err = s.db.With(func(conn *sql.DB) error { if isAdmin { return conn.QueryRow("SELECT user_id, status FROM jobs WHERE id = ?", jobID).Scan(&jobUserID, &jobStatus) } else { // Non-admin users can only delete their own jobs return conn.QueryRow("SELECT user_id, status FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobUserID, &jobStatus) } }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if !isAdmin && jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Prevent deletion of jobs that are still cancellable (pending or running) if jobStatus == string(types.JobStatusPending) || jobStatus == string(types.JobStatusRunning) { s.respondError(w, http.StatusBadRequest, "Cannot delete a job that is pending or running. Please cancel it first.") return } // Delete in transaction to ensure consistency err = s.db.WithTx(func(tx *sql.Tx) error { // Delete task logs _, err := tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { return fmt.Errorf("failed to delete task logs: %w", err) } // Delete task steps _, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { return fmt.Errorf("failed to delete task steps: %w", err) } // Delete tasks _, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID) if err != nil { return fmt.Errorf("failed to delete tasks: %w", err) } // Delete job files _, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID) if err != nil { return fmt.Errorf("failed to delete job files: %w", err) } // Delete the job _, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID) if err != nil { return fmt.Errorf("failed to delete job: %w", err) } return nil }) if err != nil { s.respondError(w, http.StatusInternalServerError, err.Error()) return } // Delete physical files if err := s.storage.DeleteJobFiles(jobID); err != nil { log.Printf("Warning: Failed to delete job files for job %d: %v", jobID, err) // Don't fail the request if file deletion fails - the database records are already deleted } log.Printf("Deleted job %d (user: %d, admin: %v)", jobID, jobUserID, isAdmin) s.respondJSON(w, http.StatusOK, map[string]string{"message": "Job deleted"}) } // cleanupOldRenderJobs periodically deletes render jobs older than 1 month func (s *Manager) cleanupOldRenderJobs() { // Run cleanup every hour ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() // Run once immediately on startup s.cleanupOldRenderJobsOnce() for range ticker.C { s.cleanupOldRenderJobsOnce() } } // cleanupOldRenderJobsOnce finds and deletes render jobs older than 1 month that are completed, failed, or cancelled func (s *Manager) cleanupOldRenderJobsOnce() { defer func() { if r := recover(); r != nil { log.Printf("Panic in cleanupOldRenderJobs: %v", r) } }() // Find render jobs older than 1 month that are in a final state (completed, failed, or cancelled) // Don't delete running or pending jobs var rows *sql.Rows err := s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query( `SELECT id FROM jobs WHERE job_type = ? AND status IN (?, ?, ?) AND created_at < datetime('now', '-1 month')`, types.JobTypeRender, types.JobStatusCompleted, types.JobStatusFailed, types.JobStatusCancelled, ) return err }) if err != nil { log.Printf("Failed to query old render jobs: %v", err) return } defer rows.Close() var jobIDs []int64 for rows.Next() { var jobID int64 if err := rows.Scan(&jobID); err == nil { jobIDs = append(jobIDs, jobID) } else { log.Printf("Failed to scan job ID in cleanupOldRenderJobs: %v", err) } } rows.Close() if len(jobIDs) == 0 { return } log.Printf("Cleaning up %d old render jobs", len(jobIDs)) // Delete each job for _, jobID := range jobIDs { // Delete in transaction to ensure consistency err := s.db.WithTx(func(tx *sql.Tx) error { // Delete task logs _, err := tx.Exec(`DELETE FROM task_logs WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { return fmt.Errorf("failed to delete task logs: %w", err) } // Delete task steps _, err = tx.Exec(`DELETE FROM task_steps WHERE task_id IN (SELECT id FROM tasks WHERE job_id = ?)`, jobID) if err != nil { return fmt.Errorf("failed to delete task steps: %w", err) } // Delete tasks _, err = tx.Exec("DELETE FROM tasks WHERE job_id = ?", jobID) if err != nil { return fmt.Errorf("failed to delete tasks: %w", err) } // Delete job files _, err = tx.Exec("DELETE FROM job_files WHERE job_id = ?", jobID) if err != nil { return fmt.Errorf("failed to delete job files: %w", err) } // Delete the job _, err = tx.Exec("DELETE FROM jobs WHERE id = ?", jobID) if err != nil { return fmt.Errorf("failed to delete job: %w", err) } return nil }) if err != nil { log.Printf("Failed to delete job %d: %v", jobID, err) continue } // Delete physical files (best effort, don't fail if this errors) if err := s.storage.DeleteJobFiles(jobID); err != nil { log.Printf("Warning: Failed to delete files for render job %d: %v", jobID, err) } } log.Printf("Cleaned up %d old render jobs", len(jobIDs)) } // handleUploadJobFile handles file upload for a job func (s *Manager) handleUploadJobFile(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Parse multipart form with large limit for big files // Note: For very large files, this will use temporary files on disk err = r.ParseMultipartForm(20 << 30) // 20 GB (for large ZIP files and blend files) if err != nil { log.Printf("Error parsing multipart form for job %d: %v", jobID, err) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse form: %v", err)) return } file, header, err := r.FormFile("file") if err != nil { log.Printf("Error getting file from form for job %d: %v", jobID, err) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("No file provided: %v", err)) return } defer file.Close() log.Printf("Uploading file '%s' (size: %d bytes) for job %d", header.Filename, header.Size, jobID) jobPath := s.storage.JobPath(jobID) if err := os.MkdirAll(jobPath, 0755); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create job directory: %v", err)) return } // Create temporary directory for processing upload tmpDir, err := s.storage.TempDir(fmt.Sprintf("jiggablend-upload-%d-*", jobID)) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create temporary directory: %v", err)) return } defer func() { if err := os.RemoveAll(tmpDir); err != nil { log.Printf("Warning: Failed to clean up temp directory %s: %v", tmpDir, err) } }() var fileID int64 var mainBlendFile string var extractedFiles []string // Check if this is a ZIP file if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") { log.Printf("Processing ZIP file '%s' for job %d", header.Filename, jobID) // Save ZIP to temporary directory zipPath := filepath.Join(tmpDir, header.Filename) log.Printf("Creating ZIP file at: %s", zipPath) zipFile, err := os.Create(zipPath) if err != nil { log.Printf("ERROR: Failed to create ZIP file for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create ZIP file: %v", err)) return } log.Printf("Copying %d bytes to ZIP file for job %d...", header.Size, jobID) copied, err := io.Copy(zipFile, file) zipFile.Close() if err != nil { log.Printf("ERROR: Failed to save ZIP file for job %d (copied %d bytes): %v", jobID, copied, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save ZIP file: %v", err)) return } log.Printf("Successfully copied %d bytes to ZIP file for job %d", copied, jobID) // Extract ZIP file to temporary directory log.Printf("Extracting ZIP file for job %d...", jobID) extractedFiles, err = s.storage.ExtractZip(zipPath, tmpDir) if err != nil { log.Printf("ERROR: Failed to extract ZIP file for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to extract ZIP file: %v", err)) return } log.Printf("Successfully extracted %d files from ZIP for job %d", len(extractedFiles), jobID) // Find main blend file (check for user selection first, then auto-detect) mainBlendParam := r.FormValue("main_blend_file") if mainBlendParam != "" { // User specified main blend file mainBlendFile = filepath.Join(tmpDir, mainBlendParam) if _, err := os.Stat(mainBlendFile); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Specified main blend file not found: %s", mainBlendParam)) return } } else { // Auto-detect: find blend files in root directory blendFiles := []string{} err := filepath.Walk(tmpDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // Only check files in root directory (not subdirectories) relPath, _ := filepath.Rel(tmpDir, path) if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".blend") { // Check if it's in root (no path separators) if !strings.Contains(relPath, string(filepath.Separator)) { blendFiles = append(blendFiles, path) } } return nil }) if err == nil && len(blendFiles) == 1 { // Only one blend file in root - use it mainBlendFile = blendFiles[0] } else if len(blendFiles) > 1 { // Multiple blend files - need user to specify // Return list of blend files for user to choose blendFileNames := []string{} for _, f := range blendFiles { rel, _ := filepath.Rel(tmpDir, f) blendFileNames = append(blendFileNames, rel) } s.respondJSON(w, http.StatusOK, map[string]interface{}{ "zip_extracted": true, "blend_files": blendFileNames, "message": "Multiple blend files found. Please specify the main blend file.", }) return } } } else { // Regular file upload (not ZIP) - save to temporary directory filePath := filepath.Join(tmpDir, header.Filename) outFile, err := os.Create(filePath) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create file: %v", err)) return } // Get a fresh file reader (FormFile returns a new reader each time) fileReader, _, err := r.FormFile("file") if err != nil { outFile.Close() s.respondError(w, http.StatusBadRequest, fmt.Sprintf("No file provided: %v", err)) return } if _, err := io.Copy(outFile, fileReader); err != nil { fileReader.Close() outFile.Close() s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err)) return } fileReader.Close() outFile.Close() if strings.HasSuffix(strings.ToLower(header.Filename), ".blend") { mainBlendFile = filePath } } // Create context archive from temporary directory - this is the primary artifact // Exclude the original uploaded ZIP file (but keep blend files as they're needed for rendering) var excludeFiles []string if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") { excludeFiles = append(excludeFiles, header.Filename) } contextPath, err := s.storage.CreateJobContextFromDir(tmpDir, jobID, excludeFiles...) if err != nil { log.Printf("ERROR: Failed to create context archive for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create context archive: %v", err)) return } // Record context archive in database contextInfo, err := os.Stat(contextPath) if err != nil { log.Printf("ERROR: Failed to stat context archive for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to stat context archive: %v", err)) return } err = s.db.With(func(conn *sql.DB) error { result, err := conn.Exec( `INSERT INTO job_files (job_id, file_type, file_path, file_name, file_size) VALUES (?, ?, ?, ?, ?)`, jobID, types.JobFileTypeInput, contextPath, filepath.Base(contextPath), contextInfo.Size(), ) if err != nil { return err } fileID, err = result.LastInsertId() return err }) if err != nil { log.Printf("ERROR: Failed to record context archive in database for job %d: %v", jobID, err) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to record context archive: %v", err)) return } log.Printf("Context archive recorded in database with ID %d for job %d", fileID, jobID) // Broadcast file addition s.broadcastJobUpdate(jobID, "file_added", map[string]interface{}{ "file_id": fileID, "file_type": types.JobFileTypeInput, "file_name": filepath.Base(contextPath), "file_size": contextInfo.Size(), }) // Extract metadata directly from the context archive log.Printf("Extracting metadata for job %d...", jobID) metadata, err := s.extractMetadataFromContext(jobID) if err != nil { log.Printf("Warning: Failed to extract metadata for job %d: %v", jobID, err) // Don't fail the upload if metadata extraction fails - job can still proceed } else { // Update job with metadata metadataJSON, err := json.Marshal(metadata) if err == nil { err = s.db.With(func(conn *sql.DB) error { _, err := conn.Exec( `UPDATE jobs SET blend_metadata = ? WHERE id = ?`, string(metadataJSON), jobID, ) if err != nil { log.Printf("Warning: Failed to update job metadata in database: %v", err) } else { log.Printf("Successfully extracted and stored metadata for job %d", jobID) } return err }) } else { log.Printf("Warning: Failed to marshal metadata: %v", err) } } response := map[string]interface{}{ "id": fileID, "file_name": header.Filename, "file_size": header.Size, "context_archive": filepath.Base(contextPath), } if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") { response["zip_extracted"] = true response["extracted_files_count"] = len(extractedFiles) if mainBlendFile != "" { // Get relative path from temp dir relPath, _ := filepath.Rel(tmpDir, mainBlendFile) response["main_blend_file"] = relPath } } else if mainBlendFile != "" { relPath, _ := filepath.Rel(tmpDir, mainBlendFile) response["main_blend_file"] = relPath } s.respondJSON(w, http.StatusCreated, response) } // handleUploadFileForJobCreation handles file upload before job creation // Creates context archive and extracts metadata, returns metadata and upload session ID func (s *Manager) handleUploadFileForJobCreation(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } // Use MultipartReader to stream the file instead of loading it all into memory // This allows us to report progress during upload reader, err := r.MultipartReader() if err != nil { log.Printf("Error creating multipart reader: %v", err) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse multipart form: %v", err)) return } // Find the file part and collect form values // IMPORTANT: With MultipartReader, we must read the file part's data immediately // before calling NextPart() again, otherwise the data becomes unavailable var header *multipart.FileHeader var filePath string formValues := make(map[string]string) var tmpDir string var sessionID string var mainBlendFile string // Create temporary directory first (before reading parts) tmpDir, err = s.storage.TempDir(fmt.Sprintf("jiggablend-upload-user-%d-*", userID)) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create temporary directory: %v", err)) return } // Generate session ID (use temp directory path as session ID) sessionID = tmpDir // Create upload session s.uploadSessionsMu.Lock() s.uploadSessions[sessionID] = &UploadSession{ SessionID: sessionID, UserID: userID, Progress: 0.0, Status: "uploading", Message: "Uploading file...", CreatedAt: time.Now(), } s.uploadSessionsMu.Unlock() // Client tracks upload progress via XHR - no need to broadcast here // We only broadcast processing status changes (extracting, creating context, etc.) fileFound := false for { part, err := reader.NextPart() if err == io.EOF { break } if err != nil { log.Printf("Error reading multipart: %v", err) os.RemoveAll(tmpDir) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to read multipart: %v", err)) return } formName := part.FormName() if formName == "file" { // Read file part immediately - can't store it for later with MultipartReader header = &multipart.FileHeader{ Filename: part.FileName(), } // Try to get Content-Length from header if available if cl := part.Header.Get("Content-Length"); cl != "" { if size, err := strconv.ParseInt(cl, 10, 64); err == nil { header.Size = size } } // Determine file path if strings.HasSuffix(strings.ToLower(header.Filename), ".zip") { filePath = filepath.Join(tmpDir, header.Filename) } else { filePath = filepath.Join(tmpDir, header.Filename) if strings.HasSuffix(strings.ToLower(header.Filename), ".blend") { mainBlendFile = filePath } } // Create file and copy data immediately outFile, err := os.Create(filePath) if err != nil { part.Close() os.RemoveAll(tmpDir) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create file: %v", err)) return } // Copy file data - must do this before calling NextPart() again copied, err := io.Copy(outFile, part) outFile.Close() part.Close() if err != nil { os.RemoveAll(tmpDir) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to save file: %v", err)) return } // Update header size with actual bytes copied (in case Content-Length was wrong) if header.Size == 0 { header.Size = copied } fileFound = true log.Printf("Uploading file '%s' (size: %d bytes, copied: %d bytes) for user %d (pre-job creation)", header.Filename, header.Size, copied, userID) } else if formName != "" { // Read form value valueBytes, err := io.ReadAll(part) if err == nil { formValues[formName] = string(valueBytes) } part.Close() } else { part.Close() } } if !fileFound { os.RemoveAll(tmpDir) s.respondError(w, http.StatusBadRequest, "No file provided") return } // Process everything synchronously and return metadata in HTTP response // Client will show upload progress during upload, then processing progress while waiting filename := header.Filename fileSize := header.Size mainBlendParam := formValues["main_blend_file"] var processedMainBlendFile string var processedExtractedFiles []string var processedMetadata *types.BlendMetadata // Process ZIP extraction if needed if strings.HasSuffix(strings.ToLower(filename), ".zip") { zipPath := filepath.Join(tmpDir, filename) log.Printf("Extracting ZIP file: %s", zipPath) processedExtractedFiles, err = s.storage.ExtractZip(zipPath, tmpDir) if err != nil { log.Printf("ERROR: Failed to extract ZIP file: %v", err) os.RemoveAll(tmpDir) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to extract ZIP file: %v", err)) return } log.Printf("Successfully extracted %d files from ZIP", len(processedExtractedFiles)) // Find main blend file if mainBlendParam != "" { processedMainBlendFile = filepath.Join(tmpDir, mainBlendParam) if _, err := os.Stat(processedMainBlendFile); err != nil { log.Printf("ERROR: Specified main blend file not found: %s", mainBlendParam) os.RemoveAll(tmpDir) s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Specified main blend file not found: %s", mainBlendParam)) return } } else { // Auto-detect: find blend files in root directory blendFiles := []string{} err := filepath.Walk(tmpDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } relPath, _ := filepath.Rel(tmpDir, path) if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".blend") { if !strings.Contains(relPath, string(filepath.Separator)) { blendFiles = append(blendFiles, path) } } return nil }) if err == nil && len(blendFiles) == 1 { processedMainBlendFile = blendFiles[0] } else if len(blendFiles) > 1 { // Multiple blend files - return response with list for user to select blendFileNames := []string{} for _, f := range blendFiles { rel, _ := filepath.Rel(tmpDir, f) blendFileNames = append(blendFileNames, rel) } // Return response indicating multiple blend files found response := map[string]interface{}{ "session_id": sessionID, "file_name": filename, "file_size": fileSize, "status": "select_blend", "zip_extracted": true, "blend_files": blendFileNames, } s.respondJSON(w, http.StatusOK, response) return } } } else { processedMainBlendFile = mainBlendFile } // Create context archive var excludeFiles []string if strings.HasSuffix(strings.ToLower(filename), ".zip") { excludeFiles = append(excludeFiles, filename) } log.Printf("Creating context archive for session %s", sessionID) contextPath := filepath.Join(tmpDir, "context.tar") contextPath, err = s.createContextFromDir(tmpDir, contextPath, excludeFiles...) if err != nil { log.Printf("ERROR: Failed to create context archive: %v", err) os.RemoveAll(tmpDir) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create context archive: %v", err)) return } // Extract metadata from context archive log.Printf("Extracting metadata from context archive for session %s", sessionID) processedMetadata, err = s.extractMetadataFromTempContext(contextPath) if err != nil { log.Printf("Warning: Failed to extract metadata: %v", err) // Continue anyway - user can fill in manually processedMetadata = nil } // Build response with all results response := map[string]interface{}{ "session_id": sessionID, "file_name": filename, "file_size": fileSize, "context_archive": filepath.Base(contextPath), "status": "completed", } if strings.HasSuffix(strings.ToLower(filename), ".zip") { response["zip_extracted"] = true response["extracted_files_count"] = len(processedExtractedFiles) if processedMainBlendFile != "" { relPath, _ := filepath.Rel(tmpDir, processedMainBlendFile) response["main_blend_file"] = relPath } } else if processedMainBlendFile != "" { relPath, _ := filepath.Rel(tmpDir, processedMainBlendFile) response["main_blend_file"] = relPath } if processedMetadata != nil { response["metadata"] = processedMetadata response["metadata_extracted"] = true } else { response["metadata_extracted"] = false } // Clean up upload session immediately (no longer needed for WebSocket) s.uploadSessionsMu.Lock() delete(s.uploadSessions, sessionID) s.uploadSessionsMu.Unlock() // Return response with metadata s.respondJSON(w, http.StatusOK, response) } // extractMetadataFromTempContext extracts metadata from a context archive in a temporary location func (s *Manager) extractMetadataFromTempContext(contextPath string) (*types.BlendMetadata, error) { return s.extractMetadataFromTempContextWithProgress(contextPath, nil) } // extractMetadataFromTempContextWithProgress extracts metadata with progress callbacks func (s *Manager) extractMetadataFromTempContextWithProgress(contextPath string, progressCallback func(float64, string)) (*types.BlendMetadata, error) { // Create temporary directory for extraction under storage base path tmpDir, err := s.storage.TempDir("jiggablend-metadata-temp-*") if err != nil { return nil, fmt.Errorf("failed to create temporary directory: %w", err) } defer func() { if err := os.RemoveAll(tmpDir); err != nil { log.Printf("Warning: Failed to clean up temp directory %s: %v", tmpDir, err) } }() // Extract context archive if err := s.extractTar(contextPath, tmpDir); err != nil { return nil, fmt.Errorf("failed to extract context: %w", err) } // Find .blend file blendFile := "" err = filepath.Walk(tmpDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".blend") { lower := strings.ToLower(info.Name()) idx := strings.LastIndex(lower, ".blend") if idx != -1 { suffix := lower[idx+len(".blend"):] isSaveFile := false if len(suffix) > 0 { isSaveFile = true for _, r := range suffix { if r < '0' || r > '9' { isSaveFile = false break } } } if !isSaveFile { blendFile = path return filepath.SkipAll } } } return nil }) if err != nil || blendFile == "" { return nil, fmt.Errorf("no .blend file found in context - the uploaded context archive must contain at least one .blend file to render") } // Detect Blender version from blend file header BEFORE running Blender // This allows us to use the correct Blender version for metadata extraction detectedVersion := "" major, minor, versionErr := ParseBlenderVersionFromFile(blendFile) if versionErr == nil { detectedVersion = fmt.Sprintf("%d.%d", major, minor) log.Printf("Detected Blender version %s from blend file header", detectedVersion) } else { log.Printf("Warning: Could not detect Blender version from blend file: %v", versionErr) } // Use the same extraction script and process as extractMetadataFromContext // (Copy the logic from extractMetadataFromContext but use tmpDir and blendFile) metadata, err := s.runBlenderMetadataExtraction(blendFile, tmpDir, detectedVersion, nil, progressCallback) if err != nil { return nil, err } // Set the detected/resolved Blender version in metadata if metadata != nil && detectedVersion != "" { // Get the latest patch version for this major.minor version, verr := s.GetLatestBlenderForMajorMinor(major, minor) if verr == nil { metadata.BlenderVersion = version.Full log.Printf("Resolved Blender version to %s", version.Full) } else { metadata.BlenderVersion = detectedVersion log.Printf("Using detected version %s (could not resolve latest: %v)", detectedVersion, verr) } } return metadata, nil } // runBlenderMetadataExtraction runs Blender to extract metadata from a blend file // blenderVersion is optional - if provided, will use versioned blender from cache // stderrCallback is optional and will be called for each stderr line (note: with RunCommand, this is called after completion) // progressCallback is optional and will be called with progress updates (0.0-1.0) func (s *Manager) runBlenderMetadataExtraction(blendFile, workDir, blenderVersion string, stderrCallback func(string), progressCallback func(float64, string)) (*types.BlendMetadata, error) { // Use embedded Python script scriptPath := filepath.Join(workDir, "extract_metadata.py") if err := os.WriteFile(scriptPath, []byte(scripts.ExtractMetadata), 0644); err != nil { return nil, fmt.Errorf("failed to create extraction script: %w", err) } // Make blend file path relative to workDir to avoid path resolution issues blendFileRel, err := filepath.Rel(workDir, blendFile) if err != nil { return nil, fmt.Errorf("failed to get relative path for blend file: %w", err) } // Determine which blender binary to use blenderBinary := "blender" // Default to system blender var version *BlenderVersion // Track version for cleanup if blenderVersion != "" { // Try to get versioned blender from cache var major, minor int fmt.Sscanf(blenderVersion, "%d.%d", &major, &minor) version, err = s.GetLatestBlenderForMajorMinor(major, minor) if err == nil { archivePath, err := s.GetBlenderArchivePath(version) if err == nil { // Extract to temp location for manager-side metadata extraction blenderDir := filepath.Join(s.storage.BasePath(), "blender-versions") binaryPath := filepath.Join(blenderDir, version.Full, "blender") // Make path absolute to avoid working directory issues if absBinaryPath, absErr := filepath.Abs(binaryPath); absErr == nil { binaryPath = absBinaryPath } if _, err := os.Stat(binaryPath); os.IsNotExist(err) { // Need to extract if progressCallback != nil { progressCallback(0.5, "Extracting Blender binary...") } if err := extractBlenderArchive(archivePath, version, blenderDir); err == nil { blenderBinary = binaryPath log.Printf("Using Blender %s at %s for metadata extraction", version.Full, binaryPath) if progressCallback != nil { progressCallback(0.7, "Blender extracted, extracting metadata...") } } else { log.Printf("Warning: Failed to extract Blender %s: %v, using system blender", version.Full, err) } } else { blenderBinary = binaryPath log.Printf("Using cached Blender %s at %s for metadata extraction", version.Full, binaryPath) if progressCallback != nil { progressCallback(0.7, "Extracting metadata from blend file...") } } } else { log.Printf("Warning: Failed to get Blender archive for %s: %v, using system blender", version.Full, err) } } else { log.Printf("Warning: Failed to find Blender version %s: %v, using system blender", blenderVersion, err) } } // Execute Blender using executils result, err := executils.RunCommand( blenderBinary, []string{"-b", blendFileRel, "--python", "extract_metadata.py"}, workDir, nil, // inherit environment 0, // no task ID for metadata extraction nil, // no process tracker needed ) // Forward stderr via callback if provided if result != nil && stderrCallback != nil && result.Stderr != "" { for _, line := range strings.Split(result.Stderr, "\n") { if line != "" { stderrCallback(line) } } } if err != nil { stderrOutput := "" stdoutOutput := "" if result != nil { stderrOutput = strings.TrimSpace(result.Stderr) stdoutOutput = strings.TrimSpace(result.Stdout) } log.Printf("Blender metadata extraction failed:") if stderrOutput != "" { log.Printf("Blender stderr: %s", stderrOutput) } if stdoutOutput != "" { log.Printf("Blender stdout (last 500 chars): %s", truncateString(stdoutOutput, 500)) } if stderrOutput != "" { return nil, fmt.Errorf("blender metadata extraction failed: %w (stderr: %s)", err, truncateString(stderrOutput, 200)) } return nil, fmt.Errorf("blender metadata extraction failed: %w", err) } metadataJSON := strings.TrimSpace(result.Stdout) jsonStart := strings.Index(metadataJSON, "{") jsonEnd := strings.LastIndex(metadataJSON, "}") if jsonStart == -1 || jsonEnd == -1 || jsonEnd <= jsonStart { return nil, errors.New("failed to extract JSON from Blender output") } metadataJSON = metadataJSON[jsonStart : jsonEnd+1] var metadata types.BlendMetadata if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil { return nil, fmt.Errorf("failed to parse metadata JSON: %w", err) } log.Printf("Metadata extracted: frame_start=%d, frame_end=%d", metadata.FrameStart, metadata.FrameEnd) return &metadata, nil } // createContextFromDir creates a context archive from a source directory to a specific destination path func (s *Manager) createContextFromDir(sourceDir, destPath string, excludeFiles ...string) (string, error) { // Build set of files to exclude excludeSet := make(map[string]bool) for _, excludeFile := range excludeFiles { excludePath := filepath.Clean(excludeFile) excludeSet[excludePath] = true excludeSet[filepath.ToSlash(excludePath)] = true } // Collect all files from source directory var filesToInclude []string err := filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } // Skip Blender save files lower := strings.ToLower(info.Name()) idx := strings.LastIndex(lower, ".blend") if idx != -1 { suffix := lower[idx+len(".blend"):] if len(suffix) > 0 { isSaveFile := true for _, r := range suffix { if r < '0' || r > '9' { isSaveFile = false break } } if isSaveFile { return nil } } } relPath, err := filepath.Rel(sourceDir, path) if err != nil { return err } cleanRelPath := filepath.Clean(relPath) if strings.HasPrefix(cleanRelPath, "..") { return fmt.Errorf("invalid file path: %s", relPath) } if excludeSet[cleanRelPath] || excludeSet[filepath.ToSlash(cleanRelPath)] { return nil } filesToInclude = append(filesToInclude, path) return nil }) if err != nil { return "", fmt.Errorf("failed to walk source directory: %w", err) } if len(filesToInclude) == 0 { return "", fmt.Errorf("no files found to include in context archive") } // Collect relative paths to find common prefix relPaths := make([]string, 0, len(filesToInclude)) for _, filePath := range filesToInclude { relPath, err := filepath.Rel(sourceDir, filePath) if err != nil { return "", fmt.Errorf("failed to get relative path: %w", err) } relPaths = append(relPaths, relPath) } // Find and strip common leading directory commonPrefix := "" if len(relPaths) > 0 { firstComponents := make([]string, 0, len(relPaths)) for _, path := range relPaths { parts := strings.Split(filepath.ToSlash(path), "/") if len(parts) > 0 && parts[0] != "" { firstComponents = append(firstComponents, parts[0]) } else { firstComponents = nil break } } if len(firstComponents) > 0 { commonFirst := firstComponents[0] allSame := true for _, comp := range firstComponents { if comp != commonFirst { allSame = false break } } if allSame { commonPrefix = commonFirst + "/" } } } // Validate single .blend file at root blendFilesAtRoot := 0 for _, relPath := range relPaths { tarPath := filepath.ToSlash(relPath) if commonPrefix != "" && strings.HasPrefix(tarPath, commonPrefix) { tarPath = strings.TrimPrefix(tarPath, commonPrefix) } if strings.HasSuffix(strings.ToLower(tarPath), ".blend") && !strings.Contains(tarPath, "/") { blendFilesAtRoot++ } } if blendFilesAtRoot == 0 { return "", fmt.Errorf("no .blend file found at root level in context archive - .blend files must be at the root level of the uploaded archive, not in subdirectories") } if blendFilesAtRoot > 1 { return "", fmt.Errorf("multiple .blend files found at root level in context archive (found %d, expected 1)", blendFilesAtRoot) } // Create the tar file contextFile, err := os.Create(destPath) if err != nil { return "", fmt.Errorf("failed to create context file: %w", err) } defer contextFile.Close() tarWriter := tar.NewWriter(contextFile) defer tarWriter.Close() // Add each file to the tar archive for i, filePath := range filesToInclude { file, err := os.Open(filePath) if err != nil { return "", fmt.Errorf("failed to open file: %w", err) } info, err := file.Stat() if err != nil { file.Close() return "", fmt.Errorf("failed to stat file: %w", err) } relPath := relPaths[i] tarPath := filepath.ToSlash(relPath) if commonPrefix != "" && strings.HasPrefix(tarPath, commonPrefix) { tarPath = strings.TrimPrefix(tarPath, commonPrefix) } header, err := tar.FileInfoHeader(info, "") if err != nil { file.Close() return "", fmt.Errorf("failed to create tar header: %w", err) } header.Name = tarPath if err := tarWriter.WriteHeader(header); err != nil { file.Close() return "", fmt.Errorf("failed to write tar header: %w", err) } if _, err := io.Copy(tarWriter, file); err != nil { file.Close() return "", fmt.Errorf("failed to write file to tar: %w", err) } file.Close() } if err := tarWriter.Close(); err != nil { return "", fmt.Errorf("failed to close tar writer: %w", err) } if err := contextFile.Close(); err != nil { return "", fmt.Errorf("failed to close context file: %w", err) } return destPath, nil } // handleListJobFiles lists files for a job with pagination func (s *Manager) handleListJobFiles(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Parse query parameters limit := 50 if limitStr := r.URL.Query().Get("limit"); limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { limit = l } } offset := 0 if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { offset = o } } fileTypeFilter := r.URL.Query().Get("file_type") extensionFilter := r.URL.Query().Get("extension") // Build query with filters query := `SELECT id, job_id, file_type, file_path, file_name, file_size, created_at FROM job_files WHERE job_id = ?` args := []interface{}{jobID} if fileTypeFilter != "" { query += " AND file_type = ?" args = append(args, fileTypeFilter) } if extensionFilter != "" { query += " AND file_name LIKE ?" args = append(args, "%."+extensionFilter) } query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" args = append(args, limit, offset) var rows *sql.Rows var total int err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) if err != nil { return err } // Get total count countQuery := `SELECT COUNT(*) FROM job_files WHERE job_id = ?` countArgs := []interface{}{jobID} if fileTypeFilter != "" { countQuery += " AND file_type = ?" countArgs = append(countArgs, fileTypeFilter) } if extensionFilter != "" { countQuery += " AND file_name LIKE ?" countArgs = append(countArgs, "%."+extensionFilter) } err = conn.QueryRow(countQuery, countArgs...).Scan(&total) if err != nil { total = -1 } return nil }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query files: %v", err)) return } defer rows.Close() files := []types.JobFile{} for rows.Next() { var file types.JobFile err := rows.Scan( &file.ID, &file.JobID, &file.FileType, &file.FilePath, &file.FileName, &file.FileSize, &file.CreatedAt, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan file: %v", err)) return } files = append(files, file) } response := map[string]interface{}{ "data": files, "total": total, "limit": limit, "offset": offset, } s.respondJSON(w, http.StatusOK, response) } // handleGetJobFilesCount returns the count of files for a job func (s *Manager) handleGetJobFilesCount(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } fileTypeFilter := r.URL.Query().Get("file_type") var count int query := `SELECT COUNT(*) FROM job_files WHERE job_id = ?` args := []interface{}{jobID} if fileTypeFilter != "" { query += " AND file_type = ?" args = append(args, fileTypeFilter) } err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow(query, args...).Scan(&count) }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to count files: %v", err)) return } s.respondJSON(w, http.StatusOK, map[string]interface{}{"count": count}) } // handleListContextArchive lists files inside the context archive // Optimized to only read tar headers, skipping file data for fast directory listing func (s *Manager) handleListContextArchive(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } // Get context archive path contextPath := filepath.Join(s.storage.JobPath(jobID), "context.tar") if !s.storage.FileExists(contextPath) { s.respondError(w, http.StatusNotFound, "Context archive not found") return } // Open file directly for seeking (much faster than reading all data) file, err := os.Open(contextPath) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to open context archive: %v", err)) return } defer file.Close() type ArchiveFile struct { Name string `json:"name"` Size int64 `json:"size"` Path string `json:"path"` } var archiveFiles []ArchiveFile const tarBlockSize = 512 // Read tar headers sequentially, skipping file data by seeking // This is much faster than reading all file contents for { // Read 512-byte tar header headerBuf := make([]byte, tarBlockSize) n, err := file.Read(headerBuf) if err == io.EOF { break } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read archive header: %v", err)) return } if n < tarBlockSize { // Incomplete header, likely end of archive break } // Check if this is the end marker (all zeros) - tar files end with two zero blocks allZeros := true for _, b := range headerBuf { if b != 0 { allZeros = false break } } if allZeros { break } // Parse tar header var header tar.Header if err := parseTarHeader(headerBuf, &header); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to parse archive header: %v", err)) return } // Handle GNU tar long filename extension (type 'L') // If typeflag is 'L', the next block contains the actual filename if header.Typeflag == 'L' { // Read the long filename from the next block longNameBuf := make([]byte, tarBlockSize) if _, err := file.Read(longNameBuf); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read long filename: %v", err)) return } header.Name = strings.TrimRight(string(longNameBuf), "\x00") // Read the actual header after the long filename if _, err := file.Read(headerBuf); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read header after long filename: %v", err)) return } if err := parseTarHeader(headerBuf, &header); err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to parse header after long filename: %v", err)) return } } // Only include regular files (not directories) if header.Typeflag == tar.TypeReg { archiveFiles = append(archiveFiles, ArchiveFile{ Name: filepath.Base(header.Name), Size: header.Size, Path: header.Name, }) } // Skip file data by seeking forward // Tar format: file data is padded to 512-byte boundary dataSize := header.Size blockPadding := (tarBlockSize - (dataSize % tarBlockSize)) % tarBlockSize skipSize := dataSize + blockPadding // Seek forward to next header (much faster than reading) _, err = file.Seek(skipSize, io.SeekCurrent) if err != nil { // If seek fails (e.g., on non-seekable stream), fall back to reading and discarding _, readErr := io.CopyN(io.Discard, file, skipSize) if readErr != nil && readErr != io.EOF { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to skip file data: %v", readErr)) return } } } s.respondJSON(w, http.StatusOK, archiveFiles) } // parseTarHeader parses a 512-byte tar header block into a tar.Header // This is a simplified parser that extracts the essential fields we need func parseTarHeader(buf []byte, h *tar.Header) error { const tarHeaderSize = 512 if len(buf) < tarHeaderSize { return fmt.Errorf("buffer too small for tar header") } // Tar header format (UStar/POSIX format) // Field offsets based on POSIX.1-1988 tar format h.Name = strings.TrimRight(string(buf[0:100]), "\x00") // Parse mode (octal) modeStr := strings.TrimRight(string(buf[100:108]), " \x00") mode, err := strconv.ParseUint(modeStr, 8, 32) if err == nil { h.Mode = int64(mode) } // Parse size (octal) sizeStr := strings.TrimRight(string(buf[124:136]), " \x00") size, err := strconv.ParseInt(sizeStr, 8, 64) if err == nil { h.Size = size } // Parse typeflag if len(buf) > 156 { h.Typeflag = buf[156] } // Handle UStar format prefix (for long filenames) prefix := strings.TrimRight(string(buf[345:500]), "\x00") if prefix != "" { h.Name = prefix + "/" + h.Name } return nil } // handleDownloadJobFile downloads a job file func (s *Manager) handleDownloadJobFile(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } fileID, err := parseID(r, "fileId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Get file info var filePath, fileName string err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT file_path, file_name FROM job_files WHERE id = ? AND job_id = ?`, fileID, jobID, ).Scan(&filePath, &fileName) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "File not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err)) return } // Open file file, err := s.storage.GetFile(filePath) if err != nil { s.respondError(w, http.StatusNotFound, "File not found on disk") return } defer file.Close() // Determine content type based on file extension contentType := "application/octet-stream" disposition := "attachment" fileNameLower := strings.ToLower(fileName) switch { case strings.HasSuffix(fileNameLower, ".png"): contentType = "image/png" disposition = "inline" case strings.HasSuffix(fileNameLower, ".jpg") || strings.HasSuffix(fileNameLower, ".jpeg"): contentType = "image/jpeg" disposition = "inline" case strings.HasSuffix(fileNameLower, ".gif"): contentType = "image/gif" disposition = "inline" case strings.HasSuffix(fileNameLower, ".webp"): contentType = "image/webp" disposition = "inline" case strings.HasSuffix(fileNameLower, ".bmp"): contentType = "image/bmp" disposition = "inline" case strings.HasSuffix(fileNameLower, ".svg"): contentType = "image/svg+xml" disposition = "inline" } // Set headers w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%s", disposition, fileName)) w.Header().Set("Content-Type", contentType) // Stream file io.Copy(w, file) } // handlePreviewEXR converts an EXR file to PNG for browser preview // Uses ImageMagick to convert with HDR tone mapping and alpha preservation func (s *Manager) handlePreviewEXR(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } fileID, err := parseID(r, "fileId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Get file info var filePath, fileName string err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT file_path, file_name FROM job_files WHERE id = ? AND job_id = ?`, fileID, jobID, ).Scan(&filePath, &fileName) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "File not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err)) return } // Verify it's an EXR file if !strings.HasSuffix(strings.ToLower(fileName), ".exr") { s.respondError(w, http.StatusBadRequest, "File is not an EXR file") return } // Check if source file exists if !s.storage.FileExists(filePath) { s.respondError(w, http.StatusNotFound, "File not found on disk") return } // Create temp file for PNG output tmpFile, err := os.CreateTemp("", "exr-preview-*.png") if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create temp file: %v", err)) return } tmpPath := tmpFile.Name() tmpFile.Close() defer os.Remove(tmpPath) // Convert EXR to PNG using ImageMagick // -colorspace sRGB: Convert from linear RGB to sRGB (matches SDR encoding pipeline) // -depth 16: Use 16-bit depth for better quality // -alpha on: Preserve alpha channel // Note: Removed -auto-level to avoid automatic tone mapping that changes colors result, err := executils.RunCommand( "magick", []string{ filePath, "-colorspace", "sRGB", "-depth", "16", "-alpha", "on", tmpPath, }, "", // dir nil, // env 0, // taskID nil, // tracker ) if err != nil { // Try with 'convert' command (older ImageMagick) result, err = executils.RunCommand( "convert", []string{ filePath, "-colorspace", "sRGB", "-depth", "16", "-alpha", "on", tmpPath, }, "", // dir nil, // env 0, // taskID nil, // tracker ) if err != nil { log.Printf("EXR conversion failed: %v, output: %s %s", err, result.Stdout, result.Stderr) s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to convert EXR: %v", err)) return } } // Read the converted PNG pngData, err := os.ReadFile(tmpPath) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to read converted file: %v", err)) return } // Set headers pngFileName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) + ".png" w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%s", pngFileName)) w.Header().Set("Content-Type", "image/png") w.Header().Set("Content-Length", strconv.Itoa(len(pngData))) // Write response w.Write(pngData) } // handleStreamVideo streams MP4 video file with range support func (s *Manager) handleStreamVideo(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) var jobUserID int64 var outputFormat string err = s.db.With(func(conn *sql.DB) error { if isAdmin { return conn.QueryRow("SELECT user_id, output_format FROM jobs WHERE id = ?", jobID).Scan(&jobUserID, &outputFormat) } else { return conn.QueryRow("SELECT user_id, output_format FROM jobs WHERE id = ? AND user_id = ?", jobID, userID).Scan(&jobUserID, &outputFormat) } }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query job: %v", err)) return } if !isAdmin && jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Find MP4 file var filePath, fileName string err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( `SELECT file_path, file_name FROM job_files WHERE job_id = ? AND file_type = ? AND file_name LIKE '%.mp4' ORDER BY created_at DESC LIMIT 1`, jobID, types.JobFileTypeOutput, ).Scan(&filePath, &fileName) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Video file not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query file: %v", err)) return } // Open file file, err := s.storage.GetFile(filePath) if err != nil { s.respondError(w, http.StatusNotFound, "File not found on disk") return } defer file.Close() // Get file info fileInfo, err := file.Stat() if err != nil { s.respondError(w, http.StatusInternalServerError, "Failed to get file info") return } fileSize := fileInfo.Size() // Handle range requests for video seeking rangeHeader := r.Header.Get("Range") if rangeHeader != "" { // Parse range header var start, end int64 fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end) if end == 0 { end = fileSize - 1 } // Seek to start position file.Seek(start, 0) // Set headers for partial content w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize)) w.Header().Set("Accept-Ranges", "bytes") w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1)) w.Header().Set("Content-Type", "video/mp4") w.WriteHeader(http.StatusPartialContent) // Copy partial content io.CopyN(w, file, end-start+1) } else { // Full file w.Header().Set("Content-Type", "video/mp4") w.Header().Set("Content-Length", fmt.Sprintf("%d", fileSize)) w.Header().Set("Accept-Ranges", "bytes") io.Copy(w, file) } } // handleListJobTasks lists all tasks for a job with pagination and filtering func (s *Manager) handleListJobTasks(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Parse query parameters limit := 100 // default if limitStr := r.URL.Query().Get("limit"); limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 5000 { limit = l } } offset := 0 if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { offset = o } } statusFilter := r.URL.Query().Get("status") frameStartFilter := r.URL.Query().Get("frame_start") frameEndFilter := r.URL.Query().Get("frame_end") sortBy := r.URL.Query().Get("sort") if sortBy == "" { sortBy = "frame:asc" } // Parse sort parameter sortParts := strings.Split(sortBy, ":") sortField := "frame" sortDir := "ASC" if len(sortParts) == 2 { sortField = sortParts[0] sortDir = strings.ToUpper(sortParts[1]) if sortDir != "ASC" && sortDir != "DESC" { sortDir = "ASC" } validFields := map[string]bool{ "frame": true, "status": true, "created_at": true, "started_at": true, "completed_at": true, } if !validFields[sortField] { sortField = "frame" } } // Build query with filters query := `SELECT id, job_id, runner_id, frame, status, task_type, current_step, retry_count, max_retries, output_path, created_at, started_at, completed_at, error_message, timeout_seconds FROM tasks WHERE job_id = ?` args := []interface{}{jobID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" args = append(args, strings.TrimSpace(status)) } query += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } if frameStartFilter != "" { if fs, err := strconv.Atoi(frameStartFilter); err == nil { query += " AND frame >= ?" args = append(args, fs) } } if frameEndFilter != "" { if fe, err := strconv.Atoi(frameEndFilter); err == nil { query += " AND frame <= ?" args = append(args, fe) } } query += fmt.Sprintf(" ORDER BY %s %s LIMIT ? OFFSET ?", sortField, sortDir) args = append(args, limit, offset) var rows *sql.Rows var total int err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) if err != nil { return err } // Get total count countQuery := `SELECT COUNT(*) FROM tasks WHERE job_id = ?` countArgs := []interface{}{jobID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" countArgs = append(countArgs, strings.TrimSpace(status)) } countQuery += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } if frameStartFilter != "" { if fs, err := strconv.Atoi(frameStartFilter); err == nil { countQuery += " AND frame >= ?" countArgs = append(countArgs, fs) } } if frameEndFilter != "" { if fe, err := strconv.Atoi(frameEndFilter); err == nil { countQuery += " AND frame <= ?" countArgs = append(countArgs, fe) } } err = conn.QueryRow(countQuery, countArgs...).Scan(&total) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err)) return } defer rows.Close() tasks := []types.Task{} for rows.Next() { var task types.Task var runnerID sql.NullInt64 var startedAt, completedAt sql.NullTime var timeoutSeconds sql.NullInt64 var errorMessage sql.NullString var currentStep sql.NullString var outputPath sql.NullString err := rows.Scan( &task.ID, &task.JobID, &runnerID, &task.Frame, &task.Status, &task.TaskType, ¤tStep, &task.RetryCount, &task.MaxRetries, &outputPath, &task.CreatedAt, &startedAt, &completedAt, &errorMessage, &timeoutSeconds, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err)) return } if runnerID.Valid { task.RunnerID = &runnerID.Int64 } if startedAt.Valid { task.StartedAt = &startedAt.Time } if completedAt.Valid { task.CompletedAt = &completedAt.Time } if timeoutSeconds.Valid { timeout := int(timeoutSeconds.Int64) task.TimeoutSeconds = &timeout } if errorMessage.Valid { task.ErrorMessage = errorMessage.String } if currentStep.Valid { task.CurrentStep = currentStep.String } if outputPath.Valid { task.OutputPath = outputPath.String } tasks = append(tasks, task) } response := map[string]interface{}{ "data": tasks, "total": total, "limit": limit, "offset": offset, } // Generate ETag and check If-None-Match etag := generateETag(response) w.Header().Set("ETag", etag) if checkETag(r, etag) { w.WriteHeader(http.StatusNotModified) return } s.respondJSON(w, http.StatusOK, response) } // handleListJobTasksSummary lists lightweight task summaries for a job func (s *Manager) handleListJobTasksSummary(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Parse query parameters limit := 0 // 0 means unlimited if limitStr := r.URL.Query().Get("limit"); limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 { limit = l } } offset := 0 if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 { offset = o } } statusFilter := r.URL.Query().Get("status") sortBy := r.URL.Query().Get("sort") if sortBy == "" { sortBy = "frame:asc" } sortParts := strings.Split(sortBy, ":") sortField := "frame" sortDir := "ASC" if len(sortParts) == 2 { sortField = sortParts[0] sortDir = strings.ToUpper(sortParts[1]) if sortDir != "ASC" && sortDir != "DESC" { sortDir = "ASC" } validFields := map[string]bool{ "frame": true, "status": true, } if !validFields[sortField] { sortField = "frame" } } // Build query - only select summary fields query := `SELECT id, frame, status, task_type, runner_id FROM tasks WHERE job_id = ?` args := []interface{}{jobID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" args = append(args, strings.TrimSpace(status)) } query += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } query += fmt.Sprintf(" ORDER BY %s %s", sortField, sortDir) if limit > 0 { query += " LIMIT ? OFFSET ?" args = append(args, limit, offset) } else { // Unlimited - only apply offset if specified if offset > 0 { query += " OFFSET ?" args = append(args, offset) } } var rows *sql.Rows var total int err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) if err != nil { return err } // Get total count countQuery := `SELECT COUNT(*) FROM tasks WHERE job_id = ?` countArgs := []interface{}{jobID} if statusFilter != "" { statuses := strings.Split(statusFilter, ",") placeholders := make([]string, len(statuses)) for i, status := range statuses { placeholders[i] = "?" countArgs = append(countArgs, strings.TrimSpace(status)) } countQuery += fmt.Sprintf(" AND status IN (%s)", strings.Join(placeholders, ",")) } err = conn.QueryRow(countQuery, countArgs...).Scan(&total) if err != nil { total = -1 } return nil }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err)) return } defer rows.Close() type TaskSummary struct { ID int64 `json:"id"` Frame int `json:"frame"` Status string `json:"status"` TaskType string `json:"task_type"` RunnerID *int64 `json:"runner_id,omitempty"` } summaries := []TaskSummary{} for rows.Next() { var summary TaskSummary var runnerID sql.NullInt64 err := rows.Scan( &summary.ID, &summary.Frame, &summary.Status, &summary.TaskType, &runnerID, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err)) return } if runnerID.Valid { summary.RunnerID = &runnerID.Int64 } summaries = append(summaries, summary) } response := map[string]interface{}{ "data": summaries, "total": total, "limit": limit, "offset": offset, } s.respondJSON(w, http.StatusOK, response) } // handleBatchGetTasks fetches multiple tasks by IDs for a job func (s *Manager) handleBatchGetTasks(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } var req struct { TaskIDs []int64 `json:"task_ids"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } if len(req.TaskIDs) == 0 { s.respondJSON(w, http.StatusOK, []types.Task{}) return } if len(req.TaskIDs) > 500 { s.respondError(w, http.StatusBadRequest, "Maximum 500 task IDs allowed per batch") return } // Build query with IN clause placeholders := make([]string, len(req.TaskIDs)) args := make([]interface{}, len(req.TaskIDs)+1) args[0] = jobID for i, taskID := range req.TaskIDs { placeholders[i] = "?" args[i+1] = taskID } query := fmt.Sprintf(`SELECT id, job_id, runner_id, frame, status, task_type, current_step, retry_count, max_retries, output_path, created_at, started_at, completed_at, error_message, timeout_seconds FROM tasks WHERE job_id = ? AND id IN (%s) ORDER BY frame ASC`, strings.Join(placeholders, ",")) var rows *sql.Rows err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query tasks: %v", err)) return } defer rows.Close() tasks := []types.Task{} for rows.Next() { var task types.Task var runnerID sql.NullInt64 var startedAt, completedAt sql.NullTime var timeoutSeconds sql.NullInt64 var errorMessage sql.NullString var currentStep sql.NullString var outputPath sql.NullString err := rows.Scan( &task.ID, &task.JobID, &runnerID, &task.Frame, &task.Status, &task.TaskType, ¤tStep, &task.RetryCount, &task.MaxRetries, &outputPath, &task.CreatedAt, &startedAt, &completedAt, &errorMessage, &timeoutSeconds, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan task: %v", err)) return } if runnerID.Valid { task.RunnerID = &runnerID.Int64 } if startedAt.Valid { task.StartedAt = &startedAt.Time } if completedAt.Valid { task.CompletedAt = &completedAt.Time } if timeoutSeconds.Valid { timeout := int(timeoutSeconds.Int64) task.TimeoutSeconds = &timeout } if errorMessage.Valid { task.ErrorMessage = errorMessage.String } if currentStep.Valid { task.CurrentStep = currentStep.String } if outputPath.Valid { task.OutputPath = outputPath.String } tasks = append(tasks, task) } s.respondJSON(w, http.StatusOK, tasks) } // handleGetTaskLogs retrieves logs for a specific task func (s *Manager) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskID, err := parseID(r, "taskId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Verify task belongs to job var taskJobID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } // Get query parameters for filtering stepName := r.URL.Query().Get("step_name") logLevel := r.URL.Query().Get("log_level") sinceIDStr := r.URL.Query().Get("since_id") limitStr := r.URL.Query().Get("limit") limit := 100 // default (reduced from 1000) if limitStr != "" { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 10000 { limit = l } } // Build query query := `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ?` args := []interface{}{taskID} // Add since_id filter for incremental updates if sinceIDStr != "" { if sinceID, err := strconv.ParseInt(sinceIDStr, 10, 64); err == nil && sinceID > 0 { query += " AND id > ?" args = append(args, sinceID) } } if stepName != "" { query += " AND step_name = ?" args = append(args, stepName) } if logLevel != "" { query += " AND log_level = ?" args = append(args, logLevel) } query += " ORDER BY id ASC LIMIT ?" args = append(args, limit) var rows *sql.Rows err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query(query, args...) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query logs: %v", err)) return } defer rows.Close() logs := []types.TaskLog{} for rows.Next() { var log types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message, &log.StepName, &log.CreatedAt, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan log: %v", err)) return } if runnerID.Valid { log.RunnerID = &runnerID.Int64 } logs = append(logs, log) } // Return last_id for next incremental fetch lastID := int64(0) if len(logs) > 0 { lastID = logs[len(logs)-1].ID } response := map[string]interface{}{ "logs": logs, "last_id": lastID, "limit": limit, } s.respondJSON(w, http.StatusOK, response) } // handleGetTaskSteps retrieves step timeline for a specific task func (s *Manager) handleGetTaskSteps(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskID, err := parseID(r, "taskId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { // Admin: verify job exists var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Verify task belongs to job var taskJobID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } var rows *sql.Rows err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query( `SELECT id, task_id, step_name, status, started_at, completed_at, duration_ms, error_message FROM task_steps WHERE task_id = ? ORDER BY started_at ASC`, taskID, ) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to query steps: %v", err)) return } defer rows.Close() steps := []types.TaskStep{} for rows.Next() { var step types.TaskStep var startedAt, completedAt sql.NullTime var durationMs sql.NullInt64 var errorMessage sql.NullString err := rows.Scan( &step.ID, &step.TaskID, &step.StepName, &step.Status, &startedAt, &completedAt, &durationMs, &errorMessage, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan step: %v", err)) return } if startedAt.Valid { step.StartedAt = &startedAt.Time } if completedAt.Valid { step.CompletedAt = &completedAt.Time } if durationMs.Valid { duration := int(durationMs.Int64) step.DurationMs = &duration } if errorMessage.Valid { step.ErrorMessage = errorMessage.String } steps = append(steps, step) } s.respondJSON(w, http.StatusOK, steps) } // handleRetryTask retries a failed task func (s *Manager) handleRetryTask(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskID, err := parseID(r, "taskId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Verify task belongs to job and is in a retryable state var taskJobID int64 var taskStatus string var retryCount, maxRetries int err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow( "SELECT job_id, status, retry_count, max_retries FROM tasks WHERE id = ?", taskID, ).Scan(&taskJobID, &taskStatus, &retryCount, &maxRetries) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } if taskStatus != string(types.TaskStatusFailed) { s.respondError(w, http.StatusBadRequest, "Task is not in failed state") return } if retryCount >= maxRetries { s.respondError(w, http.StatusBadRequest, "Maximum retries exceeded") return } // Reset task to pending err = s.db.With(func(conn *sql.DB) error { _, err := conn.Exec( `UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL, error_message = NULL, started_at = NULL, completed_at = NULL WHERE id = ?`, types.TaskStatusPending, taskID, ) if err != nil { return err } // Clear steps and logs for fresh retry _, err = conn.Exec(`DELETE FROM task_steps WHERE task_id = ?`, taskID) if err != nil { return err } _, err = conn.Exec(`DELETE FROM task_logs WHERE task_id = ?`, taskID) return err }) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to retry task: %v", err)) return } // Broadcast task reset to clients (includes steps_cleared and logs_cleared flags) s.broadcastTaskUpdate(jobID, taskID, "task_reset", map[string]interface{}{ "status": types.TaskStatusPending, "runner_id": nil, "current_step": nil, "error_message": nil, "steps_cleared": true, "logs_cleared": true, }) s.respondJSON(w, http.StatusOK, map[string]string{"message": "Task queued for retry"}) } // handleStreamTaskLogsWebSocket streams task logs via WebSocket // Note: This is called after auth middleware, so userID is already verified func (s *Manager) handleStreamTaskLogsWebSocket(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } taskID, err := parseID(r, "taskId") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } // Verify task belongs to job var taskJobID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT job_id FROM tasks WHERE id = ?", taskID).Scan(&taskJobID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Task not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify task: %v", err)) return } if taskJobID != jobID { s.respondError(w, http.StatusBadRequest, "Task does not belong to this job") return } // Upgrade to WebSocket conn, err := s.wsUpgrader.Upgrade(w, r, nil) if err != nil { log.Printf("Failed to upgrade WebSocket: %v", err) return } defer conn.Close() key := fmt.Sprintf("%d:%d", jobID, taskID) s.frontendConnsMu.Lock() s.frontendConns[key] = conn s.frontendConnsMu.Unlock() // Create a write mutex for this connection s.frontendConnsWriteMuMu.Lock() s.frontendConnsWriteMu[key] = &sync.Mutex{} writeMu := s.frontendConnsWriteMu[key] s.frontendConnsWriteMuMu.Unlock() defer func() { s.frontendConnsMu.Lock() delete(s.frontendConns, key) s.frontendConnsMu.Unlock() s.frontendConnsWriteMuMu.Lock() delete(s.frontendConnsWriteMu, key) s.frontendConnsWriteMuMu.Unlock() }() // Send initial connection message writeMu.Lock() err = conn.WriteJSON(map[string]interface{}{ "type": "connected", "timestamp": time.Now().Unix(), }) writeMu.Unlock() if err != nil { log.Printf("Failed to send initial connection message: %v", err) return } // Get last log ID to start streaming from lastIDStr := r.URL.Query().Get("last_id") lastID := int64(0) if lastIDStr != "" { if id, err := strconv.ParseInt(lastIDStr, 10, 64); err == nil { lastID = id } } // Send existing logs // Order by id ASC to ensure consistent ordering and avoid race conditions var rows *sql.Rows err = s.db.With(func(conn *sql.DB) error { var err error rows, err = conn.Query( `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`, taskID, lastID, ) return err }) if err == nil { defer rows.Close() for rows.Next() { var log types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message, &log.StepName, &log.CreatedAt, ) if err != nil { continue } if runnerID.Valid { log.RunnerID = &runnerID.Int64 } // Always update lastID to the highest ID we've seen if log.ID > lastID { lastID = log.ID } // Serialize writes to prevent concurrent write panics writeMu.Lock() writeErr := conn.WriteJSON(map[string]interface{}{ "type": "log", "data": log, "timestamp": time.Now().Unix(), }) writeMu.Unlock() if writeErr != nil { // Connection closed, exit the loop return } } } // Poll for new logs and send them // Use shorter interval for more responsive updates, but order by id for consistency ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case <-ticker.C: var logs []types.TaskLog err := s.db.With(func(dbConn *sql.DB) error { rows, err := dbConn.Query( `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ? AND id > ? ORDER BY id ASC LIMIT 100`, taskID, lastID, ) if err != nil { return err } defer rows.Close() for rows.Next() { var log types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &log.ID, &log.TaskID, &runnerID, &log.LogLevel, &log.Message, &log.StepName, &log.CreatedAt, ) if err != nil { continue } if runnerID.Valid { log.RunnerID = &runnerID.Int64 } lastID = log.ID logs = append(logs, log) } return nil }) if err != nil { continue } // Send logs to client (outside With callback to access websocket conn) for _, log := range logs { msg := map[string]interface{}{ "type": "log", "task_id": taskID, "data": log, "timestamp": time.Now().Unix(), } writeMu.Lock() writeErr := conn.WriteJSON(msg) writeMu.Unlock() if writeErr != nil { return } } } } } // handleClientWebSocket handles the unified client WebSocket connection with subscription protocol func (s *Manager) handleClientWebSocket(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } // Check if user is admin isAdmin := isAdminUser(r) // Upgrade to WebSocket conn, err := s.wsUpgrader.Upgrade(w, r, nil) if err != nil { log.Printf("Failed to upgrade WebSocket: %v", err) return } defer conn.Close() // Generate unique connection ID for this tab/connection connNum := atomic.AddUint64(&s.connIDCounter, 1) connID := fmt.Sprintf("%d:%d", userID, connNum) // Create client connection clientConn := &ClientConnection{ Conn: conn, UserID: userID, ConnID: connID, IsAdmin: isAdmin, Subscriptions: make(map[string]bool), WriteMu: &sync.Mutex{}, } // Register connection (no need to close old - multiple connections per user are allowed) s.clientConnsMu.Lock() s.clientConns[connID] = clientConn s.clientConnsMu.Unlock() log.Printf("handleClientWebSocket: Registered client connection %s for user %d", connID, userID) defer func() { s.clientConnsMu.Lock() delete(s.clientConns, connID) s.clientConnsMu.Unlock() log.Printf("handleClientWebSocket: Removed client connection %s for user %d", connID, userID) }() // Send initial connection message clientConn.WriteMu.Lock() err = conn.WriteJSON(map[string]interface{}{ "type": "connected", "timestamp": time.Now().Unix(), }) clientConn.WriteMu.Unlock() if err != nil { log.Printf("Failed to send initial connection message: %v", err) return } // Set up ping/pong conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) // Reset deadline on pong return nil }) // Start ping ticker ticker := time.NewTicker(WSPingInterval) defer ticker.Stop() // Message handling channel - increased buffer size to prevent blocking messageChan := make(chan map[string]interface{}, 100) // Read messages in background readDone := make(chan struct{}) go func() { defer close(readDone) for { conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) // Increased timeout messageType, message, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket read error for client %d: %v", userID, err) } else { log.Printf("WebSocket read error for client %d (expected close): %v", userID, err) } return } // Handle control frames (pong, ping, close) if messageType == websocket.PongMessage { // Pong received - connection is alive, reset deadline conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) continue } if messageType == websocket.PingMessage { // Respond to ping with pong conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) conn.WriteMessage(websocket.PongMessage, message) conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) continue } if messageType != websocket.TextMessage { // Skip non-text messages continue } // Parse JSON message var msg map[string]interface{} if err := json.Unmarshal(message, &msg); err != nil { log.Printf("Failed to parse JSON message from client %d: %v", userID, err) continue } messageChan <- msg conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) } }() ctx := r.Context() for { select { case <-ctx.Done(): log.Printf("handleClientWebSocket: Context cancelled for user %d", userID) return case <-readDone: log.Printf("handleClientWebSocket: Read done for user %d", userID) return case msg := <-messageChan: s.handleClientMessage(clientConn, msg) case <-ticker.C: // Reset read deadline before sending ping to ensure we can receive pong conn.SetReadDeadline(time.Now().Add(WSReadDeadline)) clientConn.WriteMu.Lock() // Use WriteControl for ping frames (control frames) if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WSWriteDeadline)); err != nil { log.Printf("handleClientWebSocket: Ping failed for user %d: %v", userID, err) clientConn.WriteMu.Unlock() return } clientConn.WriteMu.Unlock() } } } // handleClientMessage processes messages from client WebSocket func (s *Manager) handleClientMessage(clientConn *ClientConnection, msg map[string]interface{}) { msgType, ok := msg["type"].(string) if !ok { return } switch msgType { case "subscribe": channel, ok := msg["channel"].(string) if !ok { // Send error for invalid channel format clientConn.WriteMu.Lock() if err := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "subscription_error", "channel": channel, "error": "Invalid channel format", }); err != nil { log.Printf("Failed to send subscription_error to client %d: %v", clientConn.UserID, err) } clientConn.WriteMu.Unlock() return } // Check if already subscribed clientConn.SubsMu.Lock() alreadySubscribed := clientConn.Subscriptions[channel] clientConn.SubsMu.Unlock() if alreadySubscribed { // Already subscribed - just send confirmation, don't send initial state again if s.verboseWSLogging { log.Printf("Client %d already subscribed to channel: %s (skipping initial state)", clientConn.UserID, channel) } clientConn.WriteMu.Lock() if err := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "subscribed", "channel": channel, }); err != nil { log.Printf("Failed to send subscribed confirmation to client %d: %v", clientConn.UserID, err) } clientConn.WriteMu.Unlock() return } // Validate channel access if s.canSubscribe(clientConn, channel) { clientConn.SubsMu.Lock() clientConn.Subscriptions[channel] = true clientConn.SubsMu.Unlock() if s.verboseWSLogging { log.Printf("Client %d subscribed to channel: %s", clientConn.UserID, channel) } // Send success confirmation clientConn.WriteMu.Lock() if err := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "subscribed", "channel": channel, }); err != nil { log.Printf("Failed to send subscribed confirmation to client %d: %v", clientConn.UserID, err) clientConn.WriteMu.Unlock() return } clientConn.WriteMu.Unlock() // Send initial state for the subscribed channel (only on first subscription) go s.sendInitialState(clientConn, channel) } else { // Subscription failed - send error to client log.Printf("Client %d failed to subscribe to channel: %s (job may not exist or access denied)", clientConn.UserID, channel) clientConn.WriteMu.Lock() if err := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "subscription_error", "channel": channel, "error": "Channel not found or access denied", }); err != nil { log.Printf("Failed to send subscription_error to client %d: %v", clientConn.UserID, err) } clientConn.WriteMu.Unlock() } case "unsubscribe": channel, ok := msg["channel"].(string) if !ok { return } clientConn.SubsMu.Lock() delete(clientConn.Subscriptions, channel) clientConn.SubsMu.Unlock() if s.verboseWSLogging { log.Printf("Client %d unsubscribed from channel: %s", clientConn.UserID, channel) } } } // canSubscribe checks if a client can subscribe to a channel func (s *Manager) canSubscribe(clientConn *ClientConnection, channel string) bool { // Always allow jobs channel (always broadcasted, but subscription doesn't hurt) if channel == "jobs" { return true } // Check channel format if strings.HasPrefix(channel, "job:") { // Extract job ID jobIDStr := strings.TrimPrefix(channel, "job:") jobID, err := strconv.ParseInt(jobIDStr, 10, 64) if err != nil { return false } // Verify job belongs to user (unless admin) if clientConn.IsAdmin { var exists bool err := s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) return err == nil && exists } var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) return err == nil && jobUserID == clientConn.UserID } if strings.HasPrefix(channel, "logs:") { // Format: logs:jobId:taskId parts := strings.Split(channel, ":") if len(parts) != 3 { return false } jobID, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { return false } // Verify job belongs to user (unless admin) if clientConn.IsAdmin { var exists bool err := s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) return err == nil && exists } var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) return err == nil && jobUserID == clientConn.UserID } if strings.HasPrefix(channel, "upload:") { // Format: upload:sessionId sessionID := strings.TrimPrefix(channel, "upload:") s.uploadSessionsMu.RLock() session, exists := s.uploadSessions[sessionID] s.uploadSessionsMu.RUnlock() // Verify session belongs to user return exists && session.UserID == clientConn.UserID } if channel == "runners" { // Only admins can subscribe to runners return clientConn.IsAdmin } return false } // sendInitialState sends the current state when a client subscribes to a channel func (s *Manager) sendInitialState(clientConn *ClientConnection, channel string) { // Use a shorter write deadline for initial state to avoid blocking too long // If the connection is slow/dead, we want to fail fast writeTimeout := 5 * time.Second // Check if connection is still valid before starting clientConn.WriteMu.Lock() // Set a reasonable write deadline clientConn.Conn.SetWriteDeadline(time.Now().Add(writeTimeout)) clientConn.WriteMu.Unlock() if strings.HasPrefix(channel, "job:") { // Send initial job state jobIDStr := strings.TrimPrefix(channel, "job:") jobID, err := strconv.ParseInt(jobIDStr, 10, 64) if err != nil { return } // Get job from database var job types.Job var jobType string var startedAt, completedAt sql.NullTime var blendMetadataJSON sql.NullString var errorMessage sql.NullString var frameStart, frameEnd sql.NullInt64 var outputFormat sql.NullString query := "SELECT id, user_id, job_type, name, status, progress, frame_start, frame_end, output_format, blend_metadata, created_at, started_at, completed_at, error_message FROM jobs WHERE id = ?" if !clientConn.IsAdmin { query += " AND user_id = ?" } var err2 error err2 = s.db.With(func(conn *sql.DB) error { if clientConn.IsAdmin { return conn.QueryRow(query, jobID).Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) } else { return conn.QueryRow(query, jobID, clientConn.UserID).Scan( &job.ID, &job.UserID, &jobType, &job.Name, &job.Status, &job.Progress, &frameStart, &frameEnd, &outputFormat, &blendMetadataJSON, &job.CreatedAt, &startedAt, &completedAt, &errorMessage, ) } }) if err2 != nil { return } if frameStart.Valid { fs := int(frameStart.Int64) job.FrameStart = &fs } if frameEnd.Valid { fe := int(frameEnd.Int64) job.FrameEnd = &fe } if outputFormat.Valid { of := outputFormat.String job.OutputFormat = &of } if startedAt.Valid { job.StartedAt = &startedAt.Time } if completedAt.Valid { job.CompletedAt = &completedAt.Time } if errorMessage.Valid { job.ErrorMessage = errorMessage.String } // Send job_update with full job data clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) writeErr := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "job_update", "channel": channel, "job_id": jobID, "data": job, "timestamp": time.Now().Unix(), }) clientConn.WriteMu.Unlock() if writeErr != nil { log.Printf("Failed to send initial job_update to client %d: %v", clientConn.UserID, writeErr) return } // Get and send tasks (no limit - send all) err = s.db.With(func(conn *sql.DB) error { rows, err2 := conn.Query( `SELECT id, job_id, runner_id, frame, status, task_type, current_step, retry_count, max_retries, output_path, created_at, started_at, completed_at, error_message, timeout_seconds FROM tasks WHERE job_id = ? ORDER BY frame ASC`, jobID, ) if err2 != nil { return err2 } defer rows.Close() for rows.Next() { var task types.Task var runnerID sql.NullInt64 var startedAt, completedAt sql.NullTime var timeoutSeconds sql.NullInt64 var errorMessage sql.NullString var currentStep sql.NullString var outputPath sql.NullString err := rows.Scan( &task.ID, &task.JobID, &runnerID, &task.Frame, &task.Status, &task.TaskType, ¤tStep, &task.RetryCount, &task.MaxRetries, &outputPath, &task.CreatedAt, &startedAt, &completedAt, &errorMessage, &timeoutSeconds, ) if err != nil { continue } if runnerID.Valid { task.RunnerID = &runnerID.Int64 } if startedAt.Valid { task.StartedAt = &startedAt.Time } if completedAt.Valid { task.CompletedAt = &completedAt.Time } if timeoutSeconds.Valid { timeout := int(timeoutSeconds.Int64) task.TimeoutSeconds = &timeout } if errorMessage.Valid { task.ErrorMessage = errorMessage.String } if currentStep.Valid { task.CurrentStep = currentStep.String } if outputPath.Valid { task.OutputPath = outputPath.String } // Send task_update clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) writeErr := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "task_update", "channel": channel, "job_id": jobID, "task_id": task.ID, "data": task, "timestamp": time.Now().Unix(), }) clientConn.WriteMu.Unlock() if writeErr != nil { log.Printf("Failed to send initial task_update to client %d: %v", clientConn.UserID, writeErr) // Connection is likely closed, stop sending more messages break } } return nil }) } else if strings.HasPrefix(channel, "logs:") { // Send initial logs for the task parts := strings.Split(channel, ":") if len(parts) != 3 { return } jobID, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { return } taskID, err := strconv.ParseInt(parts[2], 10, 64) if err != nil { return } // Get existing logs (no limit - send all) err = s.db.With(func(conn *sql.DB) error { rows, err2 := conn.Query( `SELECT id, task_id, runner_id, log_level, message, step_name, created_at FROM task_logs WHERE task_id = ? ORDER BY id ASC`, taskID, ) if err2 != nil { return err2 } defer rows.Close() for rows.Next() { var taskLog types.TaskLog var runnerID sql.NullInt64 err := rows.Scan( &taskLog.ID, &taskLog.TaskID, &runnerID, &taskLog.LogLevel, &taskLog.Message, &taskLog.StepName, &taskLog.CreatedAt, ) if err != nil { continue } if runnerID.Valid { taskLog.RunnerID = &runnerID.Int64 } // Send log clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) writeErr := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "log", "channel": channel, "task_id": taskID, "job_id": jobID, "data": taskLog, "timestamp": time.Now().Unix(), }) clientConn.WriteMu.Unlock() if writeErr != nil { log.Printf("Failed to send initial log to client %d: %v", clientConn.UserID, writeErr) // Connection is likely closed, stop sending more messages break } } return nil }) } else if channel == "runners" { // Send initial runner list (only for admins) if !clientConn.IsAdmin { return } s.db.With(func(conn *sql.DB) error { rows, err2 := conn.Query( `SELECT id, name, hostname, ip_address, status, last_heartbeat, capabilities, priority, created_at FROM runners ORDER BY id ASC`, ) if err2 != nil { return err2 } defer rows.Close() for rows.Next() { var runner types.Runner err := rows.Scan( &runner.ID, &runner.Name, &runner.Hostname, &runner.IPAddress, &runner.Status, &runner.LastHeartbeat, &runner.Capabilities, &runner.Priority, &runner.CreatedAt, ) if err != nil { continue } // Send runner_status clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) writeErr := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": "runner_status", "channel": channel, "runner_id": runner.ID, "data": runner, "timestamp": time.Now().Unix(), }) clientConn.WriteMu.Unlock() if writeErr != nil { log.Printf("Failed to send initial runner_status to client %d: %v", clientConn.UserID, writeErr) // Connection is likely closed, stop sending more messages break } } return nil }) } else if strings.HasPrefix(channel, "upload:") { // Send initial upload session state sessionID := strings.TrimPrefix(channel, "upload:") s.uploadSessionsMu.RLock() session, exists := s.uploadSessions[sessionID] s.uploadSessionsMu.RUnlock() if exists && session.UserID == clientConn.UserID { msgType := "upload_progress" if session.Status != "uploading" { msgType = "processing_status" } clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) writeErr := clientConn.Conn.WriteJSON(map[string]interface{}{ "type": msgType, "channel": channel, "session_id": sessionID, "data": map[string]interface{}{ "progress": session.Progress, "status": session.Status, "message": session.Message, }, "timestamp": time.Now().Unix(), }) clientConn.WriteMu.Unlock() if writeErr != nil { log.Printf("Failed to send initial upload state to client %d: %v", clientConn.UserID, writeErr) return } } } } // handleJobsWebSocket handles WebSocket connection for job list updates func (s *Manager) handleJobsWebSocket(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } // Upgrade to WebSocket conn, err := s.wsUpgrader.Upgrade(w, r, nil) if err != nil { log.Printf("Failed to upgrade WebSocket: %v", err) return } defer conn.Close() // Register connection s.jobListConnsMu.Lock() // Close existing connection if any if oldConn, exists := s.jobListConns[userID]; exists && oldConn != nil { oldConn.Close() } s.jobListConns[userID] = conn s.jobListConnsMu.Unlock() defer func() { s.jobListConnsMu.Lock() delete(s.jobListConns, userID) s.jobListConnsMu.Unlock() }() // Send initial connection message err = conn.WriteJSON(map[string]interface{}{ "type": "connected", "timestamp": time.Now().Unix(), }) if err != nil { log.Printf("Failed to send initial connection message: %v", err) return } // Keep connection alive and handle ping/pong conn.SetReadDeadline(time.Now().Add(60 * time.Second)) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) // Start ping ticker ticker := time.NewTicker(WSPingInterval) defer ticker.Stop() // Read messages in background to keep connection alive and handle pongs readDone := make(chan struct{}) go func() { defer close(readDone) for { conn.SetReadDeadline(time.Now().Add(60 * time.Second)) _, _, err := conn.ReadMessage() if err != nil { // Connection closed or error - exit read loop if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket read error for job list: %v", err) } return } // Reset read deadline after successful read (pong received) conn.SetReadDeadline(time.Now().Add(60 * time.Second)) } }() ctx := r.Context() for { select { case <-ctx.Done(): return case <-readDone: // Read loop exited, close connection return case <-ticker.C: // Reset read deadline before sending ping to ensure we can receive pong conn.SetReadDeadline(time.Now().Add(60 * time.Second)) // Use WriteControl for ping frames (control frames) if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WSWriteDeadline)); err != nil { return } } } } // handleJobWebSocket handles WebSocket connection for single job updates func (s *Manager) handleJobWebSocket(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } jobID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } // Verify job belongs to user (unless admin) isAdmin := isAdminUser(r) if !isAdmin { var jobUserID int64 err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&jobUserID) }) if err == sql.ErrNoRows { s.respondError(w, http.StatusNotFound, "Job not found") return } if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to verify job: %v", err)) return } if jobUserID != userID { s.respondError(w, http.StatusForbidden, "Access denied") return } } else { var exists bool err = s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT EXISTS(SELECT 1 FROM jobs WHERE id = ?)", jobID).Scan(&exists) }) if err != nil || !exists { s.respondError(w, http.StatusNotFound, "Job not found") return } } // Upgrade to WebSocket conn, err := s.wsUpgrader.Upgrade(w, r, nil) if err != nil { log.Printf("Failed to upgrade WebSocket: %v", err) return } defer conn.Close() key := fmt.Sprintf("%d:%d", userID, jobID) s.jobConnsMu.Lock() // Close existing connection if any if oldConn, exists := s.jobConns[key]; exists && oldConn != nil { oldConn.Close() } s.jobConns[key] = conn s.jobConnsMu.Unlock() // Create a write mutex for this connection s.jobConnsWriteMuMu.Lock() s.jobConnsWriteMu[key] = &sync.Mutex{} writeMu := s.jobConnsWriteMu[key] s.jobConnsWriteMuMu.Unlock() defer func() { s.jobConnsMu.Lock() delete(s.jobConns, key) s.jobConnsMu.Unlock() s.jobConnsWriteMuMu.Lock() delete(s.jobConnsWriteMu, key) s.jobConnsWriteMuMu.Unlock() }() // Send initial connection message writeMu.Lock() err = conn.WriteJSON(map[string]interface{}{ "type": "connected", "timestamp": time.Now().Unix(), }) writeMu.Unlock() if err != nil { log.Printf("Failed to send initial connection message: %v", err) return } // Keep connection alive and handle ping/pong conn.SetReadDeadline(time.Now().Add(60 * time.Second)) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) // Start ping ticker ticker := time.NewTicker(WSPingInterval) defer ticker.Stop() // Read messages in background to keep connection alive and handle pongs readDone := make(chan struct{}) go func() { defer close(readDone) for { conn.SetReadDeadline(time.Now().Add(60 * time.Second)) _, _, err := conn.ReadMessage() if err != nil { // Connection closed or error - exit read loop if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket read error for job %d: %v", jobID, err) } return } // Reset read deadline after successful read (pong received) conn.SetReadDeadline(time.Now().Add(60 * time.Second)) } }() ctx := r.Context() for { select { case <-ctx.Done(): return case <-readDone: // Read loop exited, close connection return case <-ticker.C: // Reset read deadline before sending ping to ensure we can receive pong conn.SetReadDeadline(time.Now().Add(60 * time.Second)) // Use WriteControl for ping frames (control frames) if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WSWriteDeadline)); err != nil { return } } } } // broadcastJobUpdate broadcasts job update to connected clients func (s *Manager) broadcastJobUpdate(jobID int64, updateType string, data interface{}) { // Get user_id from job var userID int64 err := s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&userID) }) if err != nil { return } msg := map[string]interface{}{ "type": updateType, "job_id": jobID, "data": data, "timestamp": time.Now().Unix(), } // Always broadcast to jobs channel (all clients receive this) if updateType == "job_update" || updateType == "job_created" { // For job_update, only send status and progress to jobs channel if updateType == "job_update" { if dataMap, ok := data.(map[string]interface{}); ok { // Only include status and progress for jobs channel jobsData := map[string]interface{}{} if status, ok := dataMap["status"]; ok { jobsData["status"] = status } if progress, ok := dataMap["progress"]; ok { jobsData["progress"] = progress } jobsMsg := map[string]interface{}{ "type": updateType, "job_id": jobID, "data": jobsData, "timestamp": time.Now().Unix(), } s.broadcastToAllClients("jobs", jobsMsg) } } else { // job_created - send full data to all clients s.broadcastToAllClients("jobs", msg) } } // Only broadcast if client is connected if s.isClientConnected(userID) { // Broadcast to client WebSocket if subscribed to job:{id} channel := fmt.Sprintf("job:%d", jobID) s.broadcastToClient(userID, channel, msg) } // Also broadcast to old WebSocket connections (for backwards compatibility during migration) s.jobListConnsMu.RLock() if conn, exists := s.jobListConns[userID]; exists && conn != nil { s.jobListConnsMu.RUnlock() conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) conn.WriteJSON(msg) } else { s.jobListConnsMu.RUnlock() } // Broadcast to single job connection key := fmt.Sprintf("%d:%d", userID, jobID) s.jobConnsMu.RLock() conn, exists := s.jobConns[key] s.jobConnsMu.RUnlock() if exists && conn != nil { s.jobConnsWriteMuMu.RLock() writeMu, hasMu := s.jobConnsWriteMu[key] s.jobConnsWriteMuMu.RUnlock() if hasMu && writeMu != nil { writeMu.Lock() conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) err := conn.WriteJSON(msg) writeMu.Unlock() if err != nil { log.Printf("Failed to broadcast %s to job %d WebSocket: %v", updateType, jobID, err) } } else { conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) conn.WriteJSON(msg) } } } // broadcastTaskUpdate broadcasts task update to connected clients func (s *Manager) broadcastTaskUpdate(jobID int64, taskID int64, updateType string, data interface{}) { // Get user_id from job var userID int64 err := s.db.With(func(conn *sql.DB) error { return conn.QueryRow("SELECT user_id FROM jobs WHERE id = ?", jobID).Scan(&userID) }) if err != nil { log.Printf("broadcastTaskUpdate: Failed to get user_id for job %d: %v", jobID, err) return } msg := map[string]interface{}{ "type": updateType, "job_id": jobID, "data": data, "timestamp": time.Now().Unix(), } // Always include task_id if it's provided (even if 0, for consistency) // For bulk operations like "tasks_added", task_id will be 0 if taskID > 0 { msg["task_id"] = taskID // Also include task_id in data for convenience if dataMap, ok := data.(map[string]interface{}); ok { dataMap["task_id"] = taskID } } // Only broadcast if client is connected if !s.isClientConnected(userID) { if s.verboseWSLogging { log.Printf("broadcastTaskUpdate: Client %d not connected, skipping broadcast for task %d (job %d)", userID, taskID, jobID) } // Still broadcast to old WebSocket connections for backwards compatibility } else { // Broadcast to client WebSocket if subscribed to job:{id} channel := fmt.Sprintf("job:%d", jobID) if s.verboseWSLogging { log.Printf("broadcastTaskUpdate: Broadcasting %s for task %d (job %d, user %d) on channel %s, data=%+v", updateType, taskID, jobID, userID, channel, data) } s.broadcastToClient(userID, channel, msg) } // Also broadcast to old WebSocket connection (for backwards compatibility during migration) key := fmt.Sprintf("%d:%d", userID, jobID) s.jobConnsMu.RLock() conn, exists := s.jobConns[key] s.jobConnsMu.RUnlock() if exists && conn != nil { s.jobConnsWriteMuMu.RLock() writeMu, hasMu := s.jobConnsWriteMu[key] s.jobConnsWriteMuMu.RUnlock() if hasMu && writeMu != nil { writeMu.Lock() conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) conn.WriteJSON(msg) writeMu.Unlock() } else { conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) conn.WriteJSON(msg) } } } // isClientConnected checks if a user has at least one active connection func (s *Manager) isClientConnected(userID int64) bool { s.clientConnsMu.RLock() defer s.clientConnsMu.RUnlock() for _, clientConn := range s.clientConns { if clientConn != nil && clientConn.UserID == userID { return true } } return false } // getClientConnections returns all connections for a specific user func (s *Manager) getClientConnections(userID int64) []*ClientConnection { s.clientConnsMu.RLock() defer s.clientConnsMu.RUnlock() var conns []*ClientConnection for _, clientConn := range s.clientConns { if clientConn != nil && clientConn.UserID == userID { conns = append(conns, clientConn) } } return conns } // broadcastToClient sends a message to all connections for a specific user func (s *Manager) broadcastToClient(userID int64, channel string, msg map[string]interface{}) { conns := s.getClientConnections(userID) if len(conns) == 0 { // Client not connected - this is normal, don't log it (only log at verbose level) if s.verboseWSLogging { log.Printf("broadcastToClient: Client %d not connected (channel: %s)", userID, channel) } return } // Add channel to message msg["channel"] = channel sentCount := 0 var deadConns []string for _, clientConn := range conns { // Check if client is subscribed to this channel (jobs channel is always sent) if channel != "jobs" { clientConn.SubsMu.RLock() subscribed := clientConn.Subscriptions[channel] clientConn.SubsMu.RUnlock() if !subscribed { continue } } clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) if err := clientConn.Conn.WriteJSON(msg); err != nil { // Mark connection for removal - don't spam logs, just remove dead connections deadConns = append(deadConns, clientConn.ConnID) } else { sentCount++ } clientConn.WriteMu.Unlock() } // Remove dead connections if len(deadConns) > 0 { s.clientConnsMu.Lock() for _, connID := range deadConns { if conn, exists := s.clientConns[connID]; exists { log.Printf("Removing dead connection %s for user %d (write failed)", connID, conn.UserID) conn.Conn.Close() delete(s.clientConns, connID) } } s.clientConnsMu.Unlock() } if s.verboseWSLogging { log.Printf("broadcastToClient: Sent to %d/%d connections for user %d on channel %s: type=%v", sentCount, len(conns), userID, channel, msg["type"]) } } // broadcastToAllClients sends a message to all connected clients (for jobs channel) func (s *Manager) broadcastToAllClients(channel string, msg map[string]interface{}) { msg["channel"] = channel s.clientConnsMu.RLock() clients := make([]*ClientConnection, 0, len(s.clientConns)) for _, clientConn := range s.clientConns { clients = append(clients, clientConn) } s.clientConnsMu.RUnlock() var deadConns []string for _, clientConn := range clients { clientConn.WriteMu.Lock() clientConn.Conn.SetWriteDeadline(time.Now().Add(WSWriteDeadline)) if err := clientConn.Conn.WriteJSON(msg); err != nil { deadConns = append(deadConns, clientConn.ConnID) } clientConn.WriteMu.Unlock() } // Remove dead connections if len(deadConns) > 0 { s.clientConnsMu.Lock() for _, connID := range deadConns { if conn, exists := s.clientConns[connID]; exists { log.Printf("Removing dead connection %s for user %d (write failed)", connID, conn.UserID) conn.Conn.Close() delete(s.clientConns, connID) } } s.clientConnsMu.Unlock() } } // broadcastUploadProgress broadcasts upload/processing progress to subscribed clients // This function updates the session synchronously (quick operation) but broadcasts // asynchronously to avoid blocking the upload handler on slow WebSocket writes. func (s *Manager) broadcastUploadProgress(sessionID string, progress float64, status, message string) { s.uploadSessionsMu.RLock() session, exists := s.uploadSessions[sessionID] s.uploadSessionsMu.RUnlock() if !exists { return } // Update session synchronously (quick operation - just updating struct fields) s.uploadSessionsMu.Lock() session.Progress = progress session.Status = status session.Message = message userID := session.UserID // Capture userID before releasing lock s.uploadSessionsMu.Unlock() // Broadcast asynchronously to avoid blocking upload handler on slow WebSocket writes // This prevents the entire HTTP server from freezing during large file uploads go func() { // Determine message type msgType := "upload_progress" if status != "uploading" { msgType = "processing_status" } msg := map[string]interface{}{ "type": msgType, "session_id": sessionID, "data": map[string]interface{}{ "progress": progress, "status": status, "message": message, }, "timestamp": time.Now().Unix(), } // Only broadcast if client is connected if s.isClientConnected(userID) { channel := fmt.Sprintf("upload:%s", sessionID) s.broadcastToClient(userID, channel, msg) } }() } // broadcastUploadProgressSync sends upload progress synchronously (for completion messages) // This ensures the message is sent immediately and not lost func (s *Manager) broadcastUploadProgressSync(userID int64, sessionID string, progress float64, status, message string) { // Update session synchronously s.uploadSessionsMu.Lock() if session, exists := s.uploadSessions[sessionID]; exists { session.Progress = progress session.Status = status session.Message = message } s.uploadSessionsMu.Unlock() // Determine message type msgType := "upload_progress" if status != "uploading" { msgType = "processing_status" } msg := map[string]interface{}{ "type": msgType, "session_id": sessionID, "data": map[string]interface{}{ "progress": progress, "status": status, "message": message, }, "timestamp": time.Now().Unix(), } // Send synchronously to ensure delivery if s.isClientConnected(userID) { channel := fmt.Sprintf("upload:%s", sessionID) s.broadcastToClient(userID, channel, msg) } } // truncateString truncates a string to a maximum length, appending "..." if truncated func truncateString(s string, maxLen int) string { if len(s) <= maxLen { return s } if maxLen <= 3 { return "..." } return s[:maxLen-3] + "..." }