diff --git a/internal/api/admin.go b/internal/api/admin.go index 98beec2..02dbf3e 100644 --- a/internal/api/admin.go +++ b/internal/api/admin.go @@ -7,8 +7,6 @@ import ( "net/http" "time" - "github.com/go-chi/chi/v5" - "fuego/internal/auth" "fuego/pkg/types" ) @@ -22,7 +20,7 @@ func (s *Server) handleGenerateRegistrationToken(w http.ResponseWriter, r *http. // Default expiration: 24 hours expiresIn := 24 * time.Hour - + var req struct { ExpiresInHours int `json:"expires_in_hours,omitempty"` } @@ -39,9 +37,9 @@ func (s *Server) handleGenerateRegistrationToken(w http.ResponseWriter, r *http. } s.respondJSON(w, http.StatusCreated, map[string]interface{}{ - "token": token, - "expires_in": expiresIn.String(), - "expires_at": time.Now().Add(expiresIn), + "token": token, + "expires_in": expiresIn.String(), + "expires_at": time.Now().Add(expiresIn), }) } @@ -169,4 +167,3 @@ func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) s.respondJSON(w, http.StatusOK, runners) } - diff --git a/internal/api/server.go b/internal/api/server.go index 6778706..a3b6613 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -10,35 +10,36 @@ import ( "sync" "time" + authpkg "fuego/internal/auth" + "fuego/internal/database" + "fuego/internal/storage" + "fuego/pkg/types" + "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" "github.com/gorilla/websocket" - "fuego/internal/auth" - "fuego/internal/database" - "fuego/internal/storage" - "fuego/pkg/types" ) // Server represents the API server type Server struct { db *database.DB - auth *auth.Auth - secrets *auth.Secrets + auth *authpkg.Auth + secrets *authpkg.Secrets storage *storage.Storage router *chi.Mux // WebSocket connections - wsUpgrader websocket.Upgrader - runnerConns map[int64]*websocket.Conn - runnerConnsMu sync.RWMutex - frontendConns map[string]*websocket.Conn // key: "jobId:taskId" + wsUpgrader websocket.Upgrader + runnerConns map[int64]*websocket.Conn + runnerConnsMu sync.RWMutex + frontendConns map[string]*websocket.Conn // key: "jobId:taskId" frontendConnsMu sync.RWMutex } // NewServer creates a new API server -func NewServer(db *database.DB, auth *auth.Auth, storage *storage.Storage) (*Server, error) { - secrets, err := auth.NewSecrets(db.DB) +func NewServer(db *database.DB, auth *authpkg.Auth, storage *storage.Storage) (*Server, error) { + secrets, err := authpkg.NewSecrets(db.DB) if err != nil { return nil, fmt.Errorf("failed to initialize secrets: %w", err) } @@ -142,10 +143,10 @@ func (s *Server) setupRoutes() { s.router.Route("/api/runner", func(r chi.Router) { // Registration doesn't require auth (uses token) r.Post("/register", s.handleRegisterRunner) - + // WebSocket endpoint (auth handled in handler) r.Get("/ws", s.handleRunnerWebSocket) - + // File operations still use HTTP (WebSocket not suitable for large files) r.Group(func(r chi.Router) { r.Use(func(next http.Handler) http.Handler { @@ -290,7 +291,7 @@ func (s *Server) handleGetMe(w http.ResponseWriter, r *http.Request) { // Helper to get user ID from context func getUserID(r *http.Request) (int64, error) { - userID, ok := auth.GetUserID(r.Context()) + userID, ok := authpkg.GetUserID(r.Context()) if !ok { return 0, fmt.Errorf("user ID not found in context") } @@ -436,7 +437,7 @@ func (s *Server) recoverStuckTasks() { // Check for task timeouts s.recoverTaskTimeouts() - + // Distribute newly recovered tasks s.distributeTasksToRunners() }() @@ -510,4 +511,3 @@ func (s *Server) recoverTaskTimeouts() { } } } - diff --git a/internal/runner/client.go b/internal/runner/client.go index 4588ee1..7c66f19 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -20,8 +20,9 @@ import ( "sync" "time" - "github.com/gorilla/websocket" "fuego/pkg/types" + + "github.com/gorilla/websocket" ) // Client represents a runner client @@ -35,7 +36,7 @@ type Client struct { runnerSecret string managerSecret string wsConn *websocket.Conn - wsConnMu sync.Mutex + wsConnMu sync.RWMutex stopChan chan struct{} } @@ -144,7 +145,7 @@ func (c *Client) ConnectWebSocket() error { h := hmac.New(sha256.New, []byte(c.runnerSecret)) h.Write([]byte(message)) signature := hex.EncodeToString(h.Sum(nil)) - + // Convert HTTP URL to WebSocket URL wsURL := strings.Replace(c.managerURL, "http://", "ws://", 1) wsURL = strings.Replace(wsURL, "https://", "wss://", 1) @@ -268,8 +269,8 @@ func (c *Client) handleTaskAssignment(msg map[string]interface{}) { // Convert to task map format taskMap := map[string]interface{}{ - "id": taskID, - "job_id": jobID, + "id": taskID, + "job_id": jobID, "frame_start": frameStart, "frame_end": frameEnd, } @@ -402,7 +403,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat // Step: render_blender c.sendStepUpdate(taskID, "render_blender", types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting Blender render for frame %d...", frameStart), "render_blender") - + // Execute Blender cmd := exec.Command("blender", "-b", blendFile, "-o", outputPattern, "-f", fmt.Sprintf("%d", frameStart)) cmd.Dir = workDir @@ -432,7 +433,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat } c.sendStepUpdate(taskID, uploadStepName, types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, "Uploading output file...", uploadStepName) - + outputPath, err := c.uploadFile(jobID, outputFile) if err != nil { errMsg := fmt.Sprintf("failed to upload output: %w", err) @@ -446,7 +447,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat // Step: complete c.sendStepUpdate(taskID, "complete", types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, "Task completed successfully", "complete") - + // Mark task as complete if err := c.completeTask(taskID, outputPath, true, ""); err != nil { c.sendStepUpdate(taskID, "complete", types.StepStatusFailed, err.Error()) @@ -731,7 +732,7 @@ func (c *Client) uploadFile(jobID int64, filePath string) (string, error) { // Create multipart form var buf bytes.Buffer formWriter := multipart.NewWriter(&buf) - + part, err := formWriter.CreateFormFile("file", filepath.Base(filePath)) if err != nil { return "", fmt.Errorf("failed to create form file: %w", err) @@ -813,4 +814,3 @@ func (c *Client) sendTaskComplete(taskID int64, outputPath string, success bool, } return fmt.Errorf("WebSocket not connected, cannot complete task") } -