diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 36231c4..d51cb67 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -28,7 +28,6 @@ func main() { managerURL = flag.String("manager", getEnv("MANAGER_URL", "http://localhost:8080"), "Manager URL") name = flag.String("name", getEnv("RUNNER_NAME", ""), "Runner name") hostname = flag.String("hostname", getEnv("RUNNER_HOSTNAME", ""), "Runner hostname") - ipAddress = flag.String("ip", getEnv("RUNNER_IP", ""), "Runner IP address") token = flag.String("token", getEnv("REGISTRATION_TOKEN", ""), "Registration token") secretsFile = flag.String("secrets-file", getEnv("SECRETS_FILE", ""), "Path to secrets file for persistent storage (default: ./runner-secrets.json, or ./runner-secrets-{id}.json if multiple runners)") runnerIDSuffix = flag.String("runner-id", getEnv("RUNNER_ID", ""), "Unique runner ID suffix (auto-generated if not provided)") @@ -42,9 +41,6 @@ func main() { if *hostname == "" { *hostname, _ = os.Hostname() } - if *ipAddress == "" { - *ipAddress = "127.0.0.1" - } // Generate or use provided runner ID suffix runnerIDStr := *runnerIDSuffix @@ -65,7 +61,7 @@ func main() { sanitizedName := strings.ReplaceAll(*name, "/", "_") sanitizedName = strings.ReplaceAll(sanitizedName, "\\", "_") logFileName := fmt.Sprintf("runner-%s.log", sanitizedName) - + if err := logger.Init(*logDir, logFileName, *logMaxSize, *logMaxBackups, *logMaxAge); err != nil { log.Fatalf("Failed to initialize logger: %v", err) } @@ -87,7 +83,7 @@ func main() { } } - client := runner.NewClient(*managerURL, *name, *hostname, *ipAddress) + client := runner.NewClient(*managerURL, *name, *hostname) // Probe capabilities once at startup (before any registration attempts) log.Printf("Probing runner capabilities...") diff --git a/internal/api/admin.go b/internal/api/admin.go index fd97728..1fb1d6d 100644 --- a/internal/api/admin.go +++ b/internal/api/admin.go @@ -136,7 +136,7 @@ func (s *Server) handleDeleteRunner(w http.ResponseWriter, r *http.Request) { // handleListRunnersAdmin lists all runners with admin details func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) { rows, err := s.db.Query( - `SELECT id, name, hostname, ip_address, status, last_heartbeat, capabilities, + `SELECT id, name, hostname, status, last_heartbeat, capabilities, registration_token, verified, priority, created_at FROM runners ORDER BY created_at DESC`, ) @@ -153,7 +153,7 @@ func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) var verified bool err := rows.Scan( - &runner.ID, &runner.Name, &runner.Hostname, &runner.IPAddress, + &runner.ID, &runner.Name, &runner.Hostname, &runner.Status, &runner.LastHeartbeat, &runner.Capabilities, ®istrationToken, &verified, &runner.Priority, &runner.CreatedAt, ) @@ -166,7 +166,6 @@ func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) "id": runner.ID, "name": runner.Name, "hostname": runner.Hostname, - "ip_address": runner.IPAddress, "status": runner.Status, "last_heartbeat": runner.LastHeartbeat, "capabilities": runner.Capabilities, diff --git a/internal/api/jobs.go b/internal/api/jobs.go index 03c3915..737c90e 100644 --- a/internal/api/jobs.go +++ b/internal/api/jobs.go @@ -346,7 +346,7 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { } // Immediately try to distribute tasks to connected runners - go s.distributeTasksToRunners() + s.triggerTaskDistribution() s.respondJSON(w, http.StatusCreated, job) } diff --git a/internal/api/runners.go b/internal/api/runners.go index 242daa5..3e2a92b 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -15,6 +15,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "jiggablend/pkg/types" @@ -144,7 +145,7 @@ func (s *Server) handleRegisterRunner(w http.ResponseWriter, r *http.Request) { registration_token, runner_secret, manager_secret, verified, priority) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, - req.Name, req.Hostname, req.IPAddress, types.RunnerStatusOnline, time.Now(), req.Capabilities, + req.Name, req.Hostname, "", types.RunnerStatusOnline, time.Now(), req.Capabilities, req.RegistrationToken, runnerSecret, managerSecret, true, priority, ).Scan(&runnerID) if err != nil { @@ -157,7 +158,6 @@ func (s *Server) handleRegisterRunner(w http.ResponseWriter, r *http.Request) { "id": runnerID, "name": req.Name, "hostname": req.Hostname, - "ip_address": req.IPAddress, "status": types.RunnerStatusOnline, "runner_secret": runnerSecret, "manager_secret": managerSecret, @@ -683,14 +683,25 @@ func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { defer conn.Close() // Register connection (must be done before any distribution checks) + // Close old connection outside lock to avoid blocking + var oldConn *websocket.Conn s.runnerConnsMu.Lock() - // Remove old connection if exists - if oldConn, exists := s.runnerConns[runnerID]; exists { - oldConn.Close() + if existingConn, exists := s.runnerConns[runnerID]; exists { + oldConn = existingConn } s.runnerConns[runnerID] = conn s.runnerConnsMu.Unlock() + // Close old connection outside lock (if it existed) + if oldConn != nil { + oldConn.Close() + } + + // Create a write mutex for this connection + s.runnerConnsWriteMuMu.Lock() + s.runnerConnsWriteMu[runnerID] = &sync.Mutex{} + s.runnerConnsWriteMuMu.Unlock() + // Update runner status to online _, _ = s.db.Exec( `UPDATE runners SET status = ?, last_heartbeat = ? WHERE id = ?`, @@ -698,12 +709,8 @@ func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { ) // Immediately try to distribute pending tasks to this newly connected runner - // Use a small delay to ensure connection registration is fully visible to other goroutines log.Printf("Runner %d connected, distributing pending tasks", runnerID) - go func() { - time.Sleep(50 * time.Millisecond) // Small delay to ensure map update is visible - s.distributeTasksToRunners() - }() + s.triggerTaskDistribution() // Note: We don't log to task logs here because we don't know which tasks will be assigned yet // Task assignment logging happens in distributeTasksToRunners @@ -713,6 +720,9 @@ func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { s.runnerConnsMu.Lock() delete(s.runnerConns, runnerID) s.runnerConnsMu.Unlock() + s.runnerConnsWriteMuMu.Lock() + delete(s.runnerConnsWriteMu, runnerID) + s.runnerConnsWriteMuMu.Unlock() _, _ = s.db.Exec( `UPDATE runners SET status = ? WHERE id = ?`, types.RunnerStatusOffline, runnerID, @@ -743,15 +753,28 @@ func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { defer ticker.Stop() for range ticker.C { s.runnerConnsMu.RLock() - conn, exists := s.runnerConns[runnerID] + currentConn, exists := s.runnerConns[runnerID] s.runnerConnsMu.RUnlock() - if !exists { + if !exists || currentConn != conn { + // Connection was replaced or removed + return + } + // Get write mutex for this connection + s.runnerConnsWriteMuMu.RLock() + writeMu, hasMu := s.runnerConnsWriteMu[runnerID] + s.runnerConnsWriteMuMu.RUnlock() + if !hasMu || writeMu == nil { return } // Send ping - runner should respond with pong automatically // Reset read deadline before sending ping to ensure we can receive pong conn.SetReadDeadline(time.Now().Add(90 * time.Second)) // Increased to 90 seconds - if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + writeMu.Lock() + err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)) + writeMu.Unlock() + if err != nil { + // Write failed - connection is likely dead, read loop will detect and cleanup + log.Printf("Failed to send ping to runner %d: %v", runnerID, err) return } } @@ -1191,7 +1214,7 @@ func (s *Server) updateJobStatusFromTasks(jobID int64) { // Update job status to ensure it's marked as running (has pending video task) s.updateJobStatusFromTasks(jobID) // Try to distribute the task immediately - go s.distributeTasksToRunners() + s.triggerTaskDistribution() } } else { log.Printf("Skipping video generation task creation for job %d (video task already exists)", jobID) @@ -1284,7 +1307,20 @@ func (s *Server) broadcastLogToFrontend(taskID int64, logEntry WSLogEntry) { } } +// triggerTaskDistribution triggers task distribution in a serialized manner +func (s *Server) triggerTaskDistribution() { + go func() { + // Try to acquire lock - if already running, skip + if !s.taskDistMu.TryLock() { + return // Distribution already in progress + } + defer s.taskDistMu.Unlock() + s.distributeTasksToRunners() + }() +} + // distributeTasksToRunners pushes available tasks to connected runners +// This function should only be called while holding taskDistMu lock func (s *Server) distributeTasksToRunners() { // Quick check: if there are no pending tasks, skip the expensive query var pendingCount int @@ -1677,21 +1713,25 @@ func (s *Server) distributeTasksToRunners() { continue } - err = tx.Commit() - if err != nil { - log.Printf("Failed to commit transaction for task %d: %v", task.TaskID, err) - continue - } - // Check if the update actually affected a row (task was successfully assigned) rowsAffected, err := result.RowsAffected() if err != nil { + tx.Rollback() log.Printf("Failed to get rows affected for task %d: %v", task.TaskID, err) continue } if rowsAffected == 0 { // Task was already assigned by another goroutine, skip + tx.Rollback() + continue + } + + // Commit the assignment before attempting WebSocket send + // If send fails, we'll rollback in a separate transaction + err = tx.Commit() + if err != nil { + log.Printf("Failed to commit transaction for task %d: %v", task.TaskID, err) continue } @@ -1702,27 +1742,41 @@ func (s *Server) distributeTasksToRunners() { "started_at": now, }) - // Task was successfully assigned, send via WebSocket + // Task was successfully assigned in database, now send via WebSocket log.Printf("Assigned task %d (type: %s, job: %d) to runner %d", task.TaskID, task.TaskType, task.JobID, selectedRunnerID) - // Update job status to running if this is the first task starting - s.updateJobStatusFromTasks(task.JobID) - // Log runner assignment to task logs s.logTaskEvent(task.TaskID, nil, types.LogLevelInfo, fmt.Sprintf("Task assigned to runner %d", selectedRunnerID), "") + // Attempt to send task to runner via WebSocket if err := s.assignTaskToRunner(selectedRunnerID, task.TaskID); err != nil { log.Printf("Failed to send task %d to runner %d: %v", task.TaskID, selectedRunnerID, err) // Log assignment failure s.logTaskEvent(task.TaskID, nil, types.LogLevelError, fmt.Sprintf("Failed to send task to runner %d: %v", selectedRunnerID, err), "") - // Rollback the assignment if WebSocket send fails - s.db.Exec( - `UPDATE tasks SET runner_id = NULL, status = ?, started_at = NULL - WHERE id = ?`, - types.TaskStatusPending, task.TaskID, - ) - // Log rollback - s.logTaskEvent(task.TaskID, nil, types.LogLevelWarn, fmt.Sprintf("Task assignment rolled back - runner %d connection failed", selectedRunnerID), "") + // Rollback the assignment if WebSocket send fails using a new transaction + rollbackTx, rollbackErr := s.db.Begin() + if rollbackErr == nil { + _, rollbackErr = rollbackTx.Exec( + `UPDATE tasks SET runner_id = NULL, status = ?, started_at = NULL + WHERE id = ? AND runner_id = ?`, + types.TaskStatusPending, task.TaskID, selectedRunnerID, + ) + if rollbackErr == nil { + rollbackTx.Commit() + // Log rollback + s.logTaskEvent(task.TaskID, nil, types.LogLevelWarn, fmt.Sprintf("Task assignment rolled back - runner %d connection failed", selectedRunnerID), "") + // Update job status after rollback + s.updateJobStatusFromTasks(task.JobID) + // Trigger redistribution + s.triggerTaskDistribution() + } else { + rollbackTx.Rollback() + log.Printf("Failed to rollback task %d assignment: %v", task.TaskID, rollbackErr) + } + } + } else { + // WebSocket send succeeded, update job status + s.updateJobStatusFromTasks(task.JobID) } } } @@ -1805,13 +1859,34 @@ func (s *Server) assignTaskToRunner(runnerID int64, taskID int64) error { return fmt.Errorf("task %d is not assigned to runner %d", taskID, runnerID) } - // Send task via WebSocket + // Send task via WebSocket with write mutex protection msg := WSMessage{ Type: "task_assignment", Timestamp: time.Now().Unix(), } msg.Data, _ = json.Marshal(task) - return conn.WriteJSON(msg) + + // Get write mutex for this connection + s.runnerConnsWriteMuMu.RLock() + writeMu, hasMu := s.runnerConnsWriteMu[runnerID] + s.runnerConnsWriteMuMu.RUnlock() + + if !hasMu || writeMu == nil { + return fmt.Errorf("runner %d write mutex not found", runnerID) + } + + // Re-check connection is still valid before writing + s.runnerConnsMu.RLock() + _, stillExists := s.runnerConns[runnerID] + s.runnerConnsMu.RUnlock() + if !stillExists { + return fmt.Errorf("runner %d disconnected", runnerID) + } + + writeMu.Lock() + err = conn.WriteJSON(msg) + writeMu.Unlock() + return err } // redistributeRunnerTasks resets tasks assigned to a disconnected/dead runner and redistributes them @@ -1883,7 +1958,7 @@ func (s *Server) redistributeRunnerTasks(runnerID int64) { } // Immediately redistribute the reset tasks - go s.distributeTasksToRunners() + s.triggerTaskDistribution() } // logTaskEvent logs an event to a task's log (manager-side logging) diff --git a/internal/api/server.go b/internal/api/server.go index 1232a2f..714b07d 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -38,6 +38,9 @@ type Server struct { wsUpgrader websocket.Upgrader runnerConns map[int64]*websocket.Conn runnerConnsMu sync.RWMutex + // Mutexes for each runner connection to serialize writes + runnerConnsWriteMu map[int64]*sync.Mutex + runnerConnsWriteMuMu sync.RWMutex frontendConns map[string]*websocket.Conn // key: "jobId:taskId" frontendConnsMu sync.RWMutex // Mutexes for each frontend connection to serialize writes @@ -55,6 +58,8 @@ type Server struct { // Throttling for progress updates (per job) progressUpdateTimes map[int64]time.Time // key: jobID progressUpdateTimesMu sync.RWMutex + // Task distribution serialization + taskDistMu sync.Mutex // Mutex to prevent concurrent distribution } // NewServer creates a new API server @@ -78,6 +83,7 @@ func NewServer(db *database.DB, auth *authpkg.Auth, storage *storage.Storage) (* WriteBufferSize: 1024, }, runnerConns: make(map[int64]*websocket.Conn), + runnerConnsWriteMu: make(map[int64]*sync.Mutex), frontendConns: make(map[string]*websocket.Conn), frontendConnsWriteMu: make(map[string]*sync.Mutex), jobListConns: make(map[int64]*websocket.Conn), @@ -611,7 +617,7 @@ func (s *Server) recoverStuckTasks() { go func() { for range distributeTicker.C { - s.distributeTasksToRunners() + s.triggerTaskDistribution() } }() @@ -675,7 +681,7 @@ func (s *Server) recoverStuckTasks() { s.recoverTaskTimeouts() // Distribute newly recovered tasks - s.distributeTasksToRunners() + s.triggerTaskDistribution() }() } } diff --git a/internal/runner/client.go b/internal/runner/client.go index 35bd9c2..dd27ff9 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -1,10 +1,10 @@ package runner import ( - _ "embed" "archive/tar" "bufio" "bytes" + _ "embed" "encoding/json" "errors" "fmt" @@ -33,7 +33,6 @@ type Client struct { managerURL string name string hostname string - ipAddress string httpClient *http.Client runnerID int64 runnerSecret string @@ -58,12 +57,11 @@ type Client struct { } // NewClient creates a new runner client -func NewClient(managerURL, name, hostname, ipAddress string) *Client { +func NewClient(managerURL, name, hostname string) *Client { return &Client{ managerURL: managerURL, name: name, hostname: hostname, - ipAddress: ipAddress, httpClient: &http.Client{Timeout: 30 * time.Second}, longRunningClient: &http.Client{Timeout: 0}, // No timeout for long-running operations (context downloads, file uploads/downloads) stopChan: make(chan struct{}), @@ -412,7 +410,6 @@ func (c *Client) Register(registrationToken string) (int64, string, string, erro req := map[string]interface{}{ "name": c.name, "hostname": c.hostname, - "ip_address": c.ipAddress, "capabilities": string(capabilitiesJSON), "registration_token": registrationToken, } @@ -983,8 +980,8 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output // Clean up expired cache entries periodically c.cleanupExpiredContextCache() - // Download context tar - contextPath := filepath.Join(workDir, "context.tar") + // Download context tar + contextPath := filepath.Join(workDir, "context.tar") if err := c.downloadJobContext(jobID, contextPath); err != nil { c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to download context: %w", err) @@ -1091,19 +1088,19 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output // This script will override the blend file's settings based on job metadata formatFilePath := filepath.Join(workDir, "output_format.txt") renderSettingsFilePath := filepath.Join(workDir, "render_settings.json") - + // Check if unhide_objects is enabled unhideObjects := false if jobMetadata != nil && jobMetadata.UnhideObjects != nil && *jobMetadata.UnhideObjects { unhideObjects = true } - + // Build unhide code conditionally from embedded script unhideCode := "" if unhideObjects { unhideCode = scripts.UnhideObjects } - + // Load template and replace placeholders scriptContent := scripts.RenderBlenderTemplate scriptContent = strings.ReplaceAll(scriptContent, "{{UNHIDE_CODE}}", unhideCode) @@ -3016,8 +3013,8 @@ func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, i c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, "Downloading job context...", "download") - // Download context tar - contextPath := filepath.Join(workDir, "context.tar") + // Download context tar + contextPath := filepath.Join(workDir, "context.tar") if err := c.downloadJobContext(jobID, contextPath); err != nil { c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) return fmt.Errorf("failed to download context: %w", err) diff --git a/pkg/types/types.go b/pkg/types/types.go index 5b88782..f699b10 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -153,7 +153,7 @@ type UpdateJobProgressRequest struct { type RegisterRunnerRequest struct { Name string `json:"name"` Hostname string `json:"hostname"` - IPAddress string `json:"ip_address"` + IPAddress string `json:"ip_address,omitempty"` // Optional, extracted from request by manager Capabilities string `json:"capabilities"` Priority *int `json:"priority,omitempty"` // Optional, defaults to 100 if not provided } diff --git a/web/app.js b/web/app.js index c11b40d..e1569dc 100644 --- a/web/app.js +++ b/web/app.js @@ -241,7 +241,6 @@ function displayRunners(runners) {