package api

import (
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"jiggablend/pkg/types"

	"github.com/gorilla/websocket"
)

// JobConnection wraps a WebSocket connection for job communication.
type JobConnection struct {
	// conn is the live connection, or nil when there is none.
	conn *websocket.Conn

	// writeMu serializes writes to conn.
	writeMu sync.Mutex

	// stopPing is closed to make pingLoop return.
	stopPing chan struct{}

	// stopHeartbeat is closed to make heartbeatLoop return.
	stopHeartbeat chan struct{}

	// isConnected is set once authentication succeeds and cleared on
	// Close or on a failed write.
	isConnected bool

	// connMu guards isConnected; Connect, pingLoop, heartbeatLoop and
	// Heartbeat also read and write conn and the stop channels under it.
	connMu sync.RWMutex
}

// NewJobConnection creates a new job connection wrapper.
func NewJobConnection() *JobConnection {
	return &JobConnection{}
}

// Connect establishes a WebSocket connection for a job (no runnerID needed).
//
// managerURL is the manager base URL; a leading "http://" or "https://" is
// rewritten to "ws://" or "wss://". jobPath is appended, followed by "/ws".
// jobToken is sent in the initial auth message.
//
// The connection is only installed on the JobConnection after the server
// answers with "auth_ok"; on any earlier failure it is closed and an error is
// returned. Any session left over from a previous Connect is torn down via
// Close before the new one is installed, so its ping and heartbeat goroutines
// do not keep running against the new connection.
func (j *JobConnection) Connect(managerURL, jobPath, jobToken string) error {
	wsPath := jobPath + "/ws"

	// Only the scheme prefix is rewritten, so an "http://" that happens to
	// appear later in the URL is left untouched.
	wsURL := managerURL
	switch {
	case strings.HasPrefix(managerURL, "https://"):
		wsURL = "wss://" + strings.TrimPrefix(managerURL, "https://")
	case strings.HasPrefix(managerURL, "http://"):
		wsURL = "ws://" + strings.TrimPrefix(managerURL, "http://")
	}
	wsURL += wsPath

	log.Printf("Connecting to job WebSocket: %s", wsPath)

	dialer := websocket.Dialer{
		HandshakeTimeout: 10 * time.Second,
	}
	conn, _, err := dialer.Dial(wsURL, nil)
	if err != nil {
		return fmt.Errorf("failed to connect job WebSocket: %w", err)
	}

	// Until authentication succeeds the connection is owned by this call
	// only; close it on every early return.
	authenticated := false
	defer func() {
		if !authenticated {
			conn.Close()
		}
	}()

	// Send auth message
	authMsg := map[string]interface{}{
		"type":      "auth",
		"job_token": jobToken,
	}
	if err := conn.WriteJSON(authMsg); err != nil {
		return fmt.Errorf("failed to send auth: %w", err)
	}

	// Wait for auth_ok
	conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	var authResp map[string]string
	if err := conn.ReadJSON(&authResp); err != nil {
		return fmt.Errorf("failed to read auth response: %w", err)
	}
	if authResp["type"] == "error" {
		return fmt.Errorf("auth failed: %s", authResp["message"])
	}
	if authResp["type"] != "auth_ok" {
		return fmt.Errorf("unexpected auth response: %s", authResp["type"])
	}
	authenticated = true

	// Clear read deadline after auth
	conn.SetReadDeadline(time.Time{})

	// Set up ping/pong handler for keepalive.
	// NOTE(review): nothing in this file reads from the connection after
	// authentication, and gorilla/websocket only runs control-frame handlers
	// from inside a read call - confirm a reader exists elsewhere, otherwise
	// this handler is never invoked.
	conn.SetPongHandler(func(string) error {
		conn.SetReadDeadline(time.Now().Add(90 * time.Second))
		return nil
	})

	// Stop goroutines and release the connection of any earlier session.
	j.Close()

	j.connMu.Lock()
	j.conn = conn
	j.stopPing = make(chan struct{})
	j.stopHeartbeat = make(chan struct{})
	j.isConnected = true
	j.connMu.Unlock()

	// Start ping goroutine
	go j.pingLoop()

	// Start WebSocket heartbeat goroutine
	go j.heartbeatLoop()

	return nil
}

// pingLoop sends periodic pings to keep the WebSocket connection alive.
//
// It sends one ping every 30 seconds until the stop channel it captured at
// start-up is closed.
func (j *JobConnection) pingLoop() {
	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("Ping loop panicked: %v", rec)
		}
	}()

	// Capture the channel once: Close resets the field to nil, and selecting
	// on a nil channel would block forever instead of stopping the loop.
	j.connMu.RLock()
	stop := j.stopPing
	j.connMu.RUnlock()
	if stop == nil {
		// The session was already closed before this goroutine got to run.
		return
	}

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			j.sendPing()
		}
	}
}

// sendPing writes a single ping control frame with a 10 second write
// deadline. On failure it logs, marks the connection as disconnected and
// closes it. It does nothing when there is no connection.
func (j *JobConnection) sendPing() {
	j.writeMu.Lock()
	// Deferred so that a panic recovered by pingLoop cannot leave the write
	// mutex locked.
	defer j.writeMu.Unlock()

	j.connMu.RLock()
	conn := j.conn
	j.connMu.RUnlock()
	if conn == nil {
		return
	}

	deadline := time.Now().Add(10 * time.Second)
	if err := conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
		log.Printf("Failed to send ping, closing connection: %v", err)
		j.connMu.Lock()
		j.isConnected = false
		if j.conn != nil {
			j.conn.Close()
			j.conn = nil
		}
		j.connMu.Unlock()
	}
}

