From 27a09aedd62b4380893e55a485a3ec142f81e8ff Mon Sep 17 00:00:00 2001 From: Justin Harms Date: Sat, 22 Nov 2025 05:45:13 -0600 Subject: [PATCH] Refactor imports and clean up whitespace in API and runner client files. Update auth package references to use the new package structure. Improve mutex usage in the runner client for better concurrency handling. --- internal/api/admin.go | 11 ++++------- internal/api/server.go | 34 +++++++++++++++++----------------- internal/runner/client.go | 20 ++++++++++---------- 3 files changed, 31 insertions(+), 34 deletions(-) diff --git a/internal/api/admin.go b/internal/api/admin.go index 98beec2..02dbf3e 100644 --- a/internal/api/admin.go +++ b/internal/api/admin.go @@ -7,8 +7,6 @@ import ( "net/http" "time" - "github.com/go-chi/chi/v5" - "fuego/internal/auth" "fuego/pkg/types" ) @@ -22,7 +20,7 @@ func (s *Server) handleGenerateRegistrationToken(w http.ResponseWriter, r *http. // Default expiration: 24 hours expiresIn := 24 * time.Hour - + var req struct { ExpiresInHours int `json:"expires_in_hours,omitempty"` } @@ -39,9 +37,9 @@ func (s *Server) handleGenerateRegistrationToken(w http.ResponseWriter, r *http. } s.respondJSON(w, http.StatusCreated, map[string]interface{}{ - "token": token, - "expires_in": expiresIn.String(), - "expires_at": time.Now().Add(expiresIn), + "token": token, + "expires_in": expiresIn.String(), + "expires_at": time.Now().Add(expiresIn), }) } @@ -169,4 +167,3 @@ func (s *Server) handleListRunnersAdmin(w http.ResponseWriter, r *http.Request) s.respondJSON(w, http.StatusOK, runners) } - diff --git a/internal/api/server.go b/internal/api/server.go index 6778706..a3b6613 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -10,35 +10,36 @@ import ( "sync" "time" + authpkg "fuego/internal/auth" + "fuego/internal/database" + "fuego/internal/storage" + "fuego/pkg/types" + "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" "github.com/gorilla/websocket" - "fuego/internal/auth" - "fuego/internal/database" - "fuego/internal/storage" - "fuego/pkg/types" ) // Server represents the API server type Server struct { db *database.DB - auth *auth.Auth - secrets *auth.Secrets + auth *authpkg.Auth + secrets *authpkg.Secrets storage *storage.Storage router *chi.Mux // WebSocket connections - wsUpgrader websocket.Upgrader - runnerConns map[int64]*websocket.Conn - runnerConnsMu sync.RWMutex - frontendConns map[string]*websocket.Conn // key: "jobId:taskId" + wsUpgrader websocket.Upgrader + runnerConns map[int64]*websocket.Conn + runnerConnsMu sync.RWMutex + frontendConns map[string]*websocket.Conn // key: "jobId:taskId" frontendConnsMu sync.RWMutex } // NewServer creates a new API server -func NewServer(db *database.DB, auth *auth.Auth, storage *storage.Storage) (*Server, error) { - secrets, err := auth.NewSecrets(db.DB) +func NewServer(db *database.DB, auth *authpkg.Auth, storage *storage.Storage) (*Server, error) { + secrets, err := authpkg.NewSecrets(db.DB) if err != nil { return nil, fmt.Errorf("failed to initialize secrets: %w", err) } @@ -142,10 +143,10 @@ func (s *Server) setupRoutes() { s.router.Route("/api/runner", func(r chi.Router) { // Registration doesn't require auth (uses token) r.Post("/register", s.handleRegisterRunner) - + // WebSocket endpoint (auth handled in handler) r.Get("/ws", s.handleRunnerWebSocket) - + // File operations still use HTTP (WebSocket not suitable for large files) r.Group(func(r chi.Router) { r.Use(func(next http.Handler) http.Handler { @@ -290,7 +291,7 @@ func (s *Server) handleGetMe(w http.ResponseWriter, r *http.Request) { // Helper to get user ID from context func getUserID(r *http.Request) (int64, error) { - userID, ok := auth.GetUserID(r.Context()) + userID, ok := authpkg.GetUserID(r.Context()) if !ok { return 0, fmt.Errorf("user ID not found in context") } @@ -436,7 +437,7 @@ func (s *Server) recoverStuckTasks() { // Check for task timeouts s.recoverTaskTimeouts() - + // Distribute newly recovered tasks s.distributeTasksToRunners() }() @@ -510,4 +511,3 @@ func (s *Server) recoverTaskTimeouts() { } } } - diff --git a/internal/runner/client.go b/internal/runner/client.go index 4588ee1..7c66f19 100644 --- a/internal/runner/client.go +++ b/internal/runner/client.go @@ -20,8 +20,9 @@ import ( "sync" "time" - "github.com/gorilla/websocket" "fuego/pkg/types" + + "github.com/gorilla/websocket" ) // Client represents a runner client @@ -35,7 +36,7 @@ type Client struct { runnerSecret string managerSecret string wsConn *websocket.Conn - wsConnMu sync.Mutex + wsConnMu sync.RWMutex stopChan chan struct{} } @@ -144,7 +145,7 @@ func (c *Client) ConnectWebSocket() error { h := hmac.New(sha256.New, []byte(c.runnerSecret)) h.Write([]byte(message)) signature := hex.EncodeToString(h.Sum(nil)) - + // Convert HTTP URL to WebSocket URL wsURL := strings.Replace(c.managerURL, "http://", "ws://", 1) wsURL = strings.Replace(wsURL, "https://", "wss://", 1) @@ -268,8 +269,8 @@ func (c *Client) handleTaskAssignment(msg map[string]interface{}) { // Convert to task map format taskMap := map[string]interface{}{ - "id": taskID, - "job_id": jobID, + "id": taskID, + "job_id": jobID, "frame_start": frameStart, "frame_end": frameEnd, } @@ -402,7 +403,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat // Step: render_blender c.sendStepUpdate(taskID, "render_blender", types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting Blender render for frame %d...", frameStart), "render_blender") - + // Execute Blender cmd := exec.Command("blender", "-b", blendFile, "-o", outputPattern, "-f", fmt.Sprintf("%d", frameStart)) cmd.Dir = workDir @@ -432,7 +433,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat } c.sendStepUpdate(taskID, uploadStepName, types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, "Uploading output file...", uploadStepName) - + outputPath, err := c.uploadFile(jobID, outputFile) if err != nil { errMsg := fmt.Sprintf("failed to upload output: %w", err) @@ -446,7 +447,7 @@ func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat // Step: complete c.sendStepUpdate(taskID, "complete", types.StepStatusRunning, "") c.sendLog(taskID, types.LogLevelInfo, "Task completed successfully", "complete") - + // Mark task as complete if err := c.completeTask(taskID, outputPath, true, ""); err != nil { c.sendStepUpdate(taskID, "complete", types.StepStatusFailed, err.Error()) @@ -731,7 +732,7 @@ func (c *Client) uploadFile(jobID int64, filePath string) (string, error) { // Create multipart form var buf bytes.Buffer formWriter := multipart.NewWriter(&buf) - + part, err := formWriter.CreateFormFile("file", filepath.Base(filePath)) if err != nil { return "", fmt.Errorf("failed to create form file: %w", err) @@ -813,4 +814,3 @@ func (c *Client) sendTaskComplete(taskID int64, outputPath string, success bool, } return fmt.Errorf("WebSocket not connected, cannot complete task") } -