1123 lines
35 KiB
Go
1123 lines
35 KiB
Go
package runner
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/hmac"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"fuego/pkg/types"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// Client represents a runner client
|
|
type Client struct {
|
|
managerURL string
|
|
name string
|
|
hostname string
|
|
ipAddress string
|
|
httpClient *http.Client
|
|
runnerID int64
|
|
runnerSecret string
|
|
managerSecret string
|
|
wsConn *websocket.Conn
|
|
wsConnMu sync.RWMutex
|
|
stopChan chan struct{}
|
|
stepStartTimes map[string]time.Time // key: "taskID:stepName"
|
|
stepTimesMu sync.RWMutex
|
|
}
|
|
|
|
// NewClient creates a new runner client
|
|
func NewClient(managerURL, name, hostname, ipAddress string) *Client {
|
|
return &Client{
|
|
managerURL: managerURL,
|
|
name: name,
|
|
hostname: hostname,
|
|
ipAddress: ipAddress,
|
|
httpClient: &http.Client{Timeout: 30 * time.Second},
|
|
stopChan: make(chan struct{}),
|
|
stepStartTimes: make(map[string]time.Time),
|
|
}
|
|
}
|
|
|
|
// SetSecrets sets the runner and manager secrets
|
|
func (c *Client) SetSecrets(runnerID int64, runnerSecret, managerSecret string) {
|
|
c.runnerID = runnerID
|
|
c.runnerSecret = runnerSecret
|
|
c.managerSecret = managerSecret
|
|
}
|
|
|
|
// Register registers the runner with the manager using a registration token
|
|
func (c *Client) Register(registrationToken string) (int64, string, string, error) {
|
|
req := map[string]interface{}{
|
|
"name": c.name,
|
|
"hostname": c.hostname,
|
|
"ip_address": c.ipAddress,
|
|
"capabilities": "blender,ffmpeg",
|
|
"registration_token": registrationToken,
|
|
}
|
|
|
|
body, _ := json.Marshal(req)
|
|
resp, err := c.httpClient.Post(
|
|
fmt.Sprintf("%s/api/runner/register", c.managerURL),
|
|
"application/json",
|
|
bytes.NewReader(body),
|
|
)
|
|
if err != nil {
|
|
return 0, "", "", fmt.Errorf("failed to register: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusCreated {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return 0, "", "", fmt.Errorf("registration failed: %s", string(body))
|
|
}
|
|
|
|
var result struct {
|
|
ID int64 `json:"id"`
|
|
RunnerSecret string `json:"runner_secret"`
|
|
ManagerSecret string `json:"manager_secret"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return 0, "", "", fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
c.runnerID = result.ID
|
|
c.runnerSecret = result.RunnerSecret
|
|
c.managerSecret = result.ManagerSecret
|
|
|
|
return result.ID, result.RunnerSecret, result.ManagerSecret, nil
|
|
}
|
|
|
|
// signRequest signs a request with the runner secret
|
|
func (c *Client) signRequest(method, path string, body []byte) (string, time.Time) {
|
|
timestamp := time.Now()
|
|
message := fmt.Sprintf("%s\n%s\n%s\n%d", method, path, string(body), timestamp.Unix())
|
|
h := hmac.New(sha256.New, []byte(c.runnerSecret))
|
|
h.Write([]byte(message))
|
|
signature := hex.EncodeToString(h.Sum(nil))
|
|
return signature, timestamp
|
|
}
|
|
|
|
// doSignedRequest performs a signed HTTP request
|
|
func (c *Client) doSignedRequest(method, path string, body []byte) (*http.Response, error) {
|
|
if c.runnerSecret == "" {
|
|
return nil, fmt.Errorf("runner not authenticated")
|
|
}
|
|
|
|
signature, timestamp := c.signRequest(method, path, body)
|
|
|
|
req, err := http.NewRequest(method, fmt.Sprintf("%s%s", c.managerURL, path), bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("X-Runner-Signature", signature)
|
|
req.Header.Set("X-Runner-Timestamp", fmt.Sprintf("%d", timestamp.Unix()))
|
|
|
|
return c.httpClient.Do(req)
|
|
}
|
|
|
|
// ConnectWebSocket establishes a WebSocket connection to the manager
|
|
func (c *Client) ConnectWebSocket() error {
|
|
if c.runnerID == 0 || c.runnerSecret == "" {
|
|
return fmt.Errorf("runner not authenticated")
|
|
}
|
|
|
|
// Build WebSocket URL with authentication
|
|
timestamp := time.Now().Unix()
|
|
path := "/api/runner/ws"
|
|
// Sign the request
|
|
message := fmt.Sprintf("GET\n%s\n\n%d", path, timestamp)
|
|
h := hmac.New(sha256.New, []byte(c.runnerSecret))
|
|
h.Write([]byte(message))
|
|
signature := hex.EncodeToString(h.Sum(nil))
|
|
|
|
// Convert HTTP URL to WebSocket URL
|
|
wsURL := strings.Replace(c.managerURL, "http://", "ws://", 1)
|
|
wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
|
|
wsURL = fmt.Sprintf("%s%s?runner_id=%d&signature=%s×tamp=%d",
|
|
wsURL, path, c.runnerID, signature, timestamp)
|
|
|
|
// Parse URL
|
|
u, err := url.Parse(wsURL)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid WebSocket URL: %w", err)
|
|
}
|
|
|
|
// Connect
|
|
dialer := websocket.Dialer{
|
|
HandshakeTimeout: 10 * time.Second,
|
|
}
|
|
conn, _, err := dialer.Dial(u.String(), nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect WebSocket: %w", err)
|
|
}
|
|
|
|
c.wsConnMu.Lock()
|
|
if c.wsConn != nil {
|
|
c.wsConn.Close()
|
|
}
|
|
c.wsConn = conn
|
|
c.wsConnMu.Unlock()
|
|
|
|
log.Printf("WebSocket connected to manager")
|
|
return nil
|
|
}
|
|
|
|
// ConnectWebSocketWithReconnect connects with automatic reconnection
|
|
func (c *Client) ConnectWebSocketWithReconnect() {
|
|
backoff := 1 * time.Second
|
|
maxBackoff := 60 * time.Second
|
|
|
|
for {
|
|
err := c.ConnectWebSocket()
|
|
if err == nil {
|
|
backoff = 1 * time.Second // Reset on success
|
|
c.HandleWebSocketMessages()
|
|
} else {
|
|
log.Printf("WebSocket connection failed: %v, retrying in %v", err, backoff)
|
|
time.Sleep(backoff)
|
|
backoff *= 2
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
}
|
|
|
|
// Check if we should stop
|
|
select {
|
|
case <-c.stopChan:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// HandleWebSocketMessages handles incoming WebSocket messages
|
|
func (c *Client) HandleWebSocketMessages() {
|
|
c.wsConnMu.Lock()
|
|
conn := c.wsConn
|
|
c.wsConnMu.Unlock()
|
|
|
|
if conn == nil {
|
|
return
|
|
}
|
|
|
|
// Set pong handler
|
|
conn.SetPongHandler(func(string) error {
|
|
return nil
|
|
})
|
|
|
|
// Handle messages
|
|
for {
|
|
var msg map[string]interface{}
|
|
err := conn.ReadJSON(&msg)
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
log.Printf("WebSocket error: %v", err)
|
|
}
|
|
c.wsConnMu.Lock()
|
|
c.wsConn = nil
|
|
c.wsConnMu.Unlock()
|
|
return
|
|
}
|
|
|
|
msgType, _ := msg["type"].(string)
|
|
switch msgType {
|
|
case "task_assignment":
|
|
c.handleTaskAssignment(msg)
|
|
case "ping":
|
|
// Respond to ping with pong (automatic)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleTaskAssignment handles a task assignment message
|
|
func (c *Client) handleTaskAssignment(msg map[string]interface{}) {
|
|
data, ok := msg["data"].(map[string]interface{})
|
|
if !ok {
|
|
log.Printf("Invalid task assignment message")
|
|
return
|
|
}
|
|
|
|
taskID, _ := data["task_id"].(float64)
|
|
jobID, _ := data["job_id"].(float64)
|
|
jobName, _ := data["job_name"].(string)
|
|
outputFormat, _ := data["output_format"].(string)
|
|
frameStart, _ := data["frame_start"].(float64)
|
|
frameEnd, _ := data["frame_end"].(float64)
|
|
taskType, _ := data["task_type"].(string)
|
|
inputFilesRaw, _ := data["input_files"].([]interface{})
|
|
|
|
// Convert to task map format
|
|
taskMap := map[string]interface{}{
|
|
"id": taskID,
|
|
"job_id": jobID,
|
|
"frame_start": frameStart,
|
|
"frame_end": frameEnd,
|
|
}
|
|
|
|
// Process the task based on type
|
|
go func() {
|
|
var err error
|
|
if taskType == "metadata" {
|
|
if len(inputFilesRaw) == 0 {
|
|
log.Printf("No input files for metadata task %v", taskID)
|
|
c.sendTaskComplete(int64(taskID), "", false, "No input files")
|
|
return
|
|
}
|
|
err = c.processMetadataTask(taskMap, int64(jobID), inputFilesRaw)
|
|
} else if taskType == "video_generation" {
|
|
err = c.processVideoGenerationTask(taskMap, int64(jobID))
|
|
} else {
|
|
if len(inputFilesRaw) == 0 {
|
|
log.Printf("No input files for task %v", taskID)
|
|
c.sendTaskComplete(int64(taskID), "", false, "No input files")
|
|
return
|
|
}
|
|
err = c.processTask(taskMap, jobName, outputFormat, inputFilesRaw)
|
|
}
|
|
if err != nil {
|
|
log.Printf("Failed to process task %v: %v", taskID, err)
|
|
c.sendTaskComplete(int64(taskID), "", false, err.Error())
|
|
}
|
|
}()
|
|
}
|
|
|
|
// HeartbeatLoop sends periodic heartbeats via WebSocket
|
|
func (c *Client) HeartbeatLoop() {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
c.wsConnMu.RLock()
|
|
conn := c.wsConn
|
|
c.wsConnMu.RUnlock()
|
|
|
|
if conn != nil {
|
|
// Send heartbeat via WebSocket
|
|
msg := map[string]interface{}{
|
|
"type": "heartbeat",
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
if err := conn.WriteJSON(msg); err != nil {
|
|
log.Printf("Failed to send heartbeat: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendLog sends a log entry to the manager via WebSocket
|
|
func (c *Client) sendLog(taskID int64, logLevel types.LogLevel, message, stepName string) {
|
|
c.wsConnMu.RLock()
|
|
conn := c.wsConn
|
|
c.wsConnMu.RUnlock()
|
|
|
|
if conn != nil {
|
|
msg := map[string]interface{}{
|
|
"type": "log_entry",
|
|
"data": map[string]interface{}{
|
|
"task_id": taskID,
|
|
"log_level": string(logLevel),
|
|
"message": message,
|
|
"step_name": stepName,
|
|
},
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
if err := conn.WriteJSON(msg); err != nil {
|
|
log.Printf("Failed to send log: %v", err)
|
|
}
|
|
} else {
|
|
log.Printf("WebSocket not connected, cannot send log")
|
|
}
|
|
}
|
|
|
|
// sendStepUpdate sends a step start/complete event to the manager
|
|
func (c *Client) sendStepUpdate(taskID int64, stepName string, status types.StepStatus, errorMsg string) {
|
|
key := fmt.Sprintf("%d:%s", taskID, stepName)
|
|
var durationMs *int
|
|
|
|
// Track step start time
|
|
if status == types.StepStatusRunning {
|
|
c.stepTimesMu.Lock()
|
|
c.stepStartTimes[key] = time.Now()
|
|
c.stepTimesMu.Unlock()
|
|
}
|
|
|
|
// Calculate duration if step is completing
|
|
if status == types.StepStatusCompleted || status == types.StepStatusFailed {
|
|
c.stepTimesMu.RLock()
|
|
startTime, exists := c.stepStartTimes[key]
|
|
c.stepTimesMu.RUnlock()
|
|
if exists {
|
|
duration := int(time.Since(startTime).Milliseconds())
|
|
durationMs = &duration
|
|
c.stepTimesMu.Lock()
|
|
delete(c.stepStartTimes, key)
|
|
c.stepTimesMu.Unlock()
|
|
}
|
|
}
|
|
|
|
// Send step update via HTTP API
|
|
reqBody := map[string]interface{}{
|
|
"step_name": stepName,
|
|
"status": string(status),
|
|
}
|
|
if durationMs != nil {
|
|
reqBody["duration_ms"] = *durationMs
|
|
}
|
|
if errorMsg != "" {
|
|
reqBody["error_message"] = errorMsg
|
|
}
|
|
|
|
body, _ := json.Marshal(reqBody)
|
|
path := fmt.Sprintf("/api/runner/tasks/%d/steps?runner_id=%d", taskID, c.runnerID)
|
|
resp, err := c.doSignedRequest("POST", path, body)
|
|
if err != nil {
|
|
log.Printf("Failed to send step update: %v", err)
|
|
// Fallback to log-based tracking
|
|
msg := fmt.Sprintf("Step %s: %s", stepName, status)
|
|
if errorMsg != "" {
|
|
msg += " - " + errorMsg
|
|
}
|
|
logLevel := types.LogLevelInfo
|
|
if status == types.StepStatusFailed {
|
|
logLevel = types.LogLevelError
|
|
}
|
|
c.sendLog(taskID, logLevel, msg, stepName)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
log.Printf("Step update failed: %s", string(body))
|
|
// Fallback to log-based tracking
|
|
msg := fmt.Sprintf("Step %s: %s", stepName, status)
|
|
if errorMsg != "" {
|
|
msg += " - " + errorMsg
|
|
}
|
|
logLevel := types.LogLevelInfo
|
|
if status == types.StepStatusFailed {
|
|
logLevel = types.LogLevelError
|
|
}
|
|
c.sendLog(taskID, logLevel, msg, stepName)
|
|
return
|
|
}
|
|
|
|
// Also send log for debugging
|
|
msg := fmt.Sprintf("Step %s: %s", stepName, status)
|
|
if errorMsg != "" {
|
|
msg += " - " + errorMsg
|
|
}
|
|
logLevel := types.LogLevelInfo
|
|
if status == types.StepStatusFailed {
|
|
logLevel = types.LogLevelError
|
|
}
|
|
c.sendLog(taskID, logLevel, msg, stepName)
|
|
}
|
|
|
|
// processTask processes a single task
|
|
func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat string, inputFiles []interface{}) error {
|
|
taskID := int64(task["id"].(float64))
|
|
jobID := int64(task["job_id"].(float64))
|
|
frameStart := int(task["frame_start"].(float64))
|
|
frameEnd := int(task["frame_end"].(float64))
|
|
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting task: job %d, frames %d-%d, format: %s", jobID, frameStart, frameEnd, outputFormat), "")
|
|
log.Printf("Processing task %d: job %d, frames %d-%d, format: %s", taskID, jobID, frameStart, frameEnd, outputFormat)
|
|
|
|
// Create work directory
|
|
workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-task-%d", taskID))
|
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
defer os.RemoveAll(workDir)
|
|
|
|
// Step: download
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Downloading input files...", "download")
|
|
blendFile := ""
|
|
for _, filePath := range inputFiles {
|
|
filePathStr := filePath.(string)
|
|
if err := c.downloadFile(filePathStr, workDir); err != nil {
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
|
|
return fmt.Errorf("failed to download file %s: %w", filePathStr, err)
|
|
}
|
|
if filepath.Ext(filePathStr) == ".blend" {
|
|
blendFile = filepath.Join(workDir, filepath.Base(filePathStr))
|
|
}
|
|
}
|
|
|
|
if blendFile == "" {
|
|
err := fmt.Errorf("no .blend file found in input files")
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Input files downloaded successfully", "download")
|
|
|
|
// Render frames
|
|
outputDir := filepath.Join(workDir, "output")
|
|
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create output directory: %w", err)
|
|
}
|
|
|
|
// For MP4, render as PNG first, then combine into video
|
|
renderFormat := outputFormat
|
|
if outputFormat == "MP4" {
|
|
renderFormat = "PNG"
|
|
}
|
|
|
|
outputPattern := filepath.Join(outputDir, fmt.Sprintf("frame_%%04d.%s", strings.ToLower(renderFormat)))
|
|
|
|
// Step: render_blender
|
|
c.sendStepUpdate(taskID, "render_blender", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting Blender render for frame %d...", frameStart), "render_blender")
|
|
|
|
// Execute Blender
|
|
cmd := exec.Command("blender", "-b", blendFile, "-o", outputPattern, "-f", fmt.Sprintf("%d", frameStart))
|
|
cmd.Dir = workDir
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
errMsg := fmt.Sprintf("blender failed: %v\nOutput: %s", err, string(output))
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
|
|
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
|
|
// Find rendered output file
|
|
outputFile := filepath.Join(outputDir, fmt.Sprintf("frame_%04d.%s", frameStart, strings.ToLower(renderFormat)))
|
|
if _, err := os.Stat(outputFile); os.IsNotExist(err) {
|
|
errMsg := fmt.Sprintf("output file not found: %s", outputFile)
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
|
|
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Blender render completed for frame %d", frameStart), "render_blender")
|
|
c.sendStepUpdate(taskID, "render_blender", types.StepStatusCompleted, "")
|
|
|
|
// Step: upload or upload_frames
|
|
uploadStepName := "upload"
|
|
if outputFormat == "MP4" {
|
|
uploadStepName = "upload_frames"
|
|
}
|
|
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Uploading output file...", uploadStepName)
|
|
|
|
outputPath, err := c.uploadFile(jobID, outputFile)
|
|
if err != nil {
|
|
errMsg := fmt.Sprintf("failed to upload output: %v", err)
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, uploadStepName)
|
|
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
c.sendLog(taskID, types.LogLevelInfo, "Output file uploaded successfully", uploadStepName)
|
|
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusCompleted, "")
|
|
|
|
// Step: complete
|
|
c.sendStepUpdate(taskID, "complete", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Task completed successfully", "complete")
|
|
|
|
// Mark task as complete
|
|
if err := c.completeTask(taskID, outputPath, true, ""); err != nil {
|
|
c.sendStepUpdate(taskID, "complete", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
c.sendStepUpdate(taskID, "complete", types.StepStatusCompleted, "")
|
|
|
|
return nil
|
|
}
|
|
|
|
// processVideoGenerationTask processes a video generation task
|
|
func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID int64) error {
|
|
taskID := int64(task["id"].(float64))
|
|
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting video generation task: job %d", jobID), "")
|
|
log.Printf("Processing video generation task %d for job %d", taskID, jobID)
|
|
|
|
// Get all output files for this job
|
|
files, err := c.getJobFiles(jobID)
|
|
if err != nil {
|
|
c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error())
|
|
return fmt.Errorf("failed to get job files: %w", err)
|
|
}
|
|
|
|
// Find all PNG frame files
|
|
var pngFiles []map[string]interface{}
|
|
for _, file := range files {
|
|
fileType, _ := file["file_type"].(string)
|
|
fileName, _ := file["file_name"].(string)
|
|
if fileType == "output" && strings.HasSuffix(fileName, ".png") {
|
|
pngFiles = append(pngFiles, file)
|
|
}
|
|
}
|
|
|
|
if len(pngFiles) == 0 {
|
|
err := fmt.Errorf("no PNG frame files found for MP4 generation")
|
|
c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
|
|
c.sendStepUpdate(taskID, "get_files", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found %d PNG frames for video generation", len(pngFiles)), "get_files")
|
|
|
|
log.Printf("Generating MP4 for job %d from %d PNG frames", jobID, len(pngFiles))
|
|
|
|
// Step: download_frames
|
|
c.sendStepUpdate(taskID, "download_frames", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Downloading PNG frames...", "download_frames")
|
|
|
|
// Create work directory for video generation
|
|
workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-video-%d", jobID))
|
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
|
c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error())
|
|
return fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
defer os.RemoveAll(workDir)
|
|
|
|
// Download all PNG frames
|
|
var frameFiles []string
|
|
for _, file := range pngFiles {
|
|
fileName, _ := file["file_name"].(string)
|
|
framePath := filepath.Join(workDir, fileName)
|
|
if err := c.downloadFrameFile(jobID, fileName, framePath); err != nil {
|
|
log.Printf("Failed to download frame %s: %v", fileName, err)
|
|
continue
|
|
}
|
|
frameFiles = append(frameFiles, framePath)
|
|
}
|
|
|
|
if len(frameFiles) == 0 {
|
|
err := fmt.Errorf("failed to download any frame files")
|
|
c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Sort frame files by name to ensure correct order
|
|
sort.Strings(frameFiles)
|
|
c.sendStepUpdate(taskID, "download_frames", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Downloaded %d frames", len(frameFiles)), "download_frames")
|
|
|
|
// Step: generate_video
|
|
c.sendStepUpdate(taskID, "generate_video", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Generating MP4 video with ffmpeg...", "generate_video")
|
|
|
|
// Generate MP4 using ffmpeg
|
|
outputMP4 := filepath.Join(workDir, fmt.Sprintf("output_%d.mp4", jobID))
|
|
|
|
// Use ffmpeg to combine frames into MP4
|
|
// Method 1: Using image sequence input (more reliable)
|
|
firstFrame := frameFiles[0]
|
|
// Extract frame number pattern (e.g., frame_0001.png -> frame_%04d.png)
|
|
baseName := filepath.Base(firstFrame)
|
|
pattern := strings.Replace(baseName, fmt.Sprintf("%04d", extractFrameNumber(baseName)), "%04d", 1)
|
|
patternPath := filepath.Join(workDir, pattern)
|
|
|
|
// Run ffmpeg to combine frames into MP4 at 24 fps
|
|
cmd := exec.Command("ffmpeg", "-y", "-framerate", "24", "-i", patternPath,
|
|
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", outputMP4)
|
|
cmd.Dir = workDir
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
// Try alternative method with concat demuxer
|
|
log.Printf("First ffmpeg attempt failed, trying concat method: %s", string(output))
|
|
err = c.generateMP4WithConcat(frameFiles, outputMP4, workDir)
|
|
if err != nil {
|
|
c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Check if MP4 was created
|
|
if _, err := os.Stat(outputMP4); os.IsNotExist(err) {
|
|
err := fmt.Errorf("MP4 file not created: %s", outputMP4)
|
|
c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
|
|
c.sendStepUpdate(taskID, "generate_video", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "MP4 video generated successfully", "generate_video")
|
|
|
|
// Step: upload_video
|
|
c.sendStepUpdate(taskID, "upload_video", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Uploading MP4 video...", "upload_video")
|
|
|
|
// Upload MP4 file
|
|
mp4Path, err := c.uploadFile(jobID, outputMP4)
|
|
if err != nil {
|
|
c.sendStepUpdate(taskID, "upload_video", types.StepStatusFailed, err.Error())
|
|
return fmt.Errorf("failed to upload MP4: %w", err)
|
|
}
|
|
|
|
c.sendStepUpdate(taskID, "upload_video", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Successfully uploaded MP4: %s", mp4Path), "upload_video")
|
|
|
|
// Mark task as complete
|
|
if err := c.completeTask(taskID, mp4Path, true, ""); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("Successfully generated and uploaded MP4 for job %d: %s", jobID, mp4Path)
|
|
return nil
|
|
}
|
|
|
|
// generateMP4WithConcat uses ffmpeg concat demuxer as fallback
|
|
func (c *Client) generateMP4WithConcat(frameFiles []string, outputMP4, workDir string) error {
|
|
// Create file list for ffmpeg concat demuxer
|
|
listFile := filepath.Join(workDir, "frames.txt")
|
|
listFileHandle, err := os.Create(listFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create list file: %w", err)
|
|
}
|
|
|
|
for _, frameFile := range frameFiles {
|
|
absPath, _ := filepath.Abs(frameFile)
|
|
fmt.Fprintf(listFileHandle, "file '%s'\n", absPath)
|
|
}
|
|
listFileHandle.Close()
|
|
|
|
// Run ffmpeg with concat demuxer
|
|
cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", listFile,
|
|
"-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", "-y", outputMP4)
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("ffmpeg concat failed: %w\nOutput: %s", err, string(output))
|
|
}
|
|
|
|
if _, err := os.Stat(outputMP4); os.IsNotExist(err) {
|
|
return fmt.Errorf("MP4 file not created: %s", outputMP4)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// extractFrameNumber parses the frame number out of a file name such as
// "frame_0001.png". Only the base name is considered; the number is read from
// the text between the first underscore and the following underscore or dot.
// It returns 0 when the name contains no underscore or that text does not
// start with an integer.
func extractFrameNumber(filename string) int {
	segments := strings.Split(filepath.Base(filename), "_")
	if len(segments) < 2 {
		return 0
	}

	// Trim the second segment at its first dot, if it has one.
	digits := segments[1]
	if dot := strings.Index(digits, "."); dot >= 0 {
		digits = digits[:dot]
	}

	frame := 0
	// A failed scan leaves frame at zero, which is the documented fallback.
	fmt.Sscanf(digits, "%d", &frame)
	return frame
}
|
|
|
|
// getJobStatus gets job status from manager
|
|
func (c *Client) getJobStatus(jobID int64) (map[string]interface{}, error) {
|
|
path := fmt.Sprintf("/api/runner/jobs/%d/status?runner_id=%d", jobID, c.runnerID)
|
|
resp, err := c.doSignedRequest("GET", path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("failed to get job status: %s", string(body))
|
|
}
|
|
|
|
var job map[string]interface{}
|
|
if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// getJobFiles gets job files from manager
|
|
func (c *Client) getJobFiles(jobID int64) ([]map[string]interface{}, error) {
|
|
path := fmt.Sprintf("/api/runner/jobs/%d/files?runner_id=%d", jobID, c.runnerID)
|
|
resp, err := c.doSignedRequest("GET", path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("failed to get job files: %s", string(body))
|
|
}
|
|
|
|
var files []map[string]interface{}
|
|
if err := json.NewDecoder(resp.Body).Decode(&files); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return files, nil
|
|
}
|
|
|
|
// downloadFrameFile downloads a frame file for MP4 generation
|
|
func (c *Client) downloadFrameFile(jobID int64, fileName, destPath string) error {
|
|
path := fmt.Sprintf("/api/runner/files/%d/%s?runner_id=%d", jobID, fileName, c.runnerID)
|
|
resp, err := c.doSignedRequest("GET", path, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("download failed: %s", string(body))
|
|
}
|
|
|
|
file, err := os.Create(destPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
_, err = io.Copy(file, resp.Body)
|
|
return err
|
|
}
|
|
|
|
// downloadFile downloads a file from the manager
|
|
func (c *Client) downloadFile(filePath, destDir string) error {
|
|
// Extract job ID and filename from path
|
|
// Path format: storage/jobs/{jobID}/{filename}
|
|
parts := filepath.SplitList(filePath)
|
|
if len(parts) < 3 {
|
|
return fmt.Errorf("invalid file path format: %s", filePath)
|
|
}
|
|
|
|
// Find job ID in path (look for "jobs" directory)
|
|
jobID := ""
|
|
fileName := filepath.Base(filePath)
|
|
for i, part := range parts {
|
|
if part == "jobs" && i+1 < len(parts) {
|
|
jobID = parts[i+1]
|
|
break
|
|
}
|
|
}
|
|
|
|
if jobID == "" {
|
|
return fmt.Errorf("could not extract job ID from path: %s", filePath)
|
|
}
|
|
|
|
// Download via HTTP
|
|
path := fmt.Sprintf("/api/runner/files/%s/%s?runner_id=%d", jobID, fileName, c.runnerID)
|
|
resp, err := c.doSignedRequest("GET", path, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to download file: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("download failed: %s", string(body))
|
|
}
|
|
|
|
destPath := filepath.Join(destDir, fileName)
|
|
file, err := os.Create(destPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create destination file: %w", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
_, err = io.Copy(file, resp.Body)
|
|
return err
|
|
}
|
|
|
|
// uploadFile uploads a file to the manager
|
|
func (c *Client) uploadFile(jobID int64, filePath string) (string, error) {
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to open file: %w", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
// Create multipart form
|
|
var buf bytes.Buffer
|
|
formWriter := multipart.NewWriter(&buf)
|
|
|
|
part, err := formWriter.CreateFormFile("file", filepath.Base(filePath))
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to create form file: %w", err)
|
|
}
|
|
|
|
_, err = io.Copy(part, file)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to copy file data: %w", err)
|
|
}
|
|
|
|
formWriter.Close()
|
|
|
|
// Upload file with signature
|
|
path := fmt.Sprintf("/api/runner/files/%d/upload?runner_id=%d", jobID, c.runnerID)
|
|
timestamp := time.Now()
|
|
message := fmt.Sprintf("POST\n%s\n%s\n%d", path, buf.String(), timestamp.Unix())
|
|
h := hmac.New(sha256.New, []byte(c.runnerSecret))
|
|
h.Write([]byte(message))
|
|
signature := hex.EncodeToString(h.Sum(nil))
|
|
|
|
url := fmt.Sprintf("%s%s", c.managerURL, path)
|
|
req, err := http.NewRequest("POST", url, &buf)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", formWriter.FormDataContentType())
|
|
req.Header.Set("X-Runner-Signature", signature)
|
|
req.Header.Set("X-Runner-Timestamp", fmt.Sprintf("%d", timestamp.Unix()))
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to upload file: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusCreated {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return "", fmt.Errorf("upload failed: %s", string(body))
|
|
}
|
|
|
|
var result struct {
|
|
FilePath string `json:"file_path"`
|
|
FileName string `json:"file_name"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return "", fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return result.FilePath, nil
|
|
}
|
|
|
|
// processMetadataTask processes a metadata extraction task
|
|
func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, inputFiles []interface{}) error {
|
|
taskID := int64(task["id"].(float64))
|
|
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting metadata extraction task: job %d", jobID), "")
|
|
log.Printf("Processing metadata extraction task %d for job %d", taskID, jobID)
|
|
|
|
// Create work directory
|
|
workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-metadata-%d", taskID))
|
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
defer os.RemoveAll(workDir)
|
|
|
|
// Step: download
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Downloading blend file...", "download")
|
|
blendFile := ""
|
|
for _, filePath := range inputFiles {
|
|
filePathStr := filePath.(string)
|
|
if err := c.downloadFile(filePathStr, workDir); err != nil {
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
|
|
return fmt.Errorf("failed to download file %s: %w", filePathStr, err)
|
|
}
|
|
if filepath.Ext(filePathStr) == ".blend" {
|
|
blendFile = filepath.Join(workDir, filepath.Base(filePathStr))
|
|
}
|
|
}
|
|
|
|
if blendFile == "" {
|
|
err := fmt.Errorf("no .blend file found in input files")
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
|
|
return err
|
|
}
|
|
c.sendStepUpdate(taskID, "download", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Blend file downloaded successfully", "download")
|
|
|
|
// Step: extract_metadata
|
|
c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Extracting metadata from blend file...", "extract_metadata")
|
|
|
|
// Create Python script to extract metadata
|
|
scriptPath := filepath.Join(workDir, "extract_metadata.py")
|
|
scriptContent := `import bpy
|
|
import json
|
|
import sys
|
|
|
|
# Get scene
|
|
scene = bpy.context.scene
|
|
|
|
# Extract frame range
|
|
frame_start = scene.frame_start
|
|
frame_end = scene.frame_end
|
|
|
|
# Extract render settings
|
|
render = scene.render
|
|
resolution_x = render.resolution_x
|
|
resolution_y = render.resolution_y
|
|
samples = scene.cycles.samples if scene.cycles else scene.eevee.taa_render_samples
|
|
engine = scene.render.engine.lower()
|
|
|
|
# Determine output format from file format
|
|
output_format = render.image_settings.file_format
|
|
|
|
# Extract scene info
|
|
camera_count = len([obj for obj in scene.objects if obj.type == 'CAMERA'])
|
|
object_count = len(scene.objects)
|
|
material_count = len(bpy.data.materials)
|
|
|
|
# Build metadata dictionary
|
|
metadata = {
|
|
"frame_start": frame_start,
|
|
"frame_end": frame_end,
|
|
"render_settings": {
|
|
"resolution_x": resolution_x,
|
|
"resolution_y": resolution_y,
|
|
"samples": samples,
|
|
"output_format": output_format,
|
|
"engine": engine
|
|
},
|
|
"scene_info": {
|
|
"camera_count": camera_count,
|
|
"object_count": object_count,
|
|
"material_count": material_count
|
|
}
|
|
}
|
|
|
|
# Output as JSON
|
|
print(json.dumps(metadata))
|
|
sys.stdout.flush()
|
|
`
|
|
|
|
if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
|
|
c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, err.Error())
|
|
return fmt.Errorf("failed to create extraction script: %w", err)
|
|
}
|
|
|
|
// Execute Blender with Python script
|
|
cmd := exec.Command("blender", "-b", blendFile, "--python", scriptPath)
|
|
cmd.Dir = workDir
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
errMsg := fmt.Sprintf("blender metadata extraction failed: %v\nOutput: %s", err, string(output))
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata")
|
|
c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
|
|
// Parse output (metadata is printed to stdout)
|
|
metadataJSON := strings.TrimSpace(string(output))
|
|
// Extract JSON from output (Blender may print other stuff)
|
|
jsonStart := strings.Index(metadataJSON, "{")
|
|
jsonEnd := strings.LastIndex(metadataJSON, "}")
|
|
if jsonStart == -1 || jsonEnd == -1 || jsonEnd <= jsonStart {
|
|
errMsg := "Failed to extract JSON from Blender output"
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata")
|
|
c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
metadataJSON = metadataJSON[jsonStart : jsonEnd+1]
|
|
|
|
var metadata types.BlendMetadata
|
|
if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil {
|
|
errMsg := fmt.Sprintf("Failed to parse metadata JSON: %v", err)
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, "extract_metadata")
|
|
c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
|
|
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Metadata extracted: frames %d-%d, resolution %dx%d",
|
|
metadata.FrameStart, metadata.FrameEnd, metadata.RenderSettings.ResolutionX, metadata.RenderSettings.ResolutionY), "extract_metadata")
|
|
c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusCompleted, "")
|
|
|
|
// Step: submit_metadata
|
|
c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusRunning, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Submitting metadata to manager...", "submit_metadata")
|
|
|
|
// Submit metadata to manager
|
|
if err := c.submitMetadata(jobID, metadata); err != nil {
|
|
errMsg := fmt.Sprintf("Failed to submit metadata: %v", err)
|
|
c.sendLog(taskID, types.LogLevelError, errMsg, "submit_metadata")
|
|
c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusFailed, errMsg)
|
|
return errors.New(errMsg)
|
|
}
|
|
|
|
c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusCompleted, "")
|
|
c.sendLog(taskID, types.LogLevelInfo, "Metadata extraction completed successfully", "")
|
|
|
|
// Mark task as complete
|
|
c.sendTaskComplete(taskID, "", true, "")
|
|
return nil
|
|
}
|
|
|
|
// submitMetadata submits extracted metadata to the manager
|
|
func (c *Client) submitMetadata(jobID int64, metadata types.BlendMetadata) error {
|
|
metadataJSON, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal metadata: %w", err)
|
|
}
|
|
|
|
path := fmt.Sprintf("/api/runner/jobs/%d/metadata?runner_id=%d", jobID, c.runnerID)
|
|
timestamp := time.Now()
|
|
message := fmt.Sprintf("POST\n%s\n%s\n%d", path, string(metadataJSON), timestamp.Unix())
|
|
h := hmac.New(sha256.New, []byte(c.runnerSecret))
|
|
h.Write([]byte(message))
|
|
signature := hex.EncodeToString(h.Sum(nil))
|
|
|
|
url := fmt.Sprintf("%s%s", c.managerURL, path)
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(metadataJSON))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("X-Runner-Signature", signature)
|
|
req.Header.Set("X-Runner-Timestamp", fmt.Sprintf("%d", timestamp.Unix()))
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to submit metadata: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("metadata submission failed: %s", string(body))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// completeTask marks a task as complete via WebSocket (or HTTP fallback)
|
|
func (c *Client) completeTask(taskID int64, outputPath string, success bool, errorMsg string) error {
|
|
return c.sendTaskComplete(taskID, outputPath, success, errorMsg)
|
|
}
|
|
|
|
// sendTaskComplete sends task completion via WebSocket
|
|
func (c *Client) sendTaskComplete(taskID int64, outputPath string, success bool, errorMsg string) error {
|
|
c.wsConnMu.RLock()
|
|
conn := c.wsConn
|
|
c.wsConnMu.RUnlock()
|
|
|
|
if conn != nil {
|
|
msg := map[string]interface{}{
|
|
"type": "task_complete",
|
|
"data": map[string]interface{}{
|
|
"task_id": taskID,
|
|
"output_path": outputPath,
|
|
"success": success,
|
|
"error": errorMsg,
|
|
},
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
if err := conn.WriteJSON(msg); err != nil {
|
|
return fmt.Errorf("failed to send task completion: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("WebSocket not connected, cannot complete task")
|
|
}
|