// Heartbeat sends a heartbeat message over WebSocket to keep runner online.
//
// The message has type "runner_heartbeat" and carries the current Unix
// timestamp. It is a no-op when there is no connection. If the write fails,
// the error is logged, the connection is marked as disconnected and closed.
func (j *JobConnection) Heartbeat() {
	j.writeMu.Lock()
	defer j.writeMu.Unlock()

	// Read the connection only after the write lock is held: another writer
	// may have dropped it while this call was waiting for the lock.
	j.connMu.RLock()
	conn := j.conn
	j.connMu.RUnlock()
	if conn == nil {
		return
	}

	msg := map[string]interface{}{
		"type":      "runner_heartbeat",
		"timestamp": time.Now().Unix(),
	}

	if err := conn.WriteJSON(msg); err != nil {
		log.Printf("Failed to send WebSocket heartbeat: %v", err)
		// Handle connection failure
		j.connMu.Lock()
		j.isConnected = false
		if j.conn != nil {
			j.conn.Close()
			j.conn = nil
		}
		j.connMu.Unlock()
	}
}

// heartbeatLoop sends periodic heartbeat messages over WebSocket.
//
// It calls Heartbeat every 30 seconds until the stop channel it captured at
// start-up is closed.
func (j *JobConnection) heartbeatLoop() {
	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("WebSocket heartbeat loop panicked: %v", rec)
		}
	}()

	// Capture the channel once; see pingLoop for why the field is not
	// re-read on every iteration.
	j.connMu.RLock()
	stop := j.stopHeartbeat
	j.connMu.RUnlock()
	if stop == nil {
		return
	}

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			j.Heartbeat()
		}
	}
}

// Close closes the WebSocket connection.
func (j *JobConnection) Close() { j.connMu.Lock() j.isConnected = false j.connMu.Unlock() // Stop heartbeat goroutine if j.stopHeartbeat != nil { close(j.stopHeartbeat) j.stopHeartbeat = nil } // Stop ping goroutine if j.stopPing != nil { close(j.stopPing) j.stopPing = nil } if j.conn != nil { j.conn.Close() j.conn = nil } } // IsConnected returns true if the connection is established. func (j *JobConnection) IsConnected() bool { j.connMu.RLock() defer j.connMu.RUnlock() return j.isConnected && j.conn != nil } // Log sends a log entry to the manager. func (j *JobConnection) Log(taskID int64, level types.LogLevel, message string) { if j.conn == nil { return } j.writeMu.Lock() defer j.writeMu.Unlock() msg := map[string]interface{}{ "type": "log_entry", "data": map[string]interface{}{ "task_id": taskID, "log_level": string(level), "message": message, }, "timestamp": time.Now().Unix(), } if err := j.conn.WriteJSON(msg); err != nil { log.Printf("Failed to send job log, connection may be broken: %v", err) // Close the connection on write error j.connMu.Lock() j.isConnected = false if j.conn != nil { j.conn.Close() j.conn = nil } j.connMu.Unlock() } } // Progress sends a progress update to the manager. func (j *JobConnection) Progress(taskID int64, progress float64) { if j.conn == nil { return } j.writeMu.Lock() defer j.writeMu.Unlock() msg := map[string]interface{}{ "type": "progress", "data": map[string]interface{}{ "task_id": taskID, "progress": progress, }, "timestamp": time.Now().Unix(), } if err := j.conn.WriteJSON(msg); err != nil { log.Printf("Failed to send job progress, connection may be broken: %v", err) // Close the connection on write error j.connMu.Lock() j.isConnected = false if j.conn != nil { j.conn.Close() j.conn = nil } j.connMu.Unlock() } } // OutputUploaded notifies that an output file was uploaded. 
func (j *JobConnection) OutputUploaded(taskID int64, fileName string) { if j.conn == nil { return } j.writeMu.Lock() defer j.writeMu.Unlock() msg := map[string]interface{}{ "type": "output_uploaded", "data": map[string]interface{}{ "task_id": taskID, "file_name": fileName, }, "timestamp": time.Now().Unix(), } if err := j.conn.WriteJSON(msg); err != nil { log.Printf("Failed to send output uploaded, connection may be broken: %v", err) // Close the connection on write error j.connMu.Lock() j.isConnected = false if j.conn != nil { j.conn.Close() j.conn = nil } j.connMu.Unlock() } } // Complete sends task completion to the manager. func (j *JobConnection) Complete(taskID int64, success bool, errorMsg error) { if j.conn == nil { log.Printf("Cannot send task complete: WebSocket connection is nil") return } j.writeMu.Lock() defer j.writeMu.Unlock() msg := map[string]interface{}{ "type": "task_complete", "data": map[string]interface{}{ "task_id": taskID, "success": success, "error": errorMsg, }, "timestamp": time.Now().Unix(), } if err := j.conn.WriteJSON(msg); err != nil { log.Printf("Failed to send task complete, connection may be broken: %v", err) // Close the connection on write error j.connMu.Lock() j.isConnected = false if j.conn != nil { j.conn.Close() j.conn = nil } j.connMu.Unlock() } }