Files
jiggablend/internal/runner/api/jobconn.go

334 lines
7.3 KiB
Go

package api
import (
"fmt"
"log"
"strings"
"sync"
"time"
"jiggablend/pkg/types"
"github.com/gorilla/websocket"
)
// JobConnection wraps a WebSocket connection for job communication.
type JobConnection struct {
conn *websocket.Conn
writeMu sync.Mutex
stopPing chan struct{}
stopHeartbeat chan struct{}
isConnected bool
connMu sync.RWMutex
}
// NewJobConnection creates a new job connection wrapper.
func NewJobConnection() *JobConnection {
return &JobConnection{}
}
// Connect establishes a WebSocket connection for a job (no runnerID needed).
func (j *JobConnection) Connect(managerURL, jobPath, jobToken string) error {
wsPath := jobPath + "/ws"
wsURL := strings.Replace(managerURL, "http://", "ws://", 1)
wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
wsURL += wsPath
log.Printf("Connecting to job WebSocket: %s", wsPath)
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
return fmt.Errorf("failed to connect job WebSocket: %w", err)
}
j.conn = conn
// Send auth message
authMsg := map[string]interface{}{
"type": "auth",
"job_token": jobToken,
}
if err := conn.WriteJSON(authMsg); err != nil {
conn.Close()
return fmt.Errorf("failed to send auth: %w", err)
}
// Wait for auth_ok
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
var authResp map[string]string
if err := conn.ReadJSON(&authResp); err != nil {
conn.Close()
return fmt.Errorf("failed to read auth response: %w", err)
}
if authResp["type"] == "error" {
conn.Close()
return fmt.Errorf("auth failed: %s", authResp["message"])
}
if authResp["type"] != "auth_ok" {
conn.Close()
return fmt.Errorf("unexpected auth response: %s", authResp["type"])
}
// Clear read deadline after auth
conn.SetReadDeadline(time.Time{})
// Set up ping/pong handler for keepalive
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(90 * time.Second))
return nil
})
// Start ping goroutine
j.stopPing = make(chan struct{})
j.connMu.Lock()
j.isConnected = true
j.connMu.Unlock()
go j.pingLoop()
// Start WebSocket heartbeat goroutine
j.stopHeartbeat = make(chan struct{})
go j.heartbeatLoop()
return nil
}
// pingLoop sends periodic pings to keep the WebSocket connection alive.
func (j *JobConnection) pingLoop() {
defer func() {
if rec := recover(); rec != nil {
log.Printf("Ping loop panicked: %v", rec)
}
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-j.stopPing:
return
case <-ticker.C:
j.writeMu.Lock()
if j.conn != nil {
deadline := time.Now().Add(10 * time.Second)
if err := j.conn.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
log.Printf("Failed to send ping, closing connection: %v", err)
j.connMu.Lock()
j.isConnected = false
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
j.connMu.Unlock()
}
}
j.writeMu.Unlock()
}
}
}
// Heartbeat sends a heartbeat message over WebSocket to keep runner online.
func (j *JobConnection) Heartbeat() {
if j.conn == nil {
return
}
j.writeMu.Lock()
defer j.writeMu.Unlock()
msg := map[string]interface{}{
"type": "runner_heartbeat",
"timestamp": time.Now().Unix(),
}
if err := j.conn.WriteJSON(msg); err != nil {
log.Printf("Failed to send WebSocket heartbeat: %v", err)
// Handle connection failure
j.connMu.Lock()
j.isConnected = false
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
j.connMu.Unlock()
}
}
// heartbeatLoop sends periodic heartbeat messages over WebSocket.
func (j *JobConnection) heartbeatLoop() {
defer func() {
if rec := recover(); rec != nil {
log.Printf("WebSocket heartbeat loop panicked: %v", rec)
}
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-j.stopHeartbeat:
return
case <-ticker.C:
j.Heartbeat()
}
}
}
// Close closes the WebSocket connection.
func (j *JobConnection) Close() {
j.connMu.Lock()
j.isConnected = false
j.connMu.Unlock()
// Stop heartbeat goroutine
if j.stopHeartbeat != nil {
close(j.stopHeartbeat)
j.stopHeartbeat = nil
}
// Stop ping goroutine
if j.stopPing != nil {
close(j.stopPing)
j.stopPing = nil
}
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
}
// IsConnected returns true if the connection is established.
func (j *JobConnection) IsConnected() bool {
j.connMu.RLock()
defer j.connMu.RUnlock()
return j.isConnected && j.conn != nil
}
// Log sends a log entry to the manager.
func (j *JobConnection) Log(taskID int64, level types.LogLevel, message string) {
if j.conn == nil {
return
}
j.writeMu.Lock()
defer j.writeMu.Unlock()
msg := map[string]interface{}{
"type": "log_entry",
"data": map[string]interface{}{
"task_id": taskID,
"log_level": string(level),
"message": message,
},
"timestamp": time.Now().Unix(),
}
if err := j.conn.WriteJSON(msg); err != nil {
log.Printf("Failed to send job log, connection may be broken: %v", err)
// Close the connection on write error
j.connMu.Lock()
j.isConnected = false
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
j.connMu.Unlock()
}
}
// Progress sends a progress update to the manager.
func (j *JobConnection) Progress(taskID int64, progress float64) {
if j.conn == nil {
return
}
j.writeMu.Lock()
defer j.writeMu.Unlock()
msg := map[string]interface{}{
"type": "progress",
"data": map[string]interface{}{
"task_id": taskID,
"progress": progress,
},
"timestamp": time.Now().Unix(),
}
if err := j.conn.WriteJSON(msg); err != nil {
log.Printf("Failed to send job progress, connection may be broken: %v", err)
// Close the connection on write error
j.connMu.Lock()
j.isConnected = false
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
j.connMu.Unlock()
}
}
// OutputUploaded notifies that an output file was uploaded.
func (j *JobConnection) OutputUploaded(taskID int64, fileName string) {
if j.conn == nil {
return
}
j.writeMu.Lock()
defer j.writeMu.Unlock()
msg := map[string]interface{}{
"type": "output_uploaded",
"data": map[string]interface{}{
"task_id": taskID,
"file_name": fileName,
},
"timestamp": time.Now().Unix(),
}
if err := j.conn.WriteJSON(msg); err != nil {
log.Printf("Failed to send output uploaded, connection may be broken: %v", err)
// Close the connection on write error
j.connMu.Lock()
j.isConnected = false
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
j.connMu.Unlock()
}
}
// Complete sends task completion to the manager.
func (j *JobConnection) Complete(taskID int64, success bool, errorMsg error) {
if j.conn == nil {
log.Printf("Cannot send task complete: WebSocket connection is nil")
return
}
j.writeMu.Lock()
defer j.writeMu.Unlock()
msg := map[string]interface{}{
"type": "task_complete",
"data": map[string]interface{}{
"task_id": taskID,
"success": success,
"error": errorMsg,
},
"timestamp": time.Now().Unix(),
}
if err := j.conn.WriteJSON(msg); err != nil {
log.Printf("Failed to send task complete, connection may be broken: %v", err)
// Close the connection on write error
j.connMu.Lock()
j.isConnected = false
if j.conn != nil {
j.conn.Close()
j.conn = nil
}
j.connMu.Unlock()
}
}