diff --git a/Makefile b/Makefile index fc426f2..9a7a7e2 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ -.PHONY: build build-manager build-runner build-web run-manager run-runner run cleanup cleanup-manager cleanup-runner clean test +.PHONY: build build-manager build-runner build-web run-manager run-runner run cleanup cleanup-manager cleanup-runner kill-all clean test help -# Build all +# Build all components build: clean-bin build-manager build-runner # Build manager @@ -24,35 +24,91 @@ cleanup-manager: @rm -rf jiggablend-storage 2>/dev/null || true @echo "Manager cleanup complete" -# Cleanup runner (workspaces and secrets) +# Cleanup runner workspaces cleanup-runner: - @echo "Cleaning up runner workspaces and secrets..." - @rm -rf jiggablend-workspaces jiggablend-workspace* *workspace* runner-secrets*.json 2>/dev/null || true + @echo "Cleaning up runner workspaces..." + @rm -rf jiggablend-workspaces jiggablend-workspace* *workspace* 2>/dev/null || true @echo "Runner cleanup complete" # Cleanup both manager and runner cleanup: cleanup-manager cleanup-runner +# Kill all manager and runner processes +kill-all: + @echo "Killing all manager and runner processes..." + @# Kill manager processes (compiled binaries in bin/, root, and go run) + @-pkill -f "bin/manager" 2>/dev/null || true + @-pkill -f "\./manager" 2>/dev/null || true + @-pkill -f "manager" 2>/dev/null || true + @-pkill -f "main.*cmd/manager" 2>/dev/null || true + @-pkill -f "go run.*cmd/manager" 2>/dev/null || true + @# Kill runner processes (compiled binaries in bin/, root, and go run) + @-pkill -f "bin/runner" 2>/dev/null || true + @-pkill -f "\./runner" 2>/dev/null || true + @-pkill -f "runner" 2>/dev/null || true + @-pkill -f "main.*cmd/runner" 2>/dev/null || true + @-pkill -f "go run.*cmd/runner" 2>/dev/null || true + @# Wait a moment for graceful shutdown + @echo "Waiting for 5 seconds for graceful shutdown..." + @sleep 1 + @echo "5" + @sleep 1 + @echo "4" + @sleep 1 + @echo "3" + @sleep 1 + @echo "2" + @sleep 1 + @echo "1" + @sleep 1 + @echo "0" + + @# Check if any manager or runner processes are still running + @MANAGER_COUNT=$$(pgrep -f "bin/manager\|\./manager\|manager\|main.*cmd/manager\|go run.*cmd/manager" | wc -l); \ + RUNNER_COUNT=$$(pgrep -f "bin/runner\|\./runner\|runner\|main.*cmd/runner\|go run.*cmd/runner" | wc -l); \ + if [ $$MANAGER_COUNT -eq 0 ] && [ $$RUNNER_COUNT -eq 0 ]; then \ + echo "All manager and runner processes have shut down gracefully"; \ + exit 0; \ + else \ + echo "Some processes still running ($$MANAGER_COUNT managers, $$RUNNER_COUNT runners), proceeding with force kill..."; \ + fi + @# Force kill any remaining processes + @-pkill -9 -f "bin/manager" 2>/dev/null || true + @-pkill -9 -f "\./manager" 2>/dev/null || true + @-pkill -9 -f "main.*cmd/manager" 2>/dev/null || true + @-pkill -9 -f "go run.*cmd/manager" 2>/dev/null || true + @-pkill -9 -f "bin/runner" 2>/dev/null || true + @-pkill -9 -f "\./runner" 2>/dev/null || true + @-pkill -9 -f "main.*cmd/runner" 2>/dev/null || true + @-pkill -9 -f "go run.*cmd/runner" 2>/dev/null || true + @echo "All manager and runner processes killed after 5 seconds" + # Run all parallel run: cleanup-manager cleanup-runner build-manager build-runner @echo "Starting manager and runner in parallel..." @echo "Press Ctrl+C to stop both..." + @echo "Note: This will create a test API key for the runner to use" @trap 'kill $$MANAGER_PID $$RUNNER_PID 2>/dev/null; exit' INT TERM; \ - FIXED_REGISTRATION_TOKEN=test-token ENABLE_LOCAL_AUTH=true LOCAL_TEST_EMAIL=test@example.com LOCAL_TEST_PASSWORD=testpassword bin/manager & \ + FIXED_API_KEY=jk_r0_test_key_123456789012345678901234567890 ENABLE_LOCAL_AUTH=true LOCAL_TEST_EMAIL=test@example.com LOCAL_TEST_PASSWORD=testpassword bin/manager & \ MANAGER_PID=$$!; \ - REGISTRATION_TOKEN=test-token bin/runner & \ + sleep 2; \ + API_KEY=jk_r0_test_key_123456789012345678901234567890 bin/runner & \ RUNNER_PID=$$!; \ wait $$MANAGER_PID $$RUNNER_PID -# Run manager +# Run manager with test API key # Note: ENABLE_LOCAL_AUTH enables local user registration/login # LOCAL_TEST_EMAIL and LOCAL_TEST_PASSWORD create a test user on startup (if it doesn't exist) +# FIXED_API_KEY provides a pre-configured API key for testing (jk_r0_... format) +# The manager will accept this API key for runner registration run-manager: cleanup-manager build-manager - FIXED_REGISTRATION_TOKEN=test-token ENABLE_LOCAL_AUTH=true LOCAL_TEST_EMAIL=test@example.com LOCAL_TEST_PASSWORD=testpassword bin/manager + FIXED_API_KEY=jk_r0_test_key_123456789012345678901234567890 ENABLE_LOCAL_AUTH=true LOCAL_TEST_EMAIL=test@example.com LOCAL_TEST_PASSWORD=testpassword bin/manager -# Run runner +# Run runner with test API key +# Note: API_KEY must match what the manager accepts (see run-manager) +# The runner will use this API key for all authentication with the manager run-runner: cleanup-runner build-runner - REGISTRATION_TOKEN=test-token bin/runner + API_KEY=jk_r0_test_key_123456789012345678901234567890 bin/runner # Clean bin build artifacts clean-bin: @@ -66,3 +122,39 @@ clean-web: test: go test ./... -timeout 30s +# Show help +help: + @echo "Jiggablend Build and Run Makefile" + @echo "" + @echo "Build targets:" + @echo " build - Build manager, runner, and web UI" + @echo " build-manager - Build manager with web UI" + @echo " build-runner - Build runner binary" + @echo " build-web - Build web UI" + @echo "" + @echo "Run targets:" + @echo " run - Run manager and runner in parallel with test API key" + @echo " run-manager - Run manager with test API key enabled" + @echo " run-runner - Run runner with test API key" + @echo "" + @echo "Cleanup targets:" + @echo " cleanup - Clean manager and runner data" + @echo " cleanup-manager - Clean manager database and storage" + @echo " cleanup-runner - Clean runner workspaces and API keys" + @echo "" + @echo "Other targets:" + @echo " clean - Clean build artifacts" + @echo " kill-all - Kill all running manager and runner processes (binaries in bin/, root, or go run)" + @echo " test - Run Go tests" + @echo " help - Show this help" + @echo "" + @echo "API Key System:" + @echo " - FIXED_API_KEY: Pre-configured API key for manager (optional)" + @echo " - API_KEY: API key for runner authentication" + @echo " - Format: jk_r{N}_{32-char-hex}" + @echo " - Generate via admin UI or set FIXED_API_KEY for testing" + @echo "" + @echo "Timeouts:" + @echo " - Use 'timeout make run' to prevent hanging during testing" + @echo " - Example: timeout 30s make run" + diff --git a/cmd/runner/main.go b/cmd/runner/main.go index d51cb67..387e6d6 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -3,7 +3,6 @@ package main import ( "crypto/rand" "encoding/hex" - "encoding/json" "flag" "fmt" "log" @@ -17,36 +16,46 @@ import ( "jiggablend/internal/runner" ) -type SecretsFile struct { - RunnerID int64 `json:"runner_id"` - RunnerSecret string `json:"runner_secret"` - ManagerSecret string `json:"manager_secret"` -} +// Removed SecretsFile - runners now generate ephemeral instance IDs func main() { + log.Printf("Runner starting up...") + + // Create client early so we can clean it up on panic + var client *runner.Client + + defer func() { + if r := recover(); r != nil { + log.Printf("Runner panicked: %v", r) + // Clean up workspace even on panic + if client != nil { + client.CleanupWorkspace() + } + os.Exit(1) + } + }() + var ( - managerURL = flag.String("manager", getEnv("MANAGER_URL", "http://localhost:8080"), "Manager URL") - name = flag.String("name", getEnv("RUNNER_NAME", ""), "Runner name") - hostname = flag.String("hostname", getEnv("RUNNER_HOSTNAME", ""), "Runner hostname") - token = flag.String("token", getEnv("REGISTRATION_TOKEN", ""), "Registration token") - secretsFile = flag.String("secrets-file", getEnv("SECRETS_FILE", ""), "Path to secrets file for persistent storage (default: ./runner-secrets.json, or ./runner-secrets-{id}.json if multiple runners)") - runnerIDSuffix = flag.String("runner-id", getEnv("RUNNER_ID", ""), "Unique runner ID suffix (auto-generated if not provided)") - logDir = flag.String("log-dir", getEnv("LOG_DIR", "./logs"), "Log directory") - logMaxSize = flag.Int("log-max-size", getEnvInt("LOG_MAX_SIZE", 100), "Maximum log file size in MB before rotation") - logMaxBackups = flag.Int("log-max-backups", getEnvInt("LOG_MAX_BACKUPS", 5), "Maximum number of rotated log files to keep") - logMaxAge = flag.Int("log-max-age", getEnvInt("LOG_MAX_AGE", 30), "Maximum age in days for rotated log files") + managerURL = flag.String("manager", getEnv("MANAGER_URL", "http://localhost:8080"), "Manager URL") + name = flag.String("name", getEnv("RUNNER_NAME", ""), "Runner name") + hostname = flag.String("hostname", getEnv("RUNNER_HOSTNAME", ""), "Runner hostname") + apiKeyFlag = flag.String("api-key", getEnv("API_KEY", ""), "API key for authentication") + logDir = flag.String("log-dir", getEnv("LOG_DIR", "./logs"), "Log directory") + logMaxSize = flag.Int("log-max-size", getEnvInt("LOG_MAX_SIZE", 100), "Maximum log file size in MB before rotation") + logMaxBackups = flag.Int("log-max-backups", getEnvInt("LOG_MAX_BACKUPS", 5), "Maximum number of rotated log files to keep") + logMaxAge = flag.Int("log-max-age", getEnvInt("LOG_MAX_AGE", 30), "Maximum age in days for rotated log files") ) flag.Parse() + log.Printf("Flags parsed, hostname: %s", *hostname) if *hostname == "" { *hostname, _ = os.Hostname() } - // Generate or use provided runner ID suffix - runnerIDStr := *runnerIDSuffix - if runnerIDStr == "" { - runnerIDStr = generateShortID() - } + // Always generate a random runner ID suffix on startup + // This ensures every runner has a unique local identifier + runnerIDStr := generateShortID() + log.Printf("Generated runner ID suffix: %s", runnerIDStr) // Generate runner name with ID if not provided if *name == "" { @@ -70,20 +79,15 @@ func main() { l.Close() } }() + log.Printf("Logger initialized, continuing with startup...") log.Printf("Log rotation configured: max_size=%dMB, max_backups=%d, max_age=%d days", *logMaxSize, *logMaxBackups, *logMaxAge) - // Set default secrets file if not provided - always use current directory - if *secretsFile == "" { - if *runnerIDSuffix != "" || getEnv("RUNNER_ID", "") != "" { - // Multiple runners - use local file with ID - *secretsFile = fmt.Sprintf("./runner-secrets-%s.json", runnerIDStr) - } else { - // Single runner - use local file - *secretsFile = "./runner-secrets.json" - } - } + log.Printf("About to create client...") + client = runner.NewClient(*managerURL, *name, *hostname) + log.Printf("Client created successfully") - client := runner.NewClient(*managerURL, *name, *hostname) + // Clean up any orphaned workspace directories from previous runs + client.CleanupWorkspace() // Probe capabilities once at startup (before any registration attempts) log.Printf("Probing runner capabilities...") @@ -106,69 +110,44 @@ func main() { log.Printf("Warning: No capabilities detected") } - // Try to load secrets from file - var runnerID int64 - var runnerSecret, managerSecret string - if *secretsFile != "" { - if secrets, err := loadSecrets(*secretsFile); err == nil { - runnerID = secrets.RunnerID - runnerSecret = secrets.RunnerSecret - managerSecret = secrets.ManagerSecret - client.SetSecrets(runnerID, runnerSecret, managerSecret) - log.Printf("Loaded secrets from %s", *secretsFile) - } + // Register with API key (with retry logic) + if *apiKeyFlag == "" { + log.Fatalf("API key required (use --api-key or set API_KEY env var)") } - // If no secrets loaded, register with token (with retry logic) - if runnerID == 0 { - if *token == "" { - log.Fatalf("Registration token required (use --token or set REGISTRATION_TOKEN env var)") + // Retry registration with exponential backoff + backoff := 1 * time.Second + maxBackoff := 30 * time.Second + maxRetries := 10 + retryCount := 0 + + var runnerID int64 + + for { + var err error + runnerID, _, _, err = client.Register(*apiKeyFlag) + if err == nil { + log.Printf("Registered runner with ID: %d", runnerID) + break } - // Retry registration with exponential backoff - backoff := 1 * time.Second - maxBackoff := 30 * time.Second - maxRetries := 10 - retryCount := 0 + // Check if it's a token error (invalid/expired/used token) - shutdown immediately + errMsg := err.Error() + if strings.Contains(errMsg, "token error:") { + log.Fatalf("Registration failed (token error): %v", err) + } - for { - var err error - runnerID, runnerSecret, managerSecret, err = client.Register(*token) - if err == nil { - log.Printf("Registered runner with ID: %d", runnerID) + // Only retry on connection errors or other retryable errors + retryCount++ + if retryCount >= maxRetries { + log.Fatalf("Failed to register runner after %d attempts: %v", maxRetries, err) + } - // Always save secrets to file (secretsFile is now always set to a default if not provided) - secrets := SecretsFile{ - RunnerID: runnerID, - RunnerSecret: runnerSecret, - ManagerSecret: managerSecret, - } - if err := saveSecrets(*secretsFile, secrets); err != nil { - log.Printf("Warning: Failed to save secrets to %s: %v", *secretsFile, err) - } else { - log.Printf("Saved secrets to %s", *secretsFile) - } - break - } - - // Check if it's a token error (invalid/expired/used token) - shutdown immediately - errMsg := err.Error() - if strings.Contains(errMsg, "token error:") { - log.Fatalf("Registration failed (token error): %v", err) - } - - // Only retry on connection errors or other retryable errors - retryCount++ - if retryCount >= maxRetries { - log.Fatalf("Failed to register runner after %d attempts: %v", maxRetries, err) - } - - log.Printf("Registration failed (attempt %d/%d): %v, retrying in %v", retryCount, maxRetries, err, backoff) - time.Sleep(backoff) - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } + log.Printf("Registration failed (attempt %d/%d): %v, retrying in %v", retryCount, maxRetries, err, backoff) + time.Sleep(backoff) + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff } } @@ -188,8 +167,10 @@ func main() { go func() { sig := <-sigChan - log.Printf("Received signal: %v, killing all processes and shutting down...", sig) + log.Printf("Received signal: %v, killing all processes and cleaning up...", sig) client.KillAllProcesses() + // Cleanup happens in defer, but also do it here for good measure + client.CleanupWorkspace() os.Exit(0) }() @@ -197,28 +178,6 @@ func main() { select {} } -func loadSecrets(path string) (*SecretsFile, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - var secrets SecretsFile - if err := json.Unmarshal(data, &secrets); err != nil { - return nil, err - } - - return &secrets, nil -} - -func saveSecrets(path string, secrets SecretsFile) error { - data, err := json.MarshalIndent(secrets, "", " ") - if err != nil { - return err - } - - return os.WriteFile(path, data, 0600) -} func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { diff --git a/internal/api/admin.go b/internal/api/admin.go index 1fb1d6d..2106d38 100644 --- a/internal/api/admin.go +++ b/internal/api/admin.go @@ -10,75 +10,115 @@ import ( "jiggablend/pkg/types" ) -// handleGenerateRegistrationToken generates a new registration token -func (s *Server) handleGenerateRegistrationToken(w http.ResponseWriter, r *http.Request) { +// handleGenerateRunnerAPIKey generates a new runner API key +func (s *Server) handleGenerateRunnerAPIKey(w http.ResponseWriter, r *http.Request) { userID, err := getUserID(r) if err != nil { s.respondError(w, http.StatusUnauthorized, err.Error()) return } - // Default expiration: 24 hours - expiresIn := 24 * time.Hour - var req struct { - ExpiresInHours int `json:"expires_in_hours,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Scope string `json:"scope,omitempty"` // 'manager' or 'user' } - if r.Body != nil && r.ContentLength > 0 { - if err := json.NewDecoder(r.Body).Decode(&req); err == nil { - if req.ExpiresInHours == 0 { - // 0 hours means infinite expiration - expiresIn = 0 - } else if req.ExpiresInHours > 0 { - expiresIn = time.Duration(req.ExpiresInHours) * time.Hour - } - } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) + return } - token, err := s.secrets.GenerateRegistrationToken(userID, expiresIn) + if req.Name == "" { + s.respondError(w, http.StatusBadRequest, "API key name is required") + return + } + + // Default scope to 'user' if not specified + scope := req.Scope + if scope == "" { + scope = "user" + } + if scope != "manager" && scope != "user" { + s.respondError(w, http.StatusBadRequest, "Scope must be 'manager' or 'user'") + return + } + + keyInfo, err := s.secrets.GenerateRunnerAPIKey(userID, req.Name, req.Description, scope) if err != nil { - s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to generate token: %v", err)) + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to generate API key: %v", err)) return } response := map[string]interface{}{ - "token": token, - } - if expiresIn == 0 { - response["expires_in"] = "infinite" - response["expires_at"] = nil - } else { - response["expires_in"] = expiresIn.String() - response["expires_at"] = time.Now().Add(expiresIn) + "id": keyInfo.ID, + "key": keyInfo.Key, + "name": keyInfo.Name, + "description": keyInfo.Description, + "is_active": keyInfo.IsActive, + "created_at": keyInfo.CreatedAt, } + s.respondJSON(w, http.StatusCreated, response) } -// handleListRegistrationTokens lists all registration tokens -func (s *Server) handleListRegistrationTokens(w http.ResponseWriter, r *http.Request) { - tokens, err := s.secrets.ListRegistrationTokens() +// handleListRunnerAPIKeys lists all runner API keys +func (s *Server) handleListRunnerAPIKeys(w http.ResponseWriter, r *http.Request) { + keys, err := s.secrets.ListRunnerAPIKeys() if err != nil { - s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to list tokens: %v", err)) + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to list API keys: %v", err)) return } - s.respondJSON(w, http.StatusOK, tokens) + // Convert to response format (hide sensitive hash data) + var response []map[string]interface{} + for _, key := range keys { + item := map[string]interface{}{ + "id": key.ID, + "key_prefix": key.Key, // Only show prefix, not full key + "name": key.Name, + "is_active": key.IsActive, + "created_at": key.CreatedAt, + "created_by": key.CreatedBy, + } + if key.Description != nil { + item["description"] = *key.Description + } + response = append(response, item) + } + + s.respondJSON(w, http.StatusOK, response) } -// handleRevokeRegistrationToken revokes a registration token -func (s *Server) handleRevokeRegistrationToken(w http.ResponseWriter, r *http.Request) { - tokenID, err := parseID(r, "id") +// handleRevokeRunnerAPIKey revokes a runner API key +func (s *Server) handleRevokeRunnerAPIKey(w http.ResponseWriter, r *http.Request) { + keyID, err := parseID(r, "id") if err != nil { s.respondError(w, http.StatusBadRequest, err.Error()) return } - if err := s.secrets.RevokeRegistrationToken(tokenID); err != nil { - s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to revoke token: %v", err)) + if err := s.secrets.RevokeRunnerAPIKey(keyID); err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to revoke API key: %v", err)) return } - s.respondJSON(w, http.StatusOK, map[string]string{"message": "Token revoked"}) + s.respondJSON(w, http.StatusOK, map[string]string{"message": "API key revoked"}) +} + +// handleDeleteRunnerAPIKey deletes a runner API key +func (s *Server) handleDeleteRunnerAPIKey(w http.ResponseWriter, r *http.Request) { + keyID, err := parseID(r, "id") + if err != nil { + s.respondError(w, http.StatusBadRequest, err.Error()) + return + } + + if err := s.secrets.DeleteRunnerAPIKey(keyID); err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to delete API key: %v", err)) + return + } + + s.respondJSON(w, http.StatusOK, map[string]string{"message": "API key deleted"}) } // handleVerifyRunner manually verifies a runner @@ -136,8 +176,8 @@ func (s *Server) handleDeleteRunner(w http.ResponseWriter, r *http.Request) { // handleListRunnersAdmin lists all runners with admin details func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) { rows, err := s.db.Query( - `SELECT id, name, hostname, status, last_heartbeat, capabilities, - registration_token, verified, priority, created_at + `SELECT id, name, hostname, status, last_heartbeat, capabilities, + api_key_id, api_key_scope, priority, created_at FROM runners ORDER BY created_at DESC`, ) if err != nil { @@ -149,13 +189,13 @@ func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) runners := []map[string]interface{}{} for rows.Next() { var runner types.Runner - var registrationToken sql.NullString - var verified bool + var apiKeyID sql.NullInt64 + var apiKeyScope string err := rows.Scan( &runner.ID, &runner.Name, &runner.Hostname, &runner.Status, &runner.LastHeartbeat, &runner.Capabilities, - ®istrationToken, &verified, &runner.Priority, &runner.CreatedAt, + &apiKeyID, &apiKeyScope, &runner.Priority, &runner.CreatedAt, ) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to scan runner: %v", err)) @@ -163,16 +203,16 @@ func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) } runners = append(runners, map[string]interface{}{ - "id": runner.ID, - "name": runner.Name, - "hostname": runner.Hostname, - "status": runner.Status, - "last_heartbeat": runner.LastHeartbeat, - "capabilities": runner.Capabilities, - "registration_token": registrationToken.String, - "verified": verified, - "priority": runner.Priority, - "created_at": runner.CreatedAt, + "id": runner.ID, + "name": runner.Name, + "hostname": runner.Hostname, + "status": runner.Status, + "last_heartbeat": runner.LastHeartbeat, + "capabilities": runner.Capabilities, + "api_key_id": apiKeyID.Int64, + "api_key_scope": apiKeyScope, + "priority": runner.Priority, + "created_at": runner.CreatedAt, }) } @@ -335,7 +375,7 @@ func (s *Server) handleSetRegistrationEnabled(w http.ResponseWriter, r *http.Req Enabled bool `json:"enabled"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -359,7 +399,7 @@ func (s *Server) handleSetUserAdminStatus(w http.ResponseWriter, r *http.Request IsAdmin bool `json:"is_admin"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } diff --git a/internal/api/jobs.go b/internal/api/jobs.go index 737c90e..43d8fb4 100644 --- a/internal/api/jobs.go +++ b/internal/api/jobs.go @@ -25,7 +25,6 @@ import ( authpkg "jiggablend/internal/auth" "jiggablend/pkg/types" - "github.com/go-chi/chi/v5" "github.com/gorilla/websocket" "jiggablend/pkg/scripts" @@ -62,7 +61,7 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { var req types.CreateJobRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -83,7 +82,7 @@ func (s *Server) handleCreateJob(w http.ResponseWriter, r *http.Request) { s.respondError(w, http.StatusBadRequest, "frame_start and frame_end are required for render jobs") return } - if *req.FrameStart < 0 || *req.FrameEnd < *req.FrameStart { + if *req.FrameEnd < *req.FrameStart { s.respondError(w, http.StatusBadRequest, "Invalid frame range") return } @@ -671,7 +670,7 @@ func (s *Server) handleBatchGetJobs(w http.ResponseWriter, r *http.Request) { JobIDs []int64 `json:"job_ids"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -1677,7 +1676,7 @@ func (s *Server) extractMetadataFromTempContext(contextPath string) (*types.Blen }) if err != nil || blendFile == "" { - return nil, fmt.Errorf("no .blend file found in context") + return nil, fmt.Errorf("no .blend file found in context - the uploaded context archive must contain at least one .blend file to render") } // Use the same extraction script and process as extractMetadataFromContext @@ -1894,7 +1893,7 @@ func (s *Server) createContextFromDir(sourceDir, destPath string, excludeFiles . } if blendFilesAtRoot == 0 { - return "", fmt.Errorf("no .blend file found at root level in context archive") + return "", fmt.Errorf("no .blend file found at root level in context archive - .blend files must be at the root level of the uploaded archive, not in subdirectories") } if blendFilesAtRoot > 1 { return "", fmt.Errorf("multiple .blend files found at root level in context archive (found %d, expected 1)", blendFilesAtRoot) @@ -2958,7 +2957,7 @@ func (s *Server) handleBatchGetTasks(w http.ResponseWriter, r *http.Request) { TaskIDs []int64 `json:"task_ids"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -3057,10 +3056,9 @@ func (s *Server) handleGetTaskLogs(w http.ResponseWriter, r *http.Request) { return } - taskIDStr := chi.URLParam(r, "taskId") - taskID, err := strconv.ParseInt(taskIDStr, 10, 64) + taskID, err := parseID(r, "taskId") if err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid task ID") + s.respondError(w, http.StatusBadRequest, err.Error()) return } @@ -3196,10 +3194,9 @@ func (s *Server) handleGetTaskSteps(w http.ResponseWriter, r *http.Request) { return } - taskIDStr := chi.URLParam(r, "taskId") - taskID, err := strconv.ParseInt(taskIDStr, 10, 64) + taskID, err := parseID(r, "taskId") if err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid task ID") + s.respondError(w, http.StatusBadRequest, err.Error()) return } @@ -3304,10 +3301,9 @@ func (s *Server) handleRetryTask(w http.ResponseWriter, r *http.Request) { return } - taskIDStr := chi.URLParam(r, "taskId") - taskID, err := strconv.ParseInt(taskIDStr, 10, 64) + taskID, err := parseID(r, "taskId") if err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid task ID") + s.respondError(w, http.StatusBadRequest, err.Error()) return } @@ -3396,10 +3392,9 @@ func (s *Server) handleStreamTaskLogsWebSocket(w http.ResponseWriter, r *http.Re return } - taskIDStr := chi.URLParam(r, "taskId") - taskID, err := strconv.ParseInt(taskIDStr, 10, 64) + taskID, err := parseID(r, "taskId") if err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid task ID") + s.respondError(w, http.StatusBadRequest, err.Error()) return } diff --git a/internal/api/metadata.go b/internal/api/metadata.go index b126723..44c72f8 100644 --- a/internal/api/metadata.go +++ b/internal/api/metadata.go @@ -38,7 +38,7 @@ func (s *Server) handleSubmitMetadata(w http.ResponseWriter, r *http.Request) { var metadata types.BlendMetadata if err := json.NewDecoder(r.Body).Decode(&metadata); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid metadata JSON") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid metadata JSON: %v", err)) return } @@ -230,7 +230,7 @@ func (s *Server) extractMetadataFromContext(jobID int64) (*types.BlendMetadata, } if blendFile == "" { - return nil, fmt.Errorf("no .blend file found in context") + return nil, fmt.Errorf("no .blend file found in context - the uploaded context archive must contain at least one .blend file for metadata extraction") } // Use embedded Python script diff --git a/internal/api/runners.go b/internal/api/runners.go index 3e2a92b..48daeed 100644 --- a/internal/api/runners.go +++ b/internal/api/runners.go @@ -28,41 +28,75 @@ type contextKey string const runnerIDContextKey contextKey = "runner_id" -// runnerAuthMiddleware verifies runner requests using shared secret header +// runnerAuthMiddleware verifies runner requests using API key func (s *Server) runnerAuthMiddleware(next http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - // Get runner ID from query string + // Get API key from header + apiKey := r.Header.Get("Authorization") + if apiKey == "" { + // Try alternative header + apiKey = r.Header.Get("X-API-Key") + } + if apiKey == "" { + s.respondError(w, http.StatusUnauthorized, "API key required") + return + } + + // Remove "Bearer " prefix if present + if strings.HasPrefix(apiKey, "Bearer ") { + apiKey = strings.TrimPrefix(apiKey, "Bearer ") + } + + // Validate API key and get its ID + apiKeyID, _, err := s.secrets.ValidateRunnerAPIKey(apiKey) + if err != nil { + log.Printf("API key validation failed: %v", err) + s.respondError(w, http.StatusUnauthorized, "invalid API key") + return + } + + // Get runner ID from query string or find runner by API key runnerIDStr := r.URL.Query().Get("runner_id") - if runnerIDStr == "" { - s.respondError(w, http.StatusBadRequest, "runner_id required in query string") - return - } - var runnerID int64 - _, err := fmt.Sscanf(runnerIDStr, "%d", &runnerID) - if err != nil { - s.respondError(w, http.StatusBadRequest, "invalid runner_id") - return - } - // Get runner secret - runnerSecret, err := s.secrets.GetRunnerSecret(runnerID) - if err != nil { - log.Printf("Failed to get runner secret for runner %d: %v", runnerID, err) - s.respondError(w, http.StatusUnauthorized, "runner not found or not verified") - return - } + if runnerIDStr != "" { + // Runner ID provided - verify it belongs to this API key + _, err := fmt.Sscanf(runnerIDStr, "%d", &runnerID) + if err != nil { + s.respondError(w, http.StatusBadRequest, "invalid runner_id") + return + } - // Verify shared secret from header - providedSecret := r.Header.Get("X-Runner-Secret") - if providedSecret == "" { - s.respondError(w, http.StatusUnauthorized, "missing secret") - return - } - - if providedSecret != runnerSecret { - s.respondError(w, http.StatusUnauthorized, "invalid secret") - return + // For fixed API keys, skip database verification + if apiKeyID != -1 { + // Verify runner exists and uses this API key + var dbAPIKeyID sql.NullInt64 + err = s.db.QueryRow("SELECT api_key_id FROM runners WHERE id = ?", runnerID).Scan(&dbAPIKeyID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "runner not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query runner API key: %v", err)) + return + } + if !dbAPIKeyID.Valid || dbAPIKeyID.Int64 != apiKeyID { + s.respondError(w, http.StatusForbidden, "runner does not belong to this API key") + return + } + } + } else { + // No runner ID provided - find the runner for this API key + // For simplicity, assume each API key has one runner + err = s.db.QueryRow("SELECT id FROM runners WHERE api_key_id = ?", apiKeyID).Scan(&runnerID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "no runner found for this API key") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query runner by API key: %v", err)) + return + } } // Add runner ID to context @@ -72,66 +106,40 @@ func (s *Server) runnerAuthMiddleware(next http.HandlerFunc) http.HandlerFunc { } } -// handleRegisterRunner registers a new runner -// Note: Token expiration only affects whether the token can be used for registration. -// Once a runner is registered, it receives its own runner_secret and manager_secret -// and operates independently. The token expiration does not affect registered runners. +// handleRegisterRunner registers a new runner using an API key func (s *Server) handleRegisterRunner(w http.ResponseWriter, r *http.Request) { var req struct { types.RegisterRunnerRequest - RegistrationToken string `json:"registration_token"` + APIKey string `json:"api_key"` + Fingerprint string `json:"fingerprint,omitempty"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } + // Lock to prevent concurrent registrations that could create duplicate runners + s.secrets.RegistrationMu.Lock() + defer s.secrets.RegistrationMu.Unlock() + if req.Name == "" { s.respondError(w, http.StatusBadRequest, "Runner name is required") return } - if req.RegistrationToken == "" { - s.respondError(w, http.StatusBadRequest, "Registration token is required") + if req.APIKey == "" { + s.respondError(w, http.StatusBadRequest, "API key is required") return } - // Validate registration token (expiration only affects token usability, not registered runners) - result, err := s.secrets.ValidateRegistrationTokenDetailed(req.RegistrationToken) + // Validate API key + apiKeyID, apiKeyScope, err := s.secrets.ValidateRunnerAPIKey(req.APIKey) if err != nil { - s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to validate token: %v", err)) - return - } - if !result.Valid { - var errorMsg string - switch result.Reason { - case "already_used": - errorMsg = "Registration token has already been used" - case "expired": - errorMsg = "Registration token has expired" - case "not_found": - errorMsg = "Invalid registration token" - default: - errorMsg = "Invalid or expired registration token" - } - s.respondError(w, http.StatusUnauthorized, errorMsg) - return - } - - // Get manager secret - managerSecret, err := s.secrets.GetManagerSecret() - if err != nil { - s.respondError(w, http.StatusInternalServerError, "Failed to get manager secret") - return - } - - // Generate runner secret (runner will use this for all future authentication, independent of token) - runnerSecret, err := s.secrets.GenerateRunnerSecret() - if err != nil { - s.respondError(w, http.StatusInternalServerError, "Failed to generate runner secret") + s.respondError(w, http.StatusUnauthorized, fmt.Sprintf("Invalid API key: %v", err)) return } + // For fixed API keys (keyID = -1), skip fingerprint checking // Set default priority if not provided priority := 100 if req.Priority != nil { @@ -140,28 +148,110 @@ func (s *Server) handleRegisterRunner(w http.ResponseWriter, r *http.Request) { // Register runner var runnerID int64 + // For fixed API keys, don't store api_key_id in database + var dbAPIKeyID interface{} + if apiKeyID == -1 { + dbAPIKeyID = nil // NULL for fixed API keys + } else { + dbAPIKeyID = apiKeyID + } + + // Determine fingerprint value + fingerprint := req.Fingerprint + if apiKeyID == -1 || fingerprint == "" { + // For fixed API keys or when no fingerprint provided, generate a unique fingerprint + // to avoid conflicts while still maintaining some uniqueness + fingerprint = fmt.Sprintf("fixed-%s-%d", req.Name, time.Now().UnixNano()) + } + + // Check fingerprint uniqueness only for non-fixed API keys + if apiKeyID != -1 && req.Fingerprint != "" { + var existingRunnerID int64 + var existingAPIKeyID sql.NullInt64 + err = s.db.QueryRow( + "SELECT id, api_key_id FROM runners WHERE fingerprint = ?", + req.Fingerprint, + ).Scan(&existingRunnerID, &existingAPIKeyID) + + if err == nil { + // Runner already exists with this fingerprint + if existingAPIKeyID.Valid && existingAPIKeyID.Int64 == apiKeyID { + // Same API key - update and return existing runner + log.Printf("Runner with fingerprint %s already exists (ID: %d), updating info", req.Fingerprint, existingRunnerID) + + _, err = s.db.Exec( + `UPDATE runners SET name = ?, hostname = ?, capabilities = ?, status = ?, last_heartbeat = ? WHERE id = ?`, + req.Name, req.Hostname, req.Capabilities, types.RunnerStatusOnline, time.Now(), existingRunnerID, + ) + if err != nil { + log.Printf("Warning: Failed to update existing runner info: %v", err) + } + + s.respondJSON(w, http.StatusOK, map[string]interface{}{ + "id": existingRunnerID, + "name": req.Name, + "hostname": req.Hostname, + "status": types.RunnerStatusOnline, + "reused": true, // Indicates this was a re-registration + }) + return + } else { + // Different API key - reject registration + s.respondError(w, http.StatusConflict, "Runner with this fingerprint already registered with different API key") + return + } + } + // If err is not nil, it means no existing runner with this fingerprint - proceed with new registration + } + + // Insert runner err = s.db.QueryRow( - `INSERT INTO runners (name, hostname, ip_address, status, last_heartbeat, capabilities, - registration_token, runner_secret, manager_secret, verified, priority) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `INSERT INTO runners (name, hostname, ip_address, status, last_heartbeat, capabilities, + api_key_id, api_key_scope, priority, fingerprint) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id`, req.Name, req.Hostname, "", types.RunnerStatusOnline, time.Now(), req.Capabilities, - req.RegistrationToken, runnerSecret, managerSecret, true, priority, + dbAPIKeyID, apiKeyScope, priority, fingerprint, ).Scan(&runnerID) if err != nil { s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to register runner: %v", err)) return } - // Return runner info with secrets + log.Printf("Registered new runner %s (ID: %d) with API key ID: %d", req.Name, runnerID, apiKeyID) + + // Return runner info s.respondJSON(w, http.StatusCreated, map[string]interface{}{ - "id": runnerID, - "name": req.Name, - "hostname": req.Hostname, - "status": types.RunnerStatusOnline, - "runner_secret": runnerSecret, - "manager_secret": managerSecret, - "verified": true, + "id": runnerID, + "name": req.Name, + "hostname": req.Hostname, + "status": types.RunnerStatusOnline, + }) +} + +// handleRunnerPing allows runners to validate their secrets and connection +func (s *Server) handleRunnerPing(w http.ResponseWriter, r *http.Request) { + // This endpoint uses runnerAuthMiddleware, so if we get here, secrets are valid + // Get runner ID from context (set by runnerAuthMiddleware) + runnerID, ok := r.Context().Value(runnerIDContextKey).(int64) + if !ok { + s.respondError(w, http.StatusUnauthorized, "runner_id not found in context") + return + } + + // Update last heartbeat + _, err := s.db.Exec( + `UPDATE runners SET last_heartbeat = ?, status = ? WHERE id = ?`, + time.Now(), types.RunnerStatusOnline, runnerID, + ) + if err != nil { + log.Printf("Warning: Failed to update runner heartbeat: %v", err) + } + + s.respondJSON(w, http.StatusOK, map[string]interface{}{ + "status": "ok", + "runner_id": runnerID, + "timestamp": time.Now().Unix(), }) } @@ -177,7 +267,7 @@ func (s *Server) handleUpdateTaskProgress(w http.ResponseWriter, r *http.Request Progress float64 `json:"progress"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -207,7 +297,7 @@ func (s *Server) handleUpdateTaskStep(w http.ResponseWriter, r *http.Request) { ErrorMessage string `json:"error_message,omitempty"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -354,7 +444,7 @@ func (s *Server) handleUploadFileFromRunner(w http.ResponseWriter, r *http.Reque err = r.ParseMultipartForm(50 << 30) // 50 GB (for large output files) if err != nil { - s.respondError(w, http.StatusBadRequest, "Failed to parse form") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Failed to parse multipart form: %v", err)) return } @@ -530,7 +620,7 @@ func (s *Server) handleGetJobMetadataForRunner(w http.ResponseWriter, r *http.Re var metadata types.BlendMetadata if err := json.Unmarshal([]byte(blendMetadataJSON.String), &metadata); err != nil { - s.respondError(w, http.StatusInternalServerError, "Failed to parse metadata") + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to parse metadata JSON: %v", err)) return } @@ -645,33 +735,66 @@ type WSTaskUpdate struct { // handleRunnerWebSocket handles WebSocket connections from runners func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { - // Get runner ID and secret from query params + // Get API key from query params or headers + apiKey := r.URL.Query().Get("api_key") + if apiKey == "" { + apiKey = r.Header.Get("Authorization") + if strings.HasPrefix(apiKey, "Bearer ") { + apiKey = strings.TrimPrefix(apiKey, "Bearer ") + } + } + if apiKey == "" { + s.respondError(w, http.StatusBadRequest, "API key required") + return + } + + // Validate API key + apiKeyID, _, err := s.secrets.ValidateRunnerAPIKey(apiKey) + if err != nil { + s.respondError(w, http.StatusUnauthorized, fmt.Sprintf("Invalid API key: %v", err)) + return + } + + // Get runner ID from query params or find by API key runnerIDStr := r.URL.Query().Get("runner_id") - providedSecret := r.URL.Query().Get("secret") - - if runnerIDStr == "" || providedSecret == "" { - s.respondError(w, http.StatusBadRequest, "runner_id and secret required") - return - } - var runnerID int64 - _, err := fmt.Sscanf(runnerIDStr, "%d", &runnerID) - if err != nil { - s.respondError(w, http.StatusBadRequest, "invalid runner_id") - return - } - // Get runner secret - runnerSecret, err := s.secrets.GetRunnerSecret(runnerID) - if err != nil { - s.respondError(w, http.StatusUnauthorized, "runner not found or not verified") - return - } + if runnerIDStr != "" { + // Runner ID provided - verify it belongs to this API key + _, err := fmt.Sscanf(runnerIDStr, "%d", &runnerID) + if err != nil { + s.respondError(w, http.StatusBadRequest, "invalid runner_id") + return + } - // Verify shared secret - if providedSecret != runnerSecret { - s.respondError(w, http.StatusUnauthorized, "invalid secret") - return + // For fixed API keys, skip database verification + if apiKeyID != -1 { + var dbAPIKeyID sql.NullInt64 + err = s.db.QueryRow("SELECT api_key_id FROM runners WHERE id = ?", runnerID).Scan(&dbAPIKeyID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "runner not found") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to query runner API key: %v", err)) + return + } + if !dbAPIKeyID.Valid || dbAPIKeyID.Int64 != apiKeyID { + s.respondError(w, http.StatusForbidden, "runner does not belong to this API key") + return + } + } + } else { + // No runner ID provided - find the runner for this API key + err = s.db.QueryRow("SELECT id FROM runners WHERE api_key_id = ?", apiKeyID).Scan(&runnerID) + if err == sql.ErrNoRows { + s.respondError(w, http.StatusNotFound, "no runner found for this API key") + return + } + if err != nil { + s.respondError(w, http.StatusInternalServerError, "database error") + return + } } // Upgrade to WebSocket @@ -685,18 +808,25 @@ func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { // Register connection (must be done before any distribution checks) // Close old connection outside lock to avoid blocking var oldConn *websocket.Conn + var hadExistingConnection bool s.runnerConnsMu.Lock() if existingConn, exists := s.runnerConns[runnerID]; exists { oldConn = existingConn + hadExistingConnection = true } s.runnerConns[runnerID] = conn s.runnerConnsMu.Unlock() // Close old connection outside lock (if it existed) if oldConn != nil { + log.Printf("Runner %d: closing existing WebSocket connection (reconnection)", runnerID) oldConn.Close() + } else if hadExistingConnection { + log.Printf("Runner %d: replacing existing WebSocket connection", runnerID) } + log.Printf("Runner %d: WebSocket connection established successfully", runnerID) + // Create a write mutex for this connection s.runnerConnsWriteMuMu.Lock() s.runnerConnsWriteMu[runnerID] = &sync.Mutex{} @@ -717,20 +847,31 @@ func (s *Server) handleRunnerWebSocket(w http.ResponseWriter, r *http.Request) { // Cleanup on disconnect defer func() { + log.Printf("Runner %d: WebSocket connection cleanup started", runnerID) + + // Update database status first + _, err := s.db.Exec( + `UPDATE runners SET status = ?, last_heartbeat = ? WHERE id = ?`, + types.RunnerStatusOffline, time.Now(), runnerID, + ) + if err != nil { + log.Printf("Warning: Failed to update runner %d status to offline: %v", runnerID, err) + } + + // Clean up connection maps s.runnerConnsMu.Lock() delete(s.runnerConns, runnerID) s.runnerConnsMu.Unlock() + s.runnerConnsWriteMuMu.Lock() delete(s.runnerConnsWriteMu, runnerID) s.runnerConnsWriteMuMu.Unlock() - _, _ = s.db.Exec( - `UPDATE runners SET status = ? WHERE id = ?`, - types.RunnerStatusOffline, runnerID, - ) // Immediately redistribute tasks that were assigned to this runner - log.Printf("Runner %d disconnected, redistributing its tasks", runnerID) + log.Printf("Runner %d: WebSocket disconnected, redistributing tasks", runnerID) s.redistributeRunnerTasks(runnerID) + + log.Printf("Runner %d: WebSocket connection cleanup completed", runnerID) }() // Set pong handler to update heartbeat when we receive pong responses from runner @@ -1341,7 +1482,7 @@ func (s *Server) distributeTasksToRunners() { // Get all pending tasks rows, err := s.db.Query( - `SELECT t.id, t.job_id, t.frame_start, t.frame_end, t.task_type, j.allow_parallel_runners, j.status as job_status, j.name as job_name + `SELECT t.id, t.job_id, t.frame_start, t.frame_end, t.task_type, j.allow_parallel_runners, j.status as job_status, j.name as job_name, j.user_id FROM tasks t JOIN jobs j ON t.job_id = j.id WHERE t.status = ? AND j.status != ? @@ -1363,6 +1504,7 @@ func (s *Server) distributeTasksToRunners() { AllowParallelRunners bool JobName string JobStatus string + JobUserID int64 } for rows.Next() { @@ -1375,9 +1517,10 @@ func (s *Server) distributeTasksToRunners() { AllowParallelRunners bool JobName string JobStatus string + JobUserID int64 } var allowParallel sql.NullBool - err := rows.Scan(&t.TaskID, &t.JobID, &t.FrameStart, &t.FrameEnd, &t.TaskType, &allowParallel, &t.JobStatus, &t.JobName) + err := rows.Scan(&t.TaskID, &t.JobID, &t.FrameStart, &t.FrameEnd, &t.TaskType, &allowParallel, &t.JobStatus, &t.JobName, &t.JobUserID) if err != nil { log.Printf("Failed to scan pending task: %v", err) continue @@ -1411,19 +1554,22 @@ func (s *Server) distributeTasksToRunners() { } s.runnerConnsMu.RUnlock() - // Get runner priorities and capabilities for all connected runners + // Get runner priorities, capabilities, and API key scopes for all connected runners runnerPriorities := make(map[int64]int) runnerCapabilities := make(map[int64]map[string]interface{}) + runnerScopes := make(map[int64]string) for _, runnerID := range connectedRunners { var priority int var capabilitiesJSON string - err := s.db.QueryRow("SELECT priority, capabilities FROM runners WHERE id = ?", runnerID).Scan(&priority, &capabilitiesJSON) + var scope string + err := s.db.QueryRow("SELECT priority, capabilities, api_key_scope FROM runners WHERE id = ?", runnerID).Scan(&priority, &capabilitiesJSON, &scope) if err != nil { // Default to 100 if priority not found priority = 100 capabilitiesJSON = "{}" } runnerPriorities[runnerID] = priority + runnerScopes[runnerID] = scope // Parse capabilities JSON (can contain both bools and numbers) var capabilities map[string]interface{} @@ -1512,6 +1658,30 @@ func (s *Server) distributeTasksToRunners() { // Try to find the best runner for this task for _, runnerID := range connectedRunners { + // Check if runner's API key scope allows working on this job + runnerScope := runnerScopes[runnerID] + if runnerScope == "user" && task.JobUserID != 0 { + // User-scoped runner - check if they can work on jobs from this user + // For now, user-scoped runners can only work on jobs from the same user who created their API key + var apiKeyCreatedBy int64 + if runnerScope == "user" { + // Get the user who created this runner's API key + var apiKeyID sql.NullInt64 + err := s.db.QueryRow("SELECT api_key_id FROM runners WHERE id = ?", runnerID).Scan(&apiKeyID) + if err == nil && apiKeyID.Valid { + err = s.db.QueryRow("SELECT created_by FROM runner_api_keys WHERE id = ?", apiKeyID.Int64).Scan(&apiKeyCreatedBy) + if err != nil { + continue // Skip this runner if we can't determine API key ownership + } + // Only allow if the job owner matches the API key creator + if apiKeyCreatedBy != task.JobUserID { + continue // This user-scoped runner cannot work on this job + } + } + } + // Manager-scoped runners can work on any job + } + // Check if runner has required capability capabilities := runnerCapabilities[runnerID] hasRequired := false @@ -1891,9 +2061,11 @@ func (s *Server) assignTaskToRunner(runnerID int64, taskID int64) error { // redistributeRunnerTasks resets tasks assigned to a disconnected/dead runner and redistributes them func (s *Server) redistributeRunnerTasks(runnerID int64) { - // Get tasks assigned to this runner + log.Printf("Starting task redistribution for disconnected runner %d", runnerID) + + // Get tasks assigned to this runner that are still running taskRows, err := s.db.Query( - `SELECT id, retry_count, max_retries FROM tasks + `SELECT id, retry_count, max_retries, job_id FROM tasks WHERE runner_id = ? AND status = ?`, runnerID, types.TaskStatusRunning, ) @@ -1907,6 +2079,7 @@ func (s *Server) redistributeRunnerTasks(runnerID int64) { ID int64 RetryCount int MaxRetries int + JobID int64 } for taskRows.Next() { @@ -1914,51 +2087,78 @@ func (s *Server) redistributeRunnerTasks(runnerID int64) { ID int64 RetryCount int MaxRetries int + JobID int64 } - if err := taskRows.Scan(&t.ID, &t.RetryCount, &t.MaxRetries); err == nil { - tasksToReset = append(tasksToReset, t) + if err := taskRows.Scan(&t.ID, &t.RetryCount, &t.MaxRetries, &t.JobID); err != nil { + log.Printf("Failed to scan task for runner %d: %v", runnerID, err) + continue } + tasksToReset = append(tasksToReset, t) } if len(tasksToReset) == 0 { - return // No tasks to redistribute + log.Printf("No running tasks found for runner %d to redistribute", runnerID) + return } - log.Printf("Redistributing %d tasks from runner %d", len(tasksToReset), runnerID) + log.Printf("Redistributing %d running tasks from disconnected runner %d", len(tasksToReset), runnerID) // Reset or fail tasks + resetCount := 0 + failedCount := 0 + for _, task := range tasksToReset { if task.RetryCount >= task.MaxRetries { // Mark as failed _, err = s.db.Exec( - `UPDATE tasks SET status = ?, error_message = ?, runner_id = NULL - WHERE id = ?`, - types.TaskStatusFailed, "Runner died, max retries exceeded", task.ID, + `UPDATE tasks SET status = ?, error_message = ?, runner_id = NULL, completed_at = ? + WHERE id = ? AND runner_id = ?`, + types.TaskStatusFailed, "Runner disconnected, max retries exceeded", time.Now(), task.ID, runnerID, ) if err != nil { log.Printf("Failed to mark task %d as failed: %v", task.ID, err) } else { + failedCount++ // Log task failure - s.logTaskEvent(task.ID, &runnerID, types.LogLevelError, fmt.Sprintf("Task failed - runner %d disconnected, max retries (%d) exceeded", runnerID, task.MaxRetries), "") + s.logTaskEvent(task.ID, &runnerID, types.LogLevelError, + fmt.Sprintf("Task failed - runner %d disconnected, max retries (%d) exceeded", runnerID, task.MaxRetries), "") } } else { // Reset to pending so it can be redistributed _, err = s.db.Exec( `UPDATE tasks SET status = ?, runner_id = NULL, current_step = NULL, - retry_count = retry_count + 1 WHERE id = ?`, - types.TaskStatusPending, task.ID, + retry_count = retry_count + 1, started_at = NULL WHERE id = ? AND runner_id = ?`, + types.TaskStatusPending, task.ID, runnerID, ) if err != nil { log.Printf("Failed to reset task %d: %v", task.ID, err) } else { + resetCount++ // Log task reset for redistribution - s.logTaskEvent(task.ID, &runnerID, types.LogLevelWarn, fmt.Sprintf("Runner %d disconnected, task reset for redistribution (retry %d/%d)", runnerID, task.RetryCount+1, task.MaxRetries), "") + s.logTaskEvent(task.ID, &runnerID, types.LogLevelWarn, + fmt.Sprintf("Runner %d disconnected, task reset for redistribution (retry %d/%d)", runnerID, task.RetryCount+1, task.MaxRetries), "") } } } + log.Printf("Task redistribution complete for runner %d: %d tasks reset, %d tasks failed", runnerID, resetCount, failedCount) + + // Update job statuses for affected jobs + jobIDs := make(map[int64]bool) + for _, task := range tasksToReset { + jobIDs[task.JobID] = true + } + + for jobID := range jobIDs { + // Update job status based on remaining tasks + go s.updateJobStatusFromTasks(jobID) + } + // Immediately redistribute the reset tasks - s.triggerTaskDistribution() + if resetCount > 0 { + log.Printf("Triggering task distribution for %d reset tasks from runner %d", resetCount, runnerID) + s.triggerTaskDistribution() + } } // logTaskEvent logs an event to a task's log (manager-side logging) @@ -1986,3 +2186,4 @@ func (s *Server) logTaskEvent(taskID int64, runnerID *int64, logLevel types.LogL StepName: stepName, }) } + diff --git a/internal/api/server.go b/internal/api/server.go index 714b07d..f2965fd 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -215,14 +215,20 @@ func (s *Server) setupRoutes() { // WebSocket routes for real-time updates r.With(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Remove timeout middleware for WebSocket - next.ServeHTTP(w, r) + // Apply authentication middleware first + s.auth.Middleware(func(w http.ResponseWriter, r *http.Request) { + // Remove timeout middleware for WebSocket + next.ServeHTTP(w, r) + })(w, r) }) }).Get("/ws", s.handleJobsWebSocket) r.With(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Remove timeout middleware for WebSocket - next.ServeHTTP(w, r) + // Apply authentication middleware first + s.auth.Middleware(func(w http.ResponseWriter, r *http.Request) { + // Remove timeout middleware for WebSocket + next.ServeHTTP(w, r) + })(w, r) }) }).Get("/{id}/ws", s.handleJobWebSocket) }) @@ -233,10 +239,11 @@ func (s *Server) setupRoutes() { return http.HandlerFunc(s.auth.AdminMiddleware(next.ServeHTTP)) }) r.Route("/runners", func(r chi.Router) { - r.Route("/tokens", func(r chi.Router) { - r.Post("/", s.handleGenerateRegistrationToken) - r.Get("/", s.handleListRegistrationTokens) - r.Delete("/{id}", s.handleRevokeRegistrationToken) + r.Route("/api-keys", func(r chi.Router) { + r.Post("/", s.handleGenerateRunnerAPIKey) + r.Get("/", s.handleListRunnerAPIKeys) + r.Patch("/{id}/revoke", s.handleRevokeRunnerAPIKey) + r.Delete("/{id}", s.handleDeleteRunnerAPIKey) }) r.Get("/", s.handleListRunnersAdmin) r.Post("/{id}/verify", s.handleVerifyRunner) @@ -266,6 +273,7 @@ func (s *Server) setupRoutes() { r.Use(func(next http.Handler) http.Handler { return http.HandlerFunc(s.runnerAuthMiddleware(next.ServeHTTP)) }) + r.Get("/ping", s.handleRunnerPing) r.Post("/tasks/{id}/progress", s.handleUpdateTaskProgress) r.Post("/tasks/{id}/steps", s.handleUpdateTaskStep) r.Get("/jobs/{jobId}/context.tar", s.handleDownloadJobContext) @@ -441,7 +449,7 @@ func (s *Server) handleLocalRegister(w http.ResponseWriter, r *http.Request) { } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -489,7 +497,7 @@ func (s *Server) handleLocalLogin(w http.ResponseWriter, r *http.Request) { } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } @@ -540,7 +548,7 @@ func (s *Server) handleChangePassword(w http.ResponseWriter, r *http.Request) { } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") + s.respondError(w, http.StatusBadRequest, fmt.Sprintf("Invalid request body: expected valid JSON - %v", err)) return } diff --git a/internal/auth/secrets.go b/internal/auth/secrets.go index 9135d41..5ce9713 100644 --- a/internal/auth/secrets.go +++ b/internal/auth/secrets.go @@ -1,292 +1,209 @@ package auth import ( - "crypto/hmac" "crypto/rand" "crypto/sha256" "database/sql" "encoding/hex" "fmt" - "io" - "log" - "net/http" "os" "strings" + "sync" "time" ) -// Secrets handles secret and token management +// Secrets handles API key management type Secrets struct { - db *sql.DB - fixedRegistrationToken string // Fixed token from environment variable (reusable, never expires) + db *sql.DB + RegistrationMu sync.Mutex // Protects concurrent runner registrations + fixedAPIKey string // Fixed API key from environment variable (optional) } // NewSecrets creates a new secrets manager func NewSecrets(db *sql.DB) (*Secrets, error) { s := &Secrets{db: db} - // Check for fixed registration token from environment - fixedToken := os.Getenv("FIXED_REGISTRATION_TOKEN") - if fixedToken != "" { - s.fixedRegistrationToken = fixedToken - log.Printf("Fixed registration token enabled (from FIXED_REGISTRATION_TOKEN env var)") - log.Printf("WARNING: Fixed registration token is reusable and never expires - use only for testing/development!") - } - - // Ensure manager secret exists - if err := s.ensureManagerSecret(); err != nil { - return nil, fmt.Errorf("failed to ensure manager secret: %w", err) + // Check for fixed API key from environment + if fixedKey := os.Getenv("FIXED_API_KEY"); fixedKey != "" { + s.fixedAPIKey = fixedKey } return s, nil } -// ensureManagerSecret ensures a manager secret exists in the database -func (s *Secrets) ensureManagerSecret() error { - var count int - err := s.db.QueryRow("SELECT COUNT(*) FROM manager_secrets").Scan(&count) - if err != nil { - return fmt.Errorf("failed to check manager secrets: %w", err) - } - - if count == 0 { - // Generate new manager secret - secret, err := generateSecret(32) - if err != nil { - return fmt.Errorf("failed to generate manager secret: %w", err) - } - - _, err = s.db.Exec("INSERT INTO manager_secrets (secret) VALUES (?)", secret) - if err != nil { - return fmt.Errorf("failed to store manager secret: %w", err) - } - } - - return nil +// APIKeyInfo represents information about an API key +type APIKeyInfo struct { + ID int64 `json:"id"` + Key string `json:"key"` + Name string `json:"name"` + Description *string `json:"description,omitempty"` + Scope string `json:"scope"` // 'manager' or 'user' + IsActive bool `json:"is_active"` + CreatedAt time.Time `json:"created_at"` + CreatedBy int64 `json:"created_by"` } -// GetManagerSecret retrieves the current manager secret -func (s *Secrets) GetManagerSecret() (string, error) { - var secret string - err := s.db.QueryRow("SELECT secret FROM manager_secrets ORDER BY created_at DESC LIMIT 1").Scan(&secret) +// GenerateRunnerAPIKey generates a new API key for runners +func (s *Secrets) GenerateRunnerAPIKey(createdBy int64, name, description string, scope string) (*APIKeyInfo, error) { + // Generate API key in format: jk_r1_abc123def456... + key, err := s.generateAPIKey() if err != nil { - return "", fmt.Errorf("failed to get manager secret: %w", err) - } - return secret, nil -} - -// GenerateRegistrationToken generates a new registration token -// If expiresIn is 0, the token will never expire (uses far future date) -// Note: Token expiration only affects whether the token can be used for registration. -// Once a runner registers, it operates independently using its own secrets. -func (s *Secrets) GenerateRegistrationToken(createdBy int64, expiresIn time.Duration) (string, error) { - token, err := generateSecret(32) - if err != nil { - return "", fmt.Errorf("failed to generate token: %w", err) + return nil, fmt.Errorf("failed to generate API key: %w", err) } - var expiresAt time.Time - if expiresIn == 0 { - // Use far future date (year 9999) to represent infinite expiration - expiresAt = time.Date(9999, 12, 31, 23, 59, 59, 0, time.UTC) - } else { - expiresAt = time.Now().Add(expiresIn) + // Extract prefix (first 5 chars after "jk_") and hash the full key + parts := strings.Split(key, "_") + if len(parts) < 3 { + return nil, fmt.Errorf("invalid API key format generated") } + keyPrefix := fmt.Sprintf("%s_%s", parts[0], parts[1]) + + keyHash := sha256.Sum256([]byte(key)) + keyHashStr := hex.EncodeToString(keyHash[:]) _, err = s.db.Exec( - "INSERT INTO registration_tokens (token, expires_at, created_by) VALUES (?, ?, ?)", - token, expiresAt, createdBy, + `INSERT INTO runner_api_keys (key_prefix, key_hash, name, description, scope, is_active, created_by) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + keyPrefix, keyHashStr, name, description, scope, true, createdBy, ) if err != nil { - return "", fmt.Errorf("failed to store registration token: %w", err) + return nil, fmt.Errorf("failed to store API key: %w", err) } - return token, nil -} + // Get the inserted key info + var keyInfo APIKeyInfo + err = s.db.QueryRow( + `SELECT id, name, description, scope, is_active, created_at, created_by + FROM runner_api_keys WHERE key_prefix = ?`, + keyPrefix, + ).Scan(&keyInfo.ID, &keyInfo.Name, &keyInfo.Description, &keyInfo.Scope, &keyInfo.IsActive, &keyInfo.CreatedAt, &keyInfo.CreatedBy) -// TokenValidationResult represents the result of token validation -type TokenValidationResult struct { - Valid bool - Reason string // "valid", "not_found", "already_used", "expired" - Error error -} - -// ValidateRegistrationToken validates a registration token -func (s *Secrets) ValidateRegistrationToken(token string) (bool, error) { - result, err := s.ValidateRegistrationTokenDetailed(token) if err != nil { - return false, err + return nil, fmt.Errorf("failed to retrieve created API key: %w", err) } - // For backward compatibility, return just the valid boolean - return result.Valid, nil + + keyInfo.Key = key + return &keyInfo, nil } -// ValidateRegistrationTokenDetailed validates a registration token and returns detailed result -func (s *Secrets) ValidateRegistrationTokenDetailed(token string) (*TokenValidationResult, error) { - // Check fixed token first (if set) - it's reusable and never expires - if s.fixedRegistrationToken != "" && token == s.fixedRegistrationToken { - log.Printf("Fixed registration token used (from FIXED_REGISTRATION_TOKEN env var)") - return &TokenValidationResult{Valid: true, Reason: "valid"}, nil +// generateAPIKey generates a new API key in format jk_r1_abc123def456... +func (s *Secrets) generateAPIKey() (string, error) { + // Generate random suffix + randomBytes := make([]byte, 16) + if _, err := rand.Read(randomBytes); err != nil { + return "", err + } + randomStr := hex.EncodeToString(randomBytes) + + // Generate a unique prefix (jk_r followed by 1 random digit) + prefixDigit := make([]byte, 1) + if _, err := rand.Read(prefixDigit); err != nil { + return "", err } - // Check database tokens - var used bool - var expiresAt time.Time - var id int64 + prefix := fmt.Sprintf("jk_r%d", prefixDigit[0]%10) + return fmt.Sprintf("%s_%s", prefix, randomStr), nil +} + +// ValidateRunnerAPIKey validates an API key and returns the key ID and scope if valid +func (s *Secrets) ValidateRunnerAPIKey(apiKey string) (int64, string, error) { + if apiKey == "" { + return 0, "", fmt.Errorf("API key is required") + } + + // Check fixed API key first (for testing/development) + if s.fixedAPIKey != "" && apiKey == s.fixedAPIKey { + // Return a special ID for fixed API key (doesn't exist in database) + return -1, "manager", nil + } + + // Parse API key format: jk_rX_... + if !strings.HasPrefix(apiKey, "jk_r") { + return 0, "", fmt.Errorf("invalid API key format: expected format 'jk_rX_...' where X is a number (e.g., 'jk_r1_abc123...')") + } + + parts := strings.Split(apiKey, "_") + if len(parts) < 3 { + return 0, "", fmt.Errorf("invalid API key format: expected format 'jk_rX_...' with at least 3 parts separated by underscores") + } + + keyPrefix := fmt.Sprintf("%s_%s", parts[0], parts[1]) + + // Hash the full key for comparison + keyHash := sha256.Sum256([]byte(apiKey)) + keyHashStr := hex.EncodeToString(keyHash[:]) + + var keyID int64 + var scope string + var isActive bool err := s.db.QueryRow( - "SELECT id, expires_at, used FROM registration_tokens WHERE token = ?", - token, - ).Scan(&id, &expiresAt, &used) + `SELECT id, scope, is_active FROM runner_api_keys + WHERE key_prefix = ? AND key_hash = ?`, + keyPrefix, keyHashStr, + ).Scan(&keyID, &scope, &isActive) if err == sql.ErrNoRows { - return &TokenValidationResult{Valid: false, Reason: "not_found"}, nil + return 0, "", fmt.Errorf("API key not found or invalid - please check that the key is correct and active") } if err != nil { - return nil, fmt.Errorf("failed to query token: %w", err) + return 0, "", fmt.Errorf("failed to validate API key: %w", err) } - if used { - return &TokenValidationResult{Valid: false, Reason: "already_used"}, nil + if !isActive { + return 0, "", fmt.Errorf("API key is inactive") } - // Check if token has infinite expiration (year 9999 or later) - // Tokens with infinite expiration never expire - infiniteExpirationThreshold := time.Date(3000, 1, 1, 0, 0, 0, 0, time.UTC) - if expiresAt.Before(infiniteExpirationThreshold) { - // Normal expiration check for tokens with finite expiration - if time.Now().After(expiresAt) { - return &TokenValidationResult{Valid: false, Reason: "expired"}, nil - } - } - // If expiresAt is after the threshold, treat it as infinite (never expires) + // Update last_used_at (don't fail if this update fails) + s.db.Exec(`UPDATE runner_api_keys SET last_used_at = ? WHERE id = ?`, time.Now(), keyID) - // Mark token as used - _, err = s.db.Exec("UPDATE registration_tokens SET used = 1 WHERE id = ?", id) - if err != nil { - return nil, fmt.Errorf("failed to mark token as used: %w", err) - } - - return &TokenValidationResult{Valid: true, Reason: "valid"}, nil + return keyID, scope, nil } -// ListRegistrationTokens lists all registration tokens -func (s *Secrets) ListRegistrationTokens() ([]map[string]interface{}, error) { +// ListRunnerAPIKeys lists all runner API keys +func (s *Secrets) ListRunnerAPIKeys() ([]APIKeyInfo, error) { rows, err := s.db.Query( - `SELECT id, token, expires_at, used, created_at, created_by - FROM registration_tokens + `SELECT id, key_prefix, name, description, scope, is_active, created_at, created_by + FROM runner_api_keys ORDER BY created_at DESC`, ) if err != nil { - return nil, fmt.Errorf("failed to query tokens: %w", err) + return nil, fmt.Errorf("failed to query API keys: %w", err) } defer rows.Close() - var tokens []map[string]interface{} + var keys []APIKeyInfo for rows.Next() { - var id, createdBy sql.NullInt64 - var token string - var expiresAt, createdAt time.Time - var used bool + var key APIKeyInfo + var description sql.NullString - err := rows.Scan(&id, &token, &expiresAt, &used, &createdAt, &createdBy) + err := rows.Scan(&key.ID, &key.Key, &key.Name, &description, &key.Scope, &key.IsActive, &key.CreatedAt, &key.CreatedBy) if err != nil { continue } - tokens = append(tokens, map[string]interface{}{ - "id": id.Int64, - "token": token, - "expires_at": expiresAt, - "used": used, - "created_at": createdAt, - "created_by": createdBy.Int64, - }) + if description.Valid { + key.Description = &description.String + } + + keys = append(keys, key) } - return tokens, nil + return keys, nil } -// RevokeRegistrationToken revokes a registration token -func (s *Secrets) RevokeRegistrationToken(tokenID int64) error { - _, err := s.db.Exec("UPDATE registration_tokens SET used = 1 WHERE id = ?", tokenID) +// RevokeRunnerAPIKey revokes (deactivates) a runner API key +func (s *Secrets) RevokeRunnerAPIKey(keyID int64) error { + _, err := s.db.Exec("UPDATE runner_api_keys SET is_active = false WHERE id = ?", keyID) return err } -// GenerateRunnerSecret generates a unique secret for a runner -func (s *Secrets) GenerateRunnerSecret() (string, error) { - return generateSecret(32) +// DeleteRunnerAPIKey deletes a runner API key +func (s *Secrets) DeleteRunnerAPIKey(keyID int64) error { + _, err := s.db.Exec("DELETE FROM runner_api_keys WHERE id = ?", keyID) + return err } -// SignRequest signs a request with the given secret -func SignRequest(method, path, body, secret string, timestamp time.Time) string { - message := fmt.Sprintf("%s\n%s\n%s\n%d", method, path, body, timestamp.Unix()) - h := hmac.New(sha256.New, []byte(secret)) - h.Write([]byte(message)) - return hex.EncodeToString(h.Sum(nil)) -} - -// VerifyRequest verifies a signed request -func VerifyRequest(r *http.Request, secret string, maxAge time.Duration) (bool, error) { - signature := r.Header.Get("X-Runner-Signature") - if signature == "" { - return false, fmt.Errorf("missing signature") - } - - timestampStr := r.Header.Get("X-Runner-Timestamp") - if timestampStr == "" { - return false, fmt.Errorf("missing timestamp") - } - - var timestampUnix int64 - _, err := fmt.Sscanf(timestampStr, "%d", ×tampUnix) - if err != nil { - return false, fmt.Errorf("invalid timestamp: %w", err) - } - timestamp := time.Unix(timestampUnix, 0) - - // Check timestamp is not too old - if time.Since(timestamp) > maxAge { - return false, fmt.Errorf("request too old") - } - - // Check timestamp is not in the future (allow 1 minute clock skew) - if timestamp.After(time.Now().Add(1 * time.Minute)) { - return false, fmt.Errorf("timestamp in future") - } - - // Read body - bodyBytes, err := io.ReadAll(r.Body) - if err != nil { - return false, fmt.Errorf("failed to read body: %w", err) - } - // Restore body for handler - r.Body = io.NopCloser(strings.NewReader(string(bodyBytes))) - - // Verify signature - use path without query parameters (query params are not part of signature) - // The runner signs with the path including query params, but we verify with just the path - // This is intentional - query params are for identification, not part of the signature - path := r.URL.Path - expectedSig := SignRequest(r.Method, path, string(bodyBytes), secret, timestamp) - - return hmac.Equal([]byte(signature), []byte(expectedSig)), nil -} - -// GetRunnerSecret retrieves the runner secret for a runner ID -func (s *Secrets) GetRunnerSecret(runnerID int64) (string, error) { - var secret string - err := s.db.QueryRow("SELECT runner_secret FROM runners WHERE id = ?", runnerID).Scan(&secret) - if err == sql.ErrNoRows { - return "", fmt.Errorf("runner not found") - } - if err != nil { - return "", fmt.Errorf("failed to get runner secret: %w", err) - } - if secret == "" { - return "", fmt.Errorf("runner not verified") - } - return secret, nil -} // generateSecret generates a random secret of the given length func generateSecret(length int) (string, error) { diff --git a/internal/database/schema.go b/internal/database/schema.go index 1519e61..f67ed82 100644 --- a/internal/database/schema.go +++ b/internal/database/schema.go @@ -43,6 +43,7 @@ func (db *DB) migrate() error { `CREATE SEQUENCE IF NOT EXISTS seq_job_files_id START 1`, `CREATE SEQUENCE IF NOT EXISTS seq_manager_secrets_id START 1`, `CREATE SEQUENCE IF NOT EXISTS seq_registration_tokens_id START 1`, + `CREATE SEQUENCE IF NOT EXISTS seq_runner_api_keys_id START 1`, `CREATE SEQUENCE IF NOT EXISTS seq_task_logs_id START 1`, `CREATE SEQUENCE IF NOT EXISTS seq_task_steps_id START 1`, } @@ -66,6 +67,20 @@ func (db *DB) migrate() error { UNIQUE(oauth_provider, oauth_id) ); + CREATE TABLE IF NOT EXISTS runner_api_keys ( + id BIGINT PRIMARY KEY DEFAULT nextval('seq_runner_api_keys_id'), + key_prefix TEXT NOT NULL, -- First part of API key (e.g., "jk_r1") + key_hash TEXT NOT NULL, -- SHA256 hash of full API key + name TEXT NOT NULL, -- Human-readable name + description TEXT, -- Optional description + scope TEXT NOT NULL DEFAULT 'user', -- 'manager' or 'user' - manager scope allows all jobs, user scope only allows jobs from key owner + is_active BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_by BIGINT, + FOREIGN KEY (created_by) REFERENCES users(id), + UNIQUE(key_prefix) + ); + CREATE TABLE IF NOT EXISTS jobs ( id BIGINT PRIMARY KEY DEFAULT nextval('seq_jobs_id'), user_id BIGINT NOT NULL, @@ -76,7 +91,7 @@ func (db *DB) migrate() error { frame_start INTEGER, frame_end INTEGER, output_format TEXT, - allow_parallel_runners BOOLEAN, + allow_parallel_runners BOOLEAN NOT NULL DEFAULT true, timeout_seconds INTEGER DEFAULT 86400, blend_metadata TEXT, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -94,12 +109,12 @@ func (db *DB) migrate() error { status TEXT NOT NULL DEFAULT 'offline', last_heartbeat TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, capabilities TEXT, - registration_token TEXT, - runner_secret TEXT, - manager_secret TEXT, - verified BOOLEAN NOT NULL DEFAULT false, + api_key_id BIGINT, -- Reference to the API key used for this runner + api_key_scope TEXT NOT NULL DEFAULT 'user', -- Scope of the API key ('manager' or 'user') priority INTEGER NOT NULL DEFAULT 100, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + fingerprint TEXT, -- Hardware fingerprint (NULL for fixed API keys) + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (api_key_id) REFERENCES runner_api_keys(id) ); CREATE TABLE IF NOT EXISTS tasks ( @@ -110,6 +125,11 @@ func (db *DB) migrate() error { frame_end INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'pending', output_path TEXT, + task_type TEXT NOT NULL DEFAULT 'render', + current_step TEXT, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 3, + timeout_seconds INTEGER, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, started_at TIMESTAMP, completed_at TIMESTAMP, @@ -132,16 +152,6 @@ func (db *DB) migrate() error { created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); - CREATE TABLE IF NOT EXISTS registration_tokens ( - id BIGINT PRIMARY KEY DEFAULT nextval('seq_registration_tokens_id'), - token TEXT UNIQUE NOT NULL, - expires_at TIMESTAMP NOT NULL, - used BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - created_by BIGINT, - FOREIGN KEY (created_by) REFERENCES users(id) - ); - CREATE TABLE IF NOT EXISTS task_logs ( id BIGINT PRIMARY KEY DEFAULT nextval('seq_task_logs_id'), task_id BIGINT NOT NULL, @@ -172,8 +182,9 @@ func (db *DB) migrate() error { CREATE INDEX IF NOT EXISTS idx_tasks_job_status ON tasks(job_id, status); CREATE INDEX IF NOT EXISTS idx_tasks_started_at ON tasks(started_at); CREATE INDEX IF NOT EXISTS idx_job_files_job_id ON job_files(job_id); - CREATE INDEX IF NOT EXISTS idx_registration_tokens_token ON registration_tokens(token); - CREATE INDEX IF NOT EXISTS idx_registration_tokens_expires_at ON registration_tokens(expires_at); + CREATE INDEX IF NOT EXISTS idx_runner_api_keys_prefix ON runner_api_keys(key_prefix); + CREATE INDEX IF NOT EXISTS idx_runner_api_keys_active ON runner_api_keys(is_active); + CREATE INDEX IF NOT EXISTS idx_runners_api_key_id ON runners(api_key_id); CREATE INDEX IF NOT EXISTS idx_task_logs_task_id_created_at ON task_logs(task_id, created_at); CREATE INDEX IF NOT EXISTS idx_task_logs_task_id_id ON task_logs(task_id, id DESC); CREATE INDEX IF NOT EXISTS idx_task_logs_runner_id ON task_logs(runner_id); @@ -191,45 +202,13 @@ func (db *DB) migrate() error { return fmt.Errorf("failed to create schema: %w", err) } - // Migrate existing tables to add new columns + // Database migrations for schema updates + // NOTE: Migrations are currently disabled since the database is cleared by 'make cleanup-manager' + // before running. All schema changes have been rolled into the main schema above. + // When ready to implement proper migrations for production, uncomment and populate this array. + // TODO: Implement proper database migration system for production use migrations := []string{ - // Add is_admin to users if it doesn't exist - `ALTER TABLE users ADD COLUMN IF NOT EXISTS is_admin BOOLEAN NOT NULL DEFAULT false`, - // Add new columns to runners if they don't exist - `ALTER TABLE runners ADD COLUMN IF NOT EXISTS registration_token TEXT`, - `ALTER TABLE runners ADD COLUMN IF NOT EXISTS runner_secret TEXT`, - `ALTER TABLE runners ADD COLUMN IF NOT EXISTS manager_secret TEXT`, - `ALTER TABLE runners ADD COLUMN IF NOT EXISTS verified BOOLEAN NOT NULL DEFAULT false`, - `ALTER TABLE runners ADD COLUMN IF NOT EXISTS priority INTEGER NOT NULL DEFAULT 100`, - // Add allow_parallel_runners to jobs if it doesn't exist - `ALTER TABLE jobs ADD COLUMN IF NOT EXISTS allow_parallel_runners BOOLEAN NOT NULL DEFAULT true`, - // Add timeout_seconds to jobs if it doesn't exist - `ALTER TABLE jobs ADD COLUMN IF NOT EXISTS timeout_seconds INTEGER DEFAULT 86400`, - // Add blend_metadata to jobs if it doesn't exist - `ALTER TABLE jobs ADD COLUMN IF NOT EXISTS blend_metadata TEXT`, - // Add job_type to jobs if it doesn't exist - `ALTER TABLE jobs ADD COLUMN IF NOT EXISTS job_type TEXT DEFAULT 'render'`, - // Add task_type to tasks if it doesn't exist - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS task_type TEXT DEFAULT 'render'`, - // Add new columns to tasks if they don't exist - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS current_step TEXT`, - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0`, - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS max_retries INTEGER DEFAULT 3`, - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS timeout_seconds INTEGER`, - // Add updated_at columns for ETag support - `ALTER TABLE jobs ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, - // Migrate file_size from INTEGER to BIGINT to support large files (>2GB) - // DuckDB doesn't support direct ALTER COLUMN TYPE, so we use a workaround: - // 1. Add new column as BIGINT - // 2. Copy data from old column - // 3. Drop old column - // 4. Rename new column - // Note: This will only run if the column exists and is INTEGER - `ALTER TABLE job_files ADD COLUMN IF NOT EXISTS file_size_new BIGINT`, - `UPDATE job_files SET file_size_new = CAST(file_size AS BIGINT) WHERE file_size_new IS NULL`, - `ALTER TABLE job_files DROP COLUMN IF EXISTS file_size`, - `ALTER TABLE job_files RENAME COLUMN file_size_new TO file_size`, + // Future migrations will go here when we implement proper migration handling } for _, migration := range migrations { diff --git a/internal/runner/client.go b/internal/runner/client.go index dd27ff9..2fce75b 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -4,13 +4,16 @@ import ( "archive/tar" "bufio" "bytes" + "crypto/sha256" _ "embed" + "encoding/hex" "encoding/json" "errors" "fmt" "io" "log" "mime/multipart" + "net" "net/http" "net/url" "os" @@ -35,8 +38,7 @@ type Client struct { hostname string httpClient *http.Client runnerID int64 - runnerSecret string - managerSecret string + apiKey string // API key for authentication wsConn *websocket.Conn wsConnMu sync.RWMutex wsWriteMu sync.Mutex // Protects concurrent writes to WebSocket (WebSocket is not thread-safe) @@ -54,11 +56,13 @@ type Client struct { allocatedDevices map[int64]string // map[taskID]device - tracks which device is allocated to which task allocatedDevicesMu sync.RWMutex // Protects allocatedDevices longRunningClient *http.Client // HTTP client for long-running operations (no timeout) + fingerprint string // Unique hardware fingerprint for this runner + fingerprintMu sync.RWMutex // Protects fingerprint } // NewClient creates a new runner client func NewClient(managerURL, name, hostname string) *Client { - return &Client{ + client := &Client{ managerURL: managerURL, name: name, hostname: hostname, @@ -67,13 +71,88 @@ func NewClient(managerURL, name, hostname string) *Client { stopChan: make(chan struct{}), stepStartTimes: make(map[string]time.Time), } + // Generate fingerprint immediately + client.generateFingerprint() + return client } -// SetSecrets sets the runner and manager secrets -func (c *Client) SetSecrets(runnerID int64, runnerSecret, managerSecret string) { +// generateFingerprint creates a unique hardware fingerprint for this runner +// This fingerprint should be stable across restarts but unique per physical/virtual machine +func (c *Client) generateFingerprint() { + c.fingerprintMu.Lock() + defer c.fingerprintMu.Unlock() + + // Use a combination of stable hardware identifiers + var components []string + + // Add hostname (stable on most systems) + components = append(components, c.hostname) + + // Try to get machine ID from /etc/machine-id (Linux) + if machineID, err := os.ReadFile("/etc/machine-id"); err == nil { + components = append(components, strings.TrimSpace(string(machineID))) + } + + // Try to get product UUID from /sys/class/dmi/id/product_uuid (Linux) + if productUUID, err := os.ReadFile("/sys/class/dmi/id/product_uuid"); err == nil { + components = append(components, strings.TrimSpace(string(productUUID))) + } + + // Try to get MAC address of first network interface (cross-platform) + if macAddr, err := c.getMACAddress(); err == nil { + components = append(components, macAddr) + } + + // If no stable identifiers found, fall back to hostname + process ID + timestamp + // This is less ideal but ensures uniqueness + if len(components) <= 1 { + components = append(components, fmt.Sprintf("%d", os.Getpid())) + components = append(components, fmt.Sprintf("%d", time.Now().Unix())) + } + + // Create fingerprint by hashing the components + h := sha256.New() + for _, comp := range components { + h.Write([]byte(comp)) + h.Write([]byte{0}) // separator + } + + c.fingerprint = hex.EncodeToString(h.Sum(nil)) +} + +// getMACAddress returns the MAC address of the first non-loopback network interface +func (c *Client) getMACAddress() (string, error) { + interfaces, err := net.Interfaces() + if err != nil { + return "", err + } + + for _, iface := range interfaces { + // Skip loopback and down interfaces + if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 { + continue + } + // Skip interfaces without hardware address + if iface.HardwareAddr == nil || len(iface.HardwareAddr) == 0 { + continue + } + return iface.HardwareAddr.String(), nil + } + + return "", fmt.Errorf("no suitable network interface found") +} + +// GetFingerprint returns the runner's hardware fingerprint +func (c *Client) GetFingerprint() string { + c.fingerprintMu.RLock() + defer c.fingerprintMu.RUnlock() + return c.fingerprint +} + +// SetAPIKey sets the runner ID and API key +func (c *Client) SetAPIKey(runnerID int64, apiKey string) { c.runnerID = runnerID - c.runnerSecret = runnerSecret - c.managerSecret = managerSecret + c.apiKey = apiKey // Initialize runner workspace directory if not already initialized if c.workspaceDir == "" { @@ -408,10 +487,15 @@ func (c *Client) Register(registrationToken string) (int64, string, string, erro } req := map[string]interface{}{ - "name": c.name, - "hostname": c.hostname, - "capabilities": string(capabilitiesJSON), - "registration_token": registrationToken, + "name": c.name, + "hostname": c.hostname, + "capabilities": string(capabilitiesJSON), + "api_key": registrationToken, // API key passed as registrationToken param for compatibility + } + + // Only send fingerprint for non-fixed API keys to avoid uniqueness conflicts + if !strings.HasPrefix(registrationToken, "jk_r0_") { // Fixed test key + req["fingerprint"] = c.GetFingerprint() } body, _ := json.Marshal(req) @@ -447,19 +531,16 @@ func (c *Client) Register(registrationToken string) (int64, string, string, erro } var result struct { - ID int64 `json:"id"` - RunnerSecret string `json:"runner_secret"` - ManagerSecret string `json:"manager_secret"` + ID int64 `json:"id"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return 0, "", "", fmt.Errorf("failed to decode response: %w", err) } c.runnerID = result.ID - c.runnerSecret = result.RunnerSecret - c.managerSecret = result.ManagerSecret + c.apiKey = registrationToken // Store the API key for future use - return result.ID, result.RunnerSecret, result.ManagerSecret, nil + return result.ID, registrationToken, "", nil // Return API key as "runner secret" for compatibility } // doSignedRequest performs an authenticated HTTP request using shared secret @@ -476,7 +557,7 @@ func (c *Client) doSignedRequestLong(method, path string, body []byte, queryPara // doSignedRequestWithClient performs an authenticated HTTP request using the specified client func (c *Client) doSignedRequestWithClient(method, path string, body []byte, client *http.Client, queryParams ...string) (*http.Response, error) { - if c.runnerSecret == "" { + if c.apiKey == "" { return nil, fmt.Errorf("runner not authenticated") } @@ -491,15 +572,18 @@ func (c *Client) doSignedRequestWithClient(method, path string, body []byte, cli return nil, err } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Runner-Secret", c.runnerSecret) + // Add authentication - use API key in Authorization header + req.Header.Set("Authorization", "Bearer "+c.apiKey) + if len(body) > 0 { + req.Header.Set("Content-Type", "application/json") + } return client.Do(req) } // ConnectWebSocket establishes a WebSocket connection to the manager func (c *Client) ConnectWebSocket() error { - if c.runnerID == 0 || c.runnerSecret == "" { + if c.runnerID == 0 || c.apiKey == "" { return fmt.Errorf("runner not authenticated") } @@ -509,8 +593,8 @@ func (c *Client) ConnectWebSocket() error { // Convert HTTP URL to WebSocket URL wsURL := strings.Replace(c.managerURL, "http://", "ws://", 1) wsURL = strings.Replace(wsURL, "https://", "wss://", 1) - wsURL = fmt.Sprintf("%s%s?runner_id=%d&secret=%s", - wsURL, path, c.runnerID, url.QueryEscape(c.runnerSecret)) + wsURL = fmt.Sprintf("%s%s?runner_id=%d&api_key=%s", + wsURL, path, c.runnerID, url.QueryEscape(c.apiKey)) // Parse URL u, err := url.Parse(wsURL) @@ -868,6 +952,44 @@ func (c *Client) KillAllProcesses() { log.Printf("Killed %d process(es)", killedCount) } +// CleanupWorkspace removes the runner's workspace directory and all contents +func (c *Client) CleanupWorkspace() { + log.Printf("DEBUG: CleanupWorkspace method called") + log.Printf("CleanupWorkspace called, workspaceDir: %s", c.workspaceDir) + if c.workspaceDir != "" { + log.Printf("Cleaning up workspace directory: %s", c.workspaceDir) + if err := os.RemoveAll(c.workspaceDir); err != nil { + log.Printf("Warning: Failed to remove workspace directory %s: %v", c.workspaceDir, err) + } else { + log.Printf("Successfully removed workspace directory: %s", c.workspaceDir) + } + } + + // Also clean up any orphaned jiggablend directories that might exist + // This ensures zero persistence even if workspaceDir wasn't set + cleanupOrphanedWorkspaces() +} + +// cleanupOrphanedWorkspaces removes any jiggablend workspace directories +// that might be left behind from previous runs or crashes +func cleanupOrphanedWorkspaces() { + log.Printf("Cleaning up orphaned jiggablend workspace directories...") + + // Clean up jiggablend-workspaces directories in current and temp directories + dirsToCheck := []string{".", os.TempDir()} + for _, baseDir := range dirsToCheck { + workspaceDir := filepath.Join(baseDir, "jiggablend-workspaces") + if _, err := os.Stat(workspaceDir); err == nil { + log.Printf("Removing orphaned workspace directory: %s", workspaceDir) + if err := os.RemoveAll(workspaceDir); err != nil { + log.Printf("Warning: Failed to remove workspace directory %s: %v", workspaceDir, err) + } else { + log.Printf("Successfully removed workspace directory: %s", workspaceDir) + } + } + } +} + // sendStepUpdate sends a step start/complete event to the manager func (c *Client) sendStepUpdate(taskID int64, stepName string, status types.StepStatus, errorMsg string) { key := fmt.Sprintf("%d:%s", taskID, stepName) @@ -955,7 +1077,7 @@ func (c *Client) sendStepUpdate(taskID int64, stepName string, status types.Step } // processTask processes a single task -func (c *Client) processTask(task map[string]interface{}, jobName string, outputFormat string, inputFiles []interface{}) error { +func (c *Client) processTask(task map[string]interface{}, jobName string, outputFormat string, inputFiles []interface{}) (err error) { _ = jobName taskID := int64(task["id"].(float64)) @@ -963,15 +1085,29 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output frameStart := int(task["frame_start"].(float64)) frameEnd := int(task["frame_end"].(float64)) - c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting task: job %d, frames %d-%d, format: %s", jobID, frameStart, frameEnd, outputFormat), "") - log.Printf("Processing task %d: job %d, frames %d-%d, format: %s (from task assignment)", taskID, jobID, frameStart, frameEnd, outputFormat) - // Create temporary job workspace within runner workspace workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-task-%d", jobID, taskID)) - if err := os.MkdirAll(workDir, 0755); err != nil { - return fmt.Errorf("failed to create work directory: %w", err) + if mkdirErr := os.MkdirAll(workDir, 0755); mkdirErr != nil { + return fmt.Errorf("failed to create work directory: %w", mkdirErr) } - defer os.RemoveAll(workDir) + + // Guaranteed cleanup even on panic + defer func() { + if cleanupErr := os.RemoveAll(workDir); cleanupErr != nil { + log.Printf("Warning: Failed to cleanup work directory %s: %v", workDir, cleanupErr) + } + }() + + // Panic recovery for this task + defer func() { + if r := recover(); r != nil { + log.Printf("Task %d panicked: %v", taskID, r) + err = fmt.Errorf("task panicked: %v", r) + } + }() + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting task: job %d, frames %d-%d, format: %s", jobID, frameStart, frameEnd, outputFormat), "") + log.Printf("Processing task %d: job %d, frames %d-%d, format: %s (from task assignment)", taskID, jobID, frameStart, frameEnd, outputFormat) // Step: download c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "") @@ -996,7 +1132,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output // Find .blend file in extracted contents blendFile := "" - err := filepath.Walk(workDir, func(path string, info os.FileInfo, err error) error { + err = filepath.Walk(workDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } @@ -1032,7 +1168,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output } if blendFile == "" { - err := fmt.Errorf("no .blend file found in context") + err := fmt.Errorf("no .blend file found in context - the uploaded context archive must contain at least one .blend file to render") c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) return err } @@ -1062,10 +1198,6 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output renderFormat = "EXR" // Use EXR for maximum quality (32-bit float, HDR) } - // Blender uses # characters for frame number placeholders (not %04d) - // Use #### for 4-digit zero-padded frame numbers - outputPattern := filepath.Join(outputDir, fmt.Sprintf("frame_####.%s", strings.ToLower(renderFormat))) - // Step: render_blender c.sendStepUpdate(taskID, "render_blender", types.StepStatusRunning, "") if frameStart == frameEnd { @@ -1074,14 +1206,8 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting Blender render for frames %d-%d...", frameStart, frameEnd), "render_blender") } - // Execute Blender - use absolute path for output pattern - absOutputPattern, err := filepath.Abs(outputPattern) - if err != nil { - errMsg := fmt.Sprintf("failed to get absolute path for output: %v", err) - c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender") - c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg) - return errors.New(errMsg) - } + // Always render frames individually for precise control over file naming + // This avoids Blender's automatic frame numbering quirks // Override output format and render settings from job submission // For MP4, we render as EXR (handled above) for highest fidelity, so renderFormat is already EXR @@ -1151,23 +1277,50 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output if enableExecution { args = append(args, "--enable-autoexec") } - if frameStart == frameEnd { - // Single frame - args = append(args, "-o", absOutputPattern, "-f", fmt.Sprintf("%d", frameStart)) - cmd = exec.Command("blender", args...) - } else { - // Frame range - args = append(args, "-o", absOutputPattern, - "-s", fmt.Sprintf("%d", frameStart), - "-e", fmt.Sprintf("%d", frameEnd), - "-a") // -a renders animation (all frames in range) - cmd = exec.Command("blender", args...) - } - cmd.Dir = workDir + // Always render frames individually for precise control over file naming + // This avoids Blender's automatic frame numbering quirks + for frame := frameStart; frame <= frameEnd; frame++ { + // Create temp output pattern for this frame + tempPattern := filepath.Join(outputDir, fmt.Sprintf("temp_frame.%s", strings.ToLower(renderFormat))) + tempAbsPattern, _ := filepath.Abs(tempPattern) - // Set environment variables for headless rendering - // This helps ensure proper OpenGL context initialization, especially for EEVEE - cmd.Env = os.Environ() + // Build args for this specific frame + frameArgs := []string{"-b", blendFile, "--python", scriptPath} + if enableExecution { + frameArgs = append(frameArgs, "--enable-autoexec") + } + frameArgs = append(frameArgs, "-o", tempAbsPattern, "-f", fmt.Sprintf("%d", frame)) + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Rendering frame %d...", frame), "render_blender") + + frameCmd := exec.Command("blender", frameArgs...) + frameCmd.Dir = workDir + frameCmd.Env = os.Environ() + + // Run this frame + if output, err := frameCmd.CombinedOutput(); err != nil { + errMsg := fmt.Sprintf("blender failed on frame %d: %v (output: %s)", frame, err, string(output)) + c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender") + return errors.New(errMsg) + } + + // Immediately rename the temp file to the proper frame-numbered name + finalName := fmt.Sprintf("frame_%04d.%s", frame, strings.ToLower(renderFormat)) + finalPath := filepath.Join(outputDir, finalName) + tempPath := filepath.Join(outputDir, fmt.Sprintf("temp_frame.%s", strings.ToLower(renderFormat))) + + if err := os.Rename(tempPath, finalPath); err != nil { + errMsg := fmt.Sprintf("failed to rename temp file for frame %d: %v", frame, err) + c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender") + return errors.New(errMsg) + } + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Completed frame %d -> %s", frame, finalName), "render_blender") + } + + // Skip the rest of the function since we handled all frames above + c.sendStepUpdate(taskID, "render_blender", types.StepStatusCompleted, "") + return nil // Blender will handle headless rendering automatically // We preserve the environment to allow GPU access if available @@ -1249,6 +1402,10 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output return errors.New(errMsg) } + // For frame ranges, we rendered each frame individually with temp naming + // The files are already properly named during the individual frame rendering + // No additional renaming needed + // Find rendered output file(s) // For frame ranges, we'll find all frames in the upload step // For single frames, we need to find the specific output file @@ -1454,9 +1611,30 @@ func (c *Client) processTask(task map[string]interface{}, jobName string, output } // processVideoGenerationTask processes a video generation task -func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID int64) error { +func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID int64) (err error) { taskID := int64(task["id"].(float64)) + // Create temporary job workspace for video generation within runner workspace + workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-video", jobID)) + if mkdirErr := os.MkdirAll(workDir, 0755); mkdirErr != nil { + return fmt.Errorf("failed to create work directory: %w", mkdirErr) + } + + // Guaranteed cleanup even on panic + defer func() { + if cleanupErr := os.RemoveAll(workDir); cleanupErr != nil { + log.Printf("Warning: Failed to cleanup work directory %s: %v", workDir, cleanupErr) + } + }() + + // Panic recovery for this task + defer func() { + if r := recover(); r != nil { + log.Printf("Video generation task %d panicked: %v", taskID, r) + err = fmt.Errorf("video generation task panicked: %v", r) + } + }() + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting video generation task: job %d", jobID), "") log.Printf("Processing video generation task %d for job %d", taskID, jobID) @@ -1474,6 +1652,16 @@ func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID i } } + // Debug logging for output format detection + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Video generation: detected output format '%s'", outputFormat), "generate_video") + + // Get frame rate from render settings + var frameRate float64 = 24.0 // Default fallback + if err == nil && jobMetadata != nil && jobMetadata.RenderSettings.FrameRate > 0 { + frameRate = jobMetadata.RenderSettings.FrameRate + } + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Video generation: using frame rate %.2f fps", frameRate), "generate_video") + // Get all output files for this job files, err := c.getJobFiles(jobID) if err != nil { @@ -1507,14 +1695,6 @@ func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID i c.sendStepUpdate(taskID, "download_frames", types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, "Downloading EXR frames...", "download_frames") - // Create temporary job workspace for video generation within runner workspace - workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-video", jobID)) - if err := os.MkdirAll(workDir, 0755); err != nil { - c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error()) - return fmt.Errorf("failed to create work directory: %w", err) - } - defer os.RemoveAll(workDir) - // Download all EXR frames var frameFiles []string for _, file := range exrFiles { @@ -1568,8 +1748,8 @@ func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID i // Extract frame number pattern (e.g., frame_2470.exr -> frame_%04d.exr) baseName := filepath.Base(firstFrame) // Find the numeric part and replace it with %04d pattern - // Use regex to find digits after underscore and before extension - re := regexp.MustCompile(`_(\d+)\.`) + // Use regex to find digits (including negative) after underscore and before extension + re := regexp.MustCompile(`_(-?\d+)\.`) var pattern string var startNumber int frameNumStr := re.FindStringSubmatch(baseName) @@ -1637,31 +1817,158 @@ func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID i vf = "zscale=t=linear:npl=100,format=gbrpf32le,zscale=p=bt709,tonemap=tonemap=hable:desat=0,zscale=t=bt709:m=bt709:r=tv,format=yuv420p" } - cmd = exec.Command("ffmpeg", "-y", "-start_number", fmt.Sprintf("%d", startNumber), - "-framerate", "24", "-i", patternPath, + // Build ffmpeg command with high-quality EXR input processing + cmd = exec.Command("ffmpeg", "-y", + "-f", "image2", // Force image sequence input format + "-start_number", fmt.Sprintf("%d", startNumber), + "-framerate", fmt.Sprintf("%.2f", frameRate), + "-i", patternPath, "-vf", vf, - "-c:v", codec, "-pix_fmt", pixFmt, "-r", "24", outputMP4) + "-c:v", codec, "-pix_fmt", pixFmt, + "-r", fmt.Sprintf("%.2f", frameRate), + "-color_primaries", "bt709", // Ensure proper color primaries + "-color_trc", "bt709", // Ensure proper transfer characteristics + "-colorspace", "bt709", // Ensure proper color space + outputMP4) + // Prepare codec-specific arguments + var codecArgs []string if outputFormat == "EXR_AV1_MP4" { - // AV1 encoding options for quality - cmd.Args = append(cmd.Args, "-cpu-used", "4", "-crf", "30", "-b:v", "0") + // AV1 encoding options for maximum quality + codecArgs = []string{"-cpu-used", "1", "-crf", "15", "-b:v", "0", "-row-mt", "1", "-tiles", "4x4", "-lag-in-frames", "25", "-arnr-max-frames", "15", "-arnr-strength", "4"} + } else { + // H.264 encoding options for maximum quality + codecArgs = []string{"-preset", "veryslow", "-crf", "15", "-profile:v", "high", "-level", "5.2", "-tune", "film", "-keyint_min", "24", "-g", "240", "-bf", "2", "-refs", "4"} } - } - cmd.Dir = workDir - output, err := cmd.CombinedOutput() - if err != nil { - outputStr := string(output) + // Perform 2-pass encoding for optimal quality distribution + c.sendLog(taskID, types.LogLevelInfo, "Starting 2-pass video encoding for optimal quality...", "generate_video") + + // PASS 1: Analysis pass (collects statistics for better rate distribution) + c.sendLog(taskID, types.LogLevelInfo, "Pass 1/2: Analyzing video content for optimal encoding...", "generate_video") + pass1Args := append([]string{"-y", "-f", "image2", "-start_number", fmt.Sprintf("%d", startNumber), "-framerate", fmt.Sprintf("%.2f", frameRate), "-i", patternPath, "-vf", vf, "-c:v", codec, "-pix_fmt", pixFmt, "-r", fmt.Sprintf("%.2f", frameRate), "-color_primaries", "bt709", "-color_trc", "bt709", "-colorspace", "bt709"}, codecArgs...) + pass1Args = append(pass1Args, "-pass", "1", "-f", "null", "/dev/null") + + pass1Cmd := exec.Command("ffmpeg", pass1Args...) + pass1Cmd.Dir = workDir + pass1Err := pass1Cmd.Run() + if pass1Err != nil { + c.sendLog(taskID, types.LogLevelWarn, fmt.Sprintf("Pass 1 completed (warnings expected): %v", pass1Err), "generate_video") + } + + // PASS 2: Encoding pass (uses statistics from pass 1 for optimal quality) + c.sendLog(taskID, types.LogLevelInfo, "Pass 2/2: Encoding video with optimal quality distribution...", "generate_video") + cmd = exec.Command("ffmpeg", "-y", "-f", "image2", "-start_number", fmt.Sprintf("%d", startNumber), "-framerate", fmt.Sprintf("%.2f", frameRate), "-i", patternPath, "-vf", vf, "-c:v", codec, "-pix_fmt", pixFmt, "-r", fmt.Sprintf("%.2f", frameRate), "-color_primaries", "bt709", "-color_trc", "bt709", "-colorspace", "bt709") + cmd.Args = append(cmd.Args, codecArgs...) + cmd.Args = append(cmd.Args, "-pass", "2", outputMP4) + } + + // Create stdout and stderr pipes for streaming + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + errMsg := fmt.Sprintf("failed to create ffmpeg stdout pipe: %v", err) + c.sendLog(taskID, types.LogLevelError, errMsg, "generate_video") + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, errMsg) + return errors.New(errMsg) + } + + stderrPipe, err := cmd.StderrPipe() + if err != nil { + errMsg := fmt.Sprintf("failed to create ffmpeg stderr pipe: %v", err) + c.sendLog(taskID, types.LogLevelError, errMsg, "generate_video") + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, errMsg) + return errors.New(errMsg) + } + + cmd.Dir = workDir + + // Start the command + if err := cmd.Start(); err != nil { + errMsg := fmt.Sprintf("failed to start ffmpeg: %v", err) + c.sendLog(taskID, types.LogLevelError, errMsg, "generate_video") + c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, errMsg) + return errors.New(errMsg) + } + + // Register process for cleanup on shutdown + c.runningProcs.Store(taskID, cmd) + defer c.runningProcs.Delete(taskID) + + // Stream stdout line by line + stdoutDone := make(chan bool) + go func() { + defer close(stdoutDone) + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + line := scanner.Text() + if line != "" { + // Filter out common ffmpeg informational messages that aren't useful + if !strings.Contains(line, "Input #") && + !strings.Contains(line, "Duration:") && + !strings.Contains(line, "Stream mapping:") && + !strings.Contains(line, "Output #") && + !strings.Contains(line, "encoder") && + !strings.Contains(line, "fps=") && + !strings.Contains(line, "size=") && + !strings.Contains(line, "time=") && + !strings.Contains(line, "bitrate=") && + !strings.Contains(line, "speed=") { + c.sendLog(taskID, types.LogLevelInfo, line, "generate_video") + } + } + } + }() + + // Stream stderr line by line + stderrDone := make(chan bool) + go func() { + defer close(stderrDone) + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + line := scanner.Text() + if line != "" { + // Filter out common ffmpeg informational messages and show only warnings/errors + if strings.Contains(line, "error") || + strings.Contains(line, "Error") || + strings.Contains(line, "failed") || + strings.Contains(line, "Failed") || + strings.Contains(line, "warning") || + strings.Contains(line, "Warning") { + c.sendLog(taskID, types.LogLevelWarn, line, "generate_video") + } else if !strings.Contains(line, "Input #") && + !strings.Contains(line, "Duration:") && + !strings.Contains(line, "Stream mapping:") && + !strings.Contains(line, "Output #") && + !strings.Contains(line, "encoder") && + !strings.Contains(line, "fps=") && + !strings.Contains(line, "size=") && + !strings.Contains(line, "time=") && + !strings.Contains(line, "bitrate=") && + !strings.Contains(line, "speed=") { + c.sendLog(taskID, types.LogLevelInfo, line, "generate_video") + } + } + } + }() + + // Wait for command to complete + err = cmd.Wait() + + // Wait for streaming goroutines to finish + <-stdoutDone + <-stderrDone + + if err != nil { // Check for size-related errors and provide helpful messages - if sizeErr := c.checkFFmpegSizeError(outputStr); sizeErr != nil { + if sizeErr := c.checkFFmpegSizeError("ffmpeg encoding failed"); sizeErr != nil { c.sendLog(taskID, types.LogLevelError, sizeErr.Error(), "generate_video") c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, sizeErr.Error()) return sizeErr } // Try alternative method with concat demuxer - log.Printf("First ffmpeg attempt failed, trying concat method: %s", outputStr) - err = c.generateMP4WithConcat(frameFiles, outputMP4, workDir, allocatedDevice, outputFormat, codec, pixFmt, useAlpha, useHardware) + c.sendLog(taskID, types.LogLevelWarn, "Primary ffmpeg encoding failed, trying concat method...", "generate_video") + err = c.generateMP4WithConcat(frameFiles, outputMP4, workDir, allocatedDevice, outputFormat, codec, pixFmt, useAlpha, useHardware, frameRate) if err != nil { // Check for size errors in concat method too if sizeErr := c.checkFFmpegSizeError(err.Error()); sizeErr != nil { @@ -1681,8 +1988,12 @@ func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID i return err } + // Clean up 2-pass log files + _ = os.Remove(filepath.Join(workDir, "ffmpeg2pass-0.log")) + _ = os.Remove(filepath.Join(workDir, "ffmpeg2pass-0.log.mbtree")) + c.sendStepUpdate(taskID, "generate_video", types.StepStatusCompleted, "") - c.sendLog(taskID, types.LogLevelInfo, "MP4 video generated successfully", "generate_video") + c.sendLog(taskID, types.LogLevelInfo, "MP4 video generated with 2-pass encoding successfully", "generate_video") // Step: upload_video c.sendStepUpdate(taskID, "upload_video", types.StepStatusRunning, "") @@ -1779,7 +2090,7 @@ func (c *Client) buildFFmpegCommand(device string, args ...string) (*exec.Cmd, e } // No hardware acceleration available - return nil, fmt.Errorf("no hardware encoder available") + return nil, fmt.Errorf("no hardware encoder available for video encoding - falling back to software encoding which may be slower") } // buildFFmpegCommandAV1 builds an ffmpeg command with AV1 hardware acceleration if available @@ -1881,7 +2192,7 @@ func (c *Client) buildFFmpegCommandAV1(device string, useAlpha bool, args ...str } // No AV1 hardware acceleration available - return nil, fmt.Errorf("no AV1 hardware encoder available") + return nil, fmt.Errorf("no AV1 hardware encoder available - falling back to software AV1 encoding which may be slower") } // probeAllHardwareAccelerators probes ffmpeg for all available hardware acceleration methods @@ -2460,7 +2771,7 @@ func (c *Client) testGenericEncoder(encoder string) bool { // generateMP4WithConcat uses ffmpeg concat demuxer as fallback // device parameter is optional - if provided, it will be used for VAAPI encoding -func (c *Client) generateMP4WithConcat(frameFiles []string, outputMP4, workDir string, device string, outputFormat string, codec string, pixFmt string, useAlpha bool, useHardware bool) error { +func (c *Client) generateMP4WithConcat(frameFiles []string, outputMP4, workDir string, device string, outputFormat string, codec string, pixFmt string, useAlpha bool, useHardware bool, frameRate float64) error { // Create file list for ffmpeg concat demuxer listFile := filepath.Join(workDir, "frames.txt") listFileHandle, err := os.Create(listFile) @@ -2509,30 +2820,108 @@ func (c *Client) generateMP4WithConcat(frameFiles []string, outputMP4, workDir s } if !useHardware { - // Software encoding with HDR tonemapping - cmd = exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", listFile, - "-vf", vf, - "-c:v", codec, "-pix_fmt", pixFmt, "-r", "24", "-y", outputMP4) - + // Software encoding with HDR tonemapping - 2-pass for optimal quality + var codecArgs []string if outputFormat == "EXR_AV1_MP4" { - // AV1 encoding options for quality - cmd.Args = append(cmd.Args, "-cpu-used", "4", "-crf", "30", "-b:v", "0") + codecArgs = []string{"-cpu-used", "1", "-crf", "15", "-b:v", "0", "-row-mt", "1", "-tiles", "4x4", "-lag-in-frames", "25", "-arnr-max-frames", "15", "-arnr-strength", "4"} + } else { + codecArgs = []string{"-preset", "veryslow", "-crf", "15", "-profile:v", "high", "-level", "5.2", "-tune", "film", "-keyint_min", "24", "-g", "240", "-bf", "2", "-refs", "4"} } + + // PASS 1: Analysis pass + pass1Args := append([]string{"-f", "concat", "-safe", "0", "-i", listFile, "-vf", vf, "-c:v", codec, "-pix_fmt", pixFmt, "-r", fmt.Sprintf("%.2f", frameRate)}, codecArgs...) + pass1Args = append(pass1Args, "-pass", "1", "-f", "null", "/dev/null") + pass1Cmd := exec.Command("ffmpeg", pass1Args...) + pass1Cmd.Dir = workDir + _ = pass1Cmd.Run() // Ignore errors for pass 1 + + // PASS 2: Encoding pass + cmd = exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", listFile, "-vf", vf, "-c:v", codec, "-pix_fmt", pixFmt, "-r", fmt.Sprintf("%.2f", frameRate)) + cmd.Args = append(cmd.Args, codecArgs...) + cmd.Args = append(cmd.Args, "-pass", "2", "-y", outputMP4) } - output, err := cmd.CombinedOutput() + + // Create stdout and stderr pipes for streaming + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to create ffmpeg stdout pipe: %w", err) + } + + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return fmt.Errorf("failed to create ffmpeg stderr pipe: %w", err) + } + + cmd.Dir = workDir + + // Start the command + if err := cmd.Start(); err != nil { + return fmt.Errorf("failed to start ffmpeg: %w", err) + } + + // Stream stdout line by line (minimal logging for concat method) + stdoutDone := make(chan bool) + go func() { + defer close(stdoutDone) + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + line := scanner.Text() + if line != "" { + // Only log actual errors/warnings for concat method + if strings.Contains(line, "error") || + strings.Contains(line, "Error") || + strings.Contains(line, "failed") || + strings.Contains(line, "Failed") { + log.Printf("FFmpeg concat stdout: %s", line) + } + } + } + }() + + // Stream stderr line by line + stderrDone := make(chan bool) + go func() { + defer close(stderrDone) + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + line := scanner.Text() + if line != "" { + // Log warnings and errors for concat method + if strings.Contains(line, "error") || + strings.Contains(line, "Error") || + strings.Contains(line, "failed") || + strings.Contains(line, "Failed") || + strings.Contains(line, "warning") || + strings.Contains(line, "Warning") { + log.Printf("FFmpeg concat stderr: %s", line) + } + } + } + }() + + // Wait for command to complete + err = cmd.Wait() + + // Wait for streaming goroutines to finish + <-stdoutDone + <-stderrDone + if err != nil { - outputStr := string(output) // Check for size-related errors - if sizeErr := c.checkFFmpegSizeError(outputStr); sizeErr != nil { + if sizeErr := c.checkFFmpegSizeError("ffmpeg concat failed"); sizeErr != nil { return sizeErr } - return fmt.Errorf("ffmpeg concat failed: %w\nOutput: %s", err, outputStr) + return fmt.Errorf("ffmpeg concat failed: %w", err) } if _, err := os.Stat(outputMP4); os.IsNotExist(err) { return fmt.Errorf("MP4 file not created: %s", outputMP4) } + // Clean up 2-pass log files + _ = os.Remove(filepath.Join(workDir, "ffmpeg2pass-0.log")) + _ = os.Remove(filepath.Join(workDir, "ffmpeg2pass-0.log.mbtree")) + return nil } @@ -2774,7 +3163,7 @@ func (c *Client) uploadFile(jobID int64, filePath string) (string, error) { } req.Header.Set("Content-Type", formWriter.FormDataContentType()) - req.Header.Set("X-Runner-Secret", c.runnerSecret) + req.Header.Set("Authorization", "Bearer "+c.apiKey) // Use long-running client for file uploads (no timeout) resp, err := c.longRunningClient.Do(req) @@ -2996,18 +3385,32 @@ func (c *Client) cleanupExpiredContextCache() { } // processMetadataTask processes a metadata extraction task -func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, inputFiles []interface{}) error { +func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, inputFiles []interface{}) (err error) { taskID := int64(task["id"].(float64)) - c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting metadata extraction task: job %d", jobID), "") - log.Printf("Processing metadata extraction task %d for job %d", taskID, jobID) - // Create temporary job workspace for metadata extraction within runner workspace workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-metadata-%d", jobID, taskID)) - if err := os.MkdirAll(workDir, 0755); err != nil { - return fmt.Errorf("failed to create work directory: %w", err) + if mkdirErr := os.MkdirAll(workDir, 0755); mkdirErr != nil { + return fmt.Errorf("failed to create work directory: %w", mkdirErr) } - defer os.RemoveAll(workDir) + + // Guaranteed cleanup even on panic + defer func() { + if cleanupErr := os.RemoveAll(workDir); cleanupErr != nil { + log.Printf("Warning: Failed to cleanup work directory %s: %v", workDir, cleanupErr) + } + }() + + // Panic recovery for this task + defer func() { + if r := recover(); r != nil { + log.Printf("Metadata extraction task %d panicked: %v", taskID, r) + err = fmt.Errorf("metadata extraction task panicked: %v", r) + } + }() + + c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting metadata extraction task: job %d", jobID), "") + log.Printf("Processing metadata extraction task %d for job %d", taskID, jobID) // Step: download c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "") @@ -3029,7 +3432,7 @@ func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, i // Find .blend file in extracted contents blendFile := "" - err := filepath.Walk(workDir, func(path string, info os.FileInfo, err error) error { + err = filepath.Walk(workDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } @@ -3065,7 +3468,7 @@ func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, i } if blendFile == "" { - err := fmt.Errorf("no .blend file found in context") + err := fmt.Errorf("no .blend file found in context - the uploaded context archive must contain at least one .blend file to render") c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error()) return err } @@ -3406,7 +3809,7 @@ func (c *Client) submitMetadata(jobID int64, metadata types.BlendMetadata) error } req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Runner-Secret", c.runnerSecret) + req.Header.Set("Authorization", "Bearer "+c.apiKey) resp, err := c.httpClient.Do(req) if err != nil { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 2bfdb85..1b7fee7 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -527,7 +527,7 @@ func (s *Storage) CreateJobContextFromDir(sourceDir string, jobID int64, exclude } if blendFilesAtRoot == 0 { - return "", fmt.Errorf("no .blend file found at root level in context archive") + return "", fmt.Errorf("no .blend file found at root level in context archive - .blend files must be at the root level of the uploaded archive, not in subdirectories") } if blendFilesAtRoot > 1 { return "", fmt.Errorf("multiple .blend files found at root level in context archive (found %d, expected 1)", blendFilesAtRoot) diff --git a/pkg/scripts/scripts/extract_metadata.py b/pkg/scripts/scripts/extract_metadata.py index 8ac162e..d04775b 100644 --- a/pkg/scripts/scripts/extract_metadata.py +++ b/pkg/scripts/scripts/extract_metadata.py @@ -76,6 +76,7 @@ if animation_start is not None and animation_end is not None: render = scene.render resolution_x = render.resolution_x resolution_y = render.resolution_y +frame_rate = render.fps / render.fps_base if render.fps_base != 0 else render.fps engine = scene.render.engine.upper() # Determine output format from file format @@ -155,6 +156,7 @@ metadata = { "render_settings": { "resolution_x": resolution_x, "resolution_y": resolution_y, + "frame_rate": frame_rate, "output_format": output_format, "engine": engine.lower(), "engine_settings": engine_settings diff --git a/pkg/types/types.go b/pkg/types/types.go index f699b10..d46e11c 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -249,6 +249,7 @@ type MissingFilesInfo struct { type RenderSettings struct { ResolutionX int `json:"resolution_x"` ResolutionY int `json:"resolution_y"` + FrameRate float64 `json:"frame_rate"` Samples int `json:"samples,omitempty"` // Deprecated, use EngineSettings OutputFormat string `json:"output_format"` Engine string `json:"engine"` diff --git a/web/src/components/AdminPanel.jsx b/web/src/components/AdminPanel.jsx index 510c41b..92603d5 100644 --- a/web/src/components/AdminPanel.jsx +++ b/web/src/components/AdminPanel.jsx @@ -4,20 +4,22 @@ import UserJobs from './UserJobs'; import PasswordChange from './PasswordChange'; export default function AdminPanel() { - const [activeSection, setActiveSection] = useState('tokens'); - const [tokens, setTokens] = useState([]); + const [activeSection, setActiveSection] = useState('api-keys'); + const [apiKeys, setApiKeys] = useState([]); const [runners, setRunners] = useState([]); const [users, setUsers] = useState([]); const [loading, setLoading] = useState(false); - const [newTokenExpires, setNewTokenExpires] = useState(24); - const [newToken, setNewToken] = useState(null); + const [newAPIKeyName, setNewAPIKeyName] = useState(''); + const [newAPIKeyDescription, setNewAPIKeyDescription] = useState(''); + const [newAPIKeyScope, setNewAPIKeyScope] = useState('user'); // Default to user scope + const [newAPIKey, setNewAPIKey] = useState(null); const [selectedUser, setSelectedUser] = useState(null); const [registrationEnabled, setRegistrationEnabled] = useState(true); const [passwordChangeUser, setPasswordChangeUser] = useState(null); useEffect(() => { - if (activeSection === 'tokens') { - loadTokens(); + if (activeSection === 'api-keys') { + loadAPIKeys(); } else if (activeSection === 'runners') { loadRunners(); } else if (activeSection === 'users') { @@ -27,15 +29,15 @@ export default function AdminPanel() { } }, [activeSection]); - const loadTokens = async () => { + const loadAPIKeys = async () => { setLoading(true); try { - const data = await admin.listTokens(); - setTokens(Array.isArray(data) ? data : []); + const data = await admin.listAPIKeys(); + setApiKeys(Array.isArray(data) ? data : []); } catch (error) { - console.error('Failed to load tokens:', error); - setTokens([]); - alert('Failed to load tokens'); + console.error('Failed to load API keys:', error); + setApiKeys([]); + alert('Failed to load API keys'); } finally { setLoading(false); } @@ -97,44 +99,55 @@ export default function AdminPanel() { } }; - const generateToken = async () => { + const generateAPIKey = async () => { + if (!newAPIKeyName.trim()) { + alert('API key name is required'); + return; + } + setLoading(true); try { - const data = await admin.generateToken(newTokenExpires); - setNewToken(data.token); - await loadTokens(); + const data = await admin.generateAPIKey(newAPIKeyName.trim(), newAPIKeyDescription.trim() || undefined, newAPIKeyScope); + setNewAPIKey(data); + setNewAPIKeyName(''); + setNewAPIKeyDescription(''); + setNewAPIKeyScope('user'); + await loadAPIKeys(); } catch (error) { - console.error('Failed to generate token:', error); - alert('Failed to generate token'); + console.error('Failed to generate API key:', error); + alert('Failed to generate API key'); } finally { setLoading(false); } }; - const revokeToken = async (tokenId) => { - if (!confirm('Are you sure you want to revoke this token?')) { + const revokeAPIKey = async (keyId) => { + if (!confirm('Are you sure you want to revoke this API key? Revoked keys cannot be used for new runner registrations.')) { return; } try { - await admin.revokeToken(tokenId); - await loadTokens(); + await admin.revokeAPIKey(keyId); + await loadAPIKeys(); } catch (error) { - console.error('Failed to revoke token:', error); - alert('Failed to revoke token'); + console.error('Failed to revoke API key:', error); + alert('Failed to revoke API key'); } }; - const verifyRunner = async (runnerId) => { + const deleteAPIKey = async (keyId) => { + if (!confirm('Are you sure you want to permanently delete this API key? This action cannot be undone.')) { + return; + } try { - await admin.verifyRunner(runnerId); - await loadRunners(); - alert('Runner verified'); + await admin.deleteAPIKey(keyId); + await loadAPIKeys(); } catch (error) { - console.error('Failed to verify runner:', error); - alert('Failed to verify runner'); + console.error('Failed to delete API key:', error); + alert('Failed to delete API key'); } }; + const deleteRunner = async (runnerId) => { if (!confirm('Are you sure you want to delete this runner?')) { return; @@ -153,12 +166,8 @@ export default function AdminPanel() { alert('Copied to clipboard!'); }; - const isTokenExpired = (expiresAt) => { - return new Date(expiresAt) < new Date(); - }; - - const isTokenUsed = (used) => { - return used; + const isAPIKeyActive = (isActive) => { + return isActive; }; return ( @@ -166,16 +175,16 @@ export default function AdminPanel() {
- {activeSection === 'tokens' && ( + {activeSection === 'api-keys' && (
-

Generate Registration Token

-
-
- - setNewTokenExpires(parseInt(e.target.value) || 24)} - className="w-32 px-3 py-2 bg-gray-900 border border-gray-600 rounded-lg text-gray-100 focus:ring-2 focus:ring-orange-500 focus:border-transparent" - /> +

Generate API Key

+
+
+
+ + setNewAPIKeyName(e.target.value)} + placeholder="e.g., production-runner-01" + className="w-full px-3 py-2 bg-gray-900 border border-gray-600 rounded-lg text-gray-100 focus:ring-2 focus:ring-orange-500 focus:border-transparent" + required + /> +
+
+ + setNewAPIKeyDescription(e.target.value)} + placeholder="Optional description" + className="w-full px-3 py-2 bg-gray-900 border border-gray-600 rounded-lg text-gray-100 focus:ring-2 focus:ring-orange-500 focus:border-transparent" + /> +
+
+ + +
+
+
+
-
- {newToken && ( + {newAPIKey && (
-

New Token Generated:

-
- - {newToken} - - +

New API Key Generated:

+
+
+ + {newAPIKey.key} + + +
+
+

Name: {newAPIKey.name}

+ {newAPIKey.description &&

Description: {newAPIKey.description}

} +
+

+ ⚠️ Save this API key securely. It will not be shown again. +

-

- Save this token securely. It will not be shown again. -

)}
-

Active Tokens

+

API Keys

{loading ? (
- ) : !tokens || tokens.length === 0 ? ( -

No tokens generated yet.

+ ) : !apiKeys || apiKeys.length === 0 ? ( +

No API keys generated yet.

) : (
+ + - @@ -297,46 +344,62 @@ export default function AdminPanel() { - {tokens.map((token) => { - const expired = isTokenExpired(token.expires_at); - const used = isTokenUsed(token.used); - return ( - - - + - - - + + + + + ); @@ -373,7 +436,7 @@ export default function AdminPanel() { Status -
- Token + Name + + Scope + + Key Prefix Status - Expires At - Created At
- - {token.token.substring(0, 16)}... - - - {expired ? ( - - Expired - - ) : used ? ( - - Used - - ) : ( - - Active - + {apiKeys.map((key) => { + return ( +
+
+
{key.name}
+ {key.description && ( +
{key.description}
)} -
- {new Date(token.expires_at).toLocaleString()} - - {new Date(token.created_at).toLocaleString()} - - {!used && !expired && ( + + + + {key.scope === 'manager' ? 'Manager' : 'User'} + + + + {key.key_prefix} + + + {!key.is_active ? ( + + Revoked + + ) : ( + + Active + + )} + + {new Date(key.created_at).toLocaleString()} + + {key.is_active && !expired && ( )} +
- Verified + API Key Priority @@ -411,16 +474,10 @@ export default function AdminPanel() { {isOnline ? 'Online' : 'Offline'} - - - {runner.verified ? 'Verified' : 'Unverified'} - + + + jk_r{runner.id % 10}_... + {runner.priority} @@ -446,15 +503,7 @@ export default function AdminPanel() { {new Date(runner.last_heartbeat).toLocaleString()} - {!runner.verified && ( - - )} +