Files
jiggablend/internal/runner/client.go
2025-11-21 17:31:18 -06:00

628 lines
18 KiB
Go

package runner
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"os"
"os/exec"
"path/filepath"
"sort"
"strings"
"time"
)
// Client talks to the manager on behalf of one runner. It carries the
// runner's identity (name, hostname, IP address) and, after Register or
// SetSecrets, the runner ID and secrets used when calling the manager.
type Client struct {
	// Manager endpoint plus the identity this runner reports when registering.
	managerURL string
	name       string
	hostname   string
	ipAddress  string

	// Shared HTTP client used for every call to the manager.
	httpClient *http.Client

	// Credentials returned by Register or installed via SetSecrets.
	// runnerSecret is the HMAC key used to sign requests.
	runnerID      int64
	runnerSecret  string
	managerSecret string
}
// NewClient returns a Client for the manager at managerURL that identifies
// itself with the given name, hostname and IP address. Requests use an HTTP
// client with a 30-second timeout. The runner ID and secrets stay empty until
// Register or SetSecrets is called.
func NewClient(managerURL, name, hostname, ipAddress string) *Client {
	c := new(Client)
	c.managerURL = managerURL
	c.name = name
	c.hostname = hostname
	c.ipAddress = ipAddress
	c.httpClient = &http.Client{Timeout: 30 * time.Second}
	return c
}
// SetSecrets installs the runner ID and both secrets directly, as an
// alternative to obtaining them through Register.
func (c *Client) SetSecrets(runnerID int64, runnerSecret, managerSecret string) {
	c.runnerID, c.runnerSecret, c.managerSecret = runnerID, runnerSecret, managerSecret
}
// Register enrolls this runner with the manager by POSTing its identity,
// capabilities and registrationToken to /api/runner/register. On a 201
// Created response it stores the returned runner ID, runner secret and
// manager secret on c and also returns them (in that order). Any other
// status becomes an error carrying the response body.
func (c *Client) Register(registrationToken string) (int64, string, string, error) {
	// Marshalling a map of plain strings cannot fail, so the error is dropped.
	payload, _ := json.Marshal(map[string]interface{}{
		"name":               c.name,
		"hostname":           c.hostname,
		"ip_address":         c.ipAddress,
		"capabilities":       "blender,ffmpeg",
		"registration_token": registrationToken,
	})

	endpoint := c.managerURL + "/api/runner/register"
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(payload))
	if err != nil {
		return 0, "", "", fmt.Errorf("failed to register: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		msg, _ := io.ReadAll(resp.Body)
		return 0, "", "", fmt.Errorf("registration failed: %s", string(msg))
	}

	var creds struct {
		ID            int64  `json:"id"`
		RunnerSecret  string `json:"runner_secret"`
		ManagerSecret string `json:"manager_secret"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&creds); err != nil {
		return 0, "", "", fmt.Errorf("failed to decode response: %w", err)
	}

	c.SetSecrets(creds.ID, creds.RunnerSecret, creds.ManagerSecret)
	return creds.ID, creds.RunnerSecret, creds.ManagerSecret, nil
}
// signRequest returns the hex-encoded HMAC-SHA256, keyed with the runner
// secret, of the message "METHOD\nPATH\nBODY\nUNIX_SECONDS", together with the
// timestamp whose Unix seconds were signed.
func (c *Client) signRequest(method, path string, body []byte) (string, time.Time) {
	signedAt := time.Now()
	mac := hmac.New(sha256.New, []byte(c.runnerSecret))
	// %s renders the body bytes verbatim (a nil body renders as "").
	fmt.Fprintf(mac, "%s\n%s\n%s\n%d", method, path, body, signedAt.Unix())
	return hex.EncodeToString(mac.Sum(nil)), signedAt
}
// doSignedRequest sends body to managerURL+path using method. It attaches a
// JSON content type, the HMAC signature and the signed Unix timestamp
// (X-Runner-Signature / X-Runner-Timestamp). It refuses to send anything while
// no runner secret is set. The caller owns and must close the response body.
func (c *Client) doSignedRequest(method, path string, body []byte) (*http.Response, error) {
	if len(c.runnerSecret) == 0 {
		return nil, fmt.Errorf("runner not authenticated")
	}

	sig, signedAt := c.signRequest(method, path, body)

	target := c.managerURL + path
	req, err := http.NewRequest(method, target, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Runner-Signature", sig)
	req.Header.Set("X-Runner-Timestamp", fmt.Sprint(signedAt.Unix()))

	return c.httpClient.Do(req)
}
// HeartbeatLoop POSTs an empty JSON object to the manager's heartbeat
// endpoint every 30 seconds. It never returns (the ticker loop has no exit).
// Transport errors and non-2xx responses are logged, and the next tick retries.
func (c *Client) HeartbeatLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// The path is rebuilt each tick so a runner ID set later is picked up.
		path := fmt.Sprintf("/api/runner/heartbeat?runner_id=%d", c.runnerID)
		resp, err := c.doSignedRequest(http.MethodPost, path, []byte("{}"))
		if err != nil {
			log.Printf("Heartbeat failed: %v", err)
			continue
		}
		// A rejected heartbeat (e.g. bad signature) used to be silently
		// ignored; surface it so operators can notice.
		if resp.StatusCode < 200 || resp.StatusCode > 299 {
			msg, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
			log.Printf("Heartbeat rejected: status %d: %s", resp.StatusCode, string(msg))
		}
		// Drain before closing so the transport can reuse the connection;
		// a drain error only costs that reuse.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
}
// ProcessTasks polls the manager for tasks every 5 seconds and processes the
// returned tasks one after another. It never returns. Entries without a
// "task" object are skipped. Tasks without input files are logged and
// skipped. A task whose processing fails is reported to the manager as
// failed, and a failure of that report is logged rather than dropped.
func (c *Client) ProcessTasks() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		tasks, err := c.getTasks()
		if err != nil {
			log.Printf("Failed to get tasks: %v", err)
			continue
		}

		for _, taskData := range tasks {
			taskMap, ok := taskData["task"].(map[string]interface{})
			if !ok {
				continue
			}
			jobName, _ := taskData["job_name"].(string)
			outputFormat, _ := taskData["output_format"].(string)
			inputFilesRaw, _ := taskData["input_files"].([]interface{})
			if len(inputFilesRaw) == 0 {
				log.Printf("No input files for task %v", taskMap["id"])
				continue
			}

			if err := c.processTask(taskMap, jobName, outputFormat, inputFilesRaw); err != nil {
				taskID, hasID := taskMap["id"].(float64)
				log.Printf("Failed to process task %v: %v", taskID, err)
				if !hasID {
					// Without an ID there is no task to report the failure against.
					continue
				}
				if cerr := c.completeTask(int64(taskID), "", false, err.Error()); cerr != nil {
					log.Printf("Failed to report failure of task %d: %v", int64(taskID), cerr)
				}
			}
		}
	}
}
// getTasks fetches this runner's task list from the manager. Each entry is
// returned as a generic decoded JSON object. A non-200 response becomes an
// error that carries the response body.
func (c *Client) getTasks() ([]map[string]interface{}, error) {
	resp, err := c.doSignedRequest("GET", fmt.Sprintf("/api/runner/tasks?runner_id=%d", c.runnerID), nil)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		var tasks []map[string]interface{}
		if decodeErr := json.NewDecoder(resp.Body).Decode(&tasks); decodeErr != nil {
			return nil, decodeErr
		}
		return tasks, nil
	}

	body, _ := io.ReadAll(resp.Body)
	return nil, fmt.Errorf("failed to get tasks: %s", string(body))
}
// processTask renders one frame for task with Blender, uploads the image to
// the task's job and reports the task as successfully completed. For
// outputFormat "MP4" the frame is rendered as PNG. After completion,
// checkAndGenerateMP4 is asked to assemble the job's frames into a video;
// a failure there is only logged.
//
// task is the JSON-decoded task object and must carry numeric "id", "job_id",
// "frame_start" and "frame_end" fields. A malformed task now yields an error
// instead of a panic. inputFiles must be manager-side path strings, at least
// one ending in ".blend" (the last such file is rendered). jobName is
// currently unused.
func (c *Client) processTask(task map[string]interface{}, jobName, outputFormat string, inputFiles []interface{}) error {
	taskID, err := taskNumberField(task, "id")
	if err != nil {
		return err
	}
	jobID, err := taskNumberField(task, "job_id")
	if err != nil {
		return err
	}
	start, err := taskNumberField(task, "frame_start")
	if err != nil {
		return err
	}
	end, err := taskNumberField(task, "frame_end")
	if err != nil {
		return err
	}
	frameStart, frameEnd := int(start), int(end)
	log.Printf("Processing task %d: job %d, frames %d-%d, format: %s", taskID, jobID, frameStart, frameEnd, outputFormat)

	// Per-task scratch directory, removed when the task finishes.
	workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-task-%d", taskID))
	if err := os.MkdirAll(workDir, 0755); err != nil {
		return fmt.Errorf("failed to create work directory: %w", err)
	}
	defer os.RemoveAll(workDir)

	// Download all input files and remember the .blend file to render.
	blendFile := ""
	for _, entry := range inputFiles {
		filePathStr, ok := entry.(string)
		if !ok {
			return fmt.Errorf("invalid input file entry: %v", entry)
		}
		if err := c.downloadFile(filePathStr, workDir); err != nil {
			return fmt.Errorf("failed to download file %s: %w", filePathStr, err)
		}
		if filepath.Ext(filePathStr) == ".blend" {
			blendFile = filepath.Join(workDir, filepath.Base(filePathStr))
		}
	}
	if blendFile == "" {
		return fmt.Errorf("no .blend file found in input files")
	}

	outputDir := filepath.Join(workDir, "output")
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		return fmt.Errorf("failed to create output directory: %w", err)
	}

	// For MP4, render as PNG first; the frames are combined into a video later.
	renderFormat := outputFormat
	if outputFormat == "MP4" {
		renderFormat = "PNG"
	}

	// Blender does not expand printf verbs in -o. It replaces a run of '#'
	// with the zero-padded frame number (appending "####" when there is none),
	// and with -x 1 it adds the selected format's extension itself. Arguments
	// are applied in order, so -o/-F/-x must precede -f.
	args := []string{"-b", blendFile, "-o", filepath.Join(outputDir, "frame_####")}
	if renderFormat != "" {
		args = append(args, "-F", strings.ToUpper(renderFormat))
	}
	args = append(args, "-x", "1", "-f", fmt.Sprintf("%d", frameStart))
	// NOTE(review): only frame_start is rendered; frame_end is logged but
	// otherwise unused, which assumes single-frame tasks — confirm.
	cmd := exec.Command("blender", args...)
	cmd.Dir = workDir
	output, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("blender failed: %w\nOutput: %s", err, string(output))
	}

	// Locate the frame by number rather than by a guessed extension. Blender
	// picks the extension (e.g. ".jpg" for JPEG), so it need not match the
	// lower-cased format name.
	matches, err := filepath.Glob(filepath.Join(outputDir, fmt.Sprintf("frame_%04d.*", frameStart)))
	if err != nil {
		return fmt.Errorf("failed to locate output file: %w", err)
	}
	if len(matches) == 0 {
		expected := filepath.Join(outputDir, fmt.Sprintf("frame_%04d.%s", frameStart, strings.ToLower(renderFormat)))
		return fmt.Errorf("output file not found: %s", expected)
	}
	outputFile := matches[0]

	outputPath, err := c.uploadFile(jobID, outputFile)
	if err != nil {
		return fmt.Errorf("failed to upload output: %w", err)
	}

	if err := c.completeTask(taskID, outputPath, true, ""); err != nil {
		return err
	}

	// For MP4, try to assemble the video once the whole job is complete.
	if outputFormat == "MP4" {
		if err := c.checkAndGenerateMP4(jobID); err != nil {
			// Don't fail the task: its frame is already uploaded and reported.
			log.Printf("Failed to generate MP4 for job %d: %v", jobID, err)
		}
	}
	return nil
}

// taskNumberField reads a numeric field from a JSON-decoded task object.
// encoding/json stores JSON numbers as float64 in interface{} values.
func taskNumberField(task map[string]interface{}, key string) (int64, error) {
	v, ok := task[key].(float64)
	if !ok {
		return 0, fmt.Errorf("task field %q missing or not a number (got %T)", key, task[key])
	}
	return int64(v), nil
}
// checkAndGenerateMP4 builds and uploads an MP4 for jobID once the manager
// reports the job's status as "completed". Otherwise it logs and returns nil.
//
// It downloads every "output" file whose name ends in ".png", orders them by
// name and encodes them at 24 fps with libx264. If the image-sequence input
// fails, it falls back to the ffmpeg concat demuxer. The resulting MP4 is
// uploaded to the job whichever method produced it.
//
// NOTE(review): nothing here prevents several runners that finish a job's
// last tasks at about the same time from each generating and uploading an MP4.
func (c *Client) checkAndGenerateMP4(jobID int64) error {
	job, err := c.getJobStatus(jobID)
	if err != nil {
		return fmt.Errorf("failed to get job status: %w", err)
	}
	if job["status"] != "completed" {
		log.Printf("Job %d not yet complete (%v), skipping MP4 generation", jobID, job["status"])
		return nil
	}

	files, err := c.getJobFiles(jobID)
	if err != nil {
		return fmt.Errorf("failed to get job files: %w", err)
	}

	// Collect the names of all PNG output frames.
	var pngNames []string
	for _, file := range files {
		fileType, _ := file["file_type"].(string)
		fileName, _ := file["file_name"].(string)
		if fileType == "output" && strings.HasSuffix(fileName, ".png") {
			pngNames = append(pngNames, fileName)
		}
	}
	if len(pngNames) == 0 {
		return fmt.Errorf("no PNG frame files found for MP4 generation")
	}
	log.Printf("Generating MP4 for job %d from %d PNG frames", jobID, len(pngNames))

	workDir := filepath.Join(os.TempDir(), fmt.Sprintf("fuego-video-%d", jobID))
	if err := os.MkdirAll(workDir, 0755); err != nil {
		return fmt.Errorf("failed to create work directory: %w", err)
	}
	defer os.RemoveAll(workDir)

	// Download frames; individual failures are logged and skipped.
	var frameFiles []string
	for _, fileName := range pngNames {
		// The name comes from the manager. Keep only its final element so it
		// cannot place the download outside workDir.
		framePath := filepath.Join(workDir, filepath.Base(fileName))
		if err := c.downloadFrameFile(jobID, fileName, framePath); err != nil {
			log.Printf("Failed to download frame %s: %v", fileName, err)
			continue
		}
		frameFiles = append(frameFiles, framePath)
	}
	if len(frameFiles) == 0 {
		return fmt.Errorf("failed to download any frame files")
	}
	// Zero-padded frame numbers make name order equal frame order.
	sort.Strings(frameFiles)

	outputMP4 := filepath.Join(workDir, fmt.Sprintf("output_%d.mp4", jobID))

	// Primary method: image-sequence input, e.g. frame_0001.png -> frame_%04d.png.
	baseName := filepath.Base(frameFiles[0])
	firstFrame := extractFrameNumber(baseName)
	pattern := strings.Replace(baseName, fmt.Sprintf("%04d", firstFrame), "%04d", 1)
	patternPath := filepath.Join(workDir, pattern)
	// -start_number is needed because the image2 demuxer only probes indices
	// 0-4 for the first image by default, so later-starting sequences would fail.
	cmd := exec.Command("ffmpeg", "-y", "-framerate", "24", "-start_number", fmt.Sprintf("%d", firstFrame),
		"-i", patternPath, "-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", outputMP4)
	cmd.Dir = workDir
	if output, err := cmd.CombinedOutput(); err != nil {
		log.Printf("First ffmpeg attempt failed, trying concat method: %s", string(output))
		// On fallback success, continue to the upload below instead of
		// returning, so the fallback video is not discarded.
		if err := c.generateMP4WithConcat(frameFiles, outputMP4, workDir); err != nil {
			return err
		}
	}

	if _, err := os.Stat(outputMP4); os.IsNotExist(err) {
		return fmt.Errorf("MP4 file not created: %s", outputMP4)
	}

	mp4Path, err := c.uploadFile(jobID, outputMP4)
	if err != nil {
		return fmt.Errorf("failed to upload MP4: %w", err)
	}
	log.Printf("Successfully generated and uploaded MP4 for job %d: %s", jobID, mp4Path)
	return nil
}
// generateMP4WithConcat is the fallback encoder. It writes frameFiles, in the
// given order, to workDir/frames.txt as an ffmpeg concat-demuxer list. It then
// encodes the list with libx264 (yuv420p, 24 fps output) into outputMP4,
// overwriting any existing file. It fails if the list cannot be written, if
// ffmpeg fails (the error includes ffmpeg's output), or if no MP4 appears.
//
// NOTE(review): no per-entry `duration` directive is written, so image timing
// is left to ffmpeg's defaults and may not match the primary path's 24 fps
// input rate — verify.
func (c *Client) generateMP4WithConcat(frameFiles []string, outputMP4, workDir string) error {
	listFile := filepath.Join(workDir, "frames.txt")

	var list strings.Builder
	for _, frameFile := range frameFiles {
		absPath, err := filepath.Abs(frameFile)
		if err != nil {
			return fmt.Errorf("failed to resolve frame path %s: %w", frameFile, err)
		}
		// A quote cannot appear inside a single-quoted concat entry; close the
		// quote, emit an escaped quote, and reopen: ' -> '\''.
		fmt.Fprintf(&list, "file '%s'\n", strings.ReplaceAll(absPath, "'", `'\''`))
	}
	if err := os.WriteFile(listFile, []byte(list.String()), 0666); err != nil {
		return fmt.Errorf("failed to create list file: %w", err)
	}

	cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", listFile,
		"-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", "-y", outputMP4)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("ffmpeg concat failed: %w\nOutput: %s", err, string(output))
	}

	if _, err := os.Stat(outputMP4); os.IsNotExist(err) {
		return fmt.Errorf("MP4 file not created: %s", outputMP4)
	}
	return nil
}
// extractFrameNumber returns the frame number encoded as the trailing run of
// digits in a file name's stem. Examples: "frame_0001.png" -> 1,
// "my_shot_0123.png" -> 123, "frame0005.png" -> 5. Any directory part is
// ignored. It returns 0 when the stem does not end in digits or the number
// does not fit in an int.
func extractFrameNumber(filename string) int {
	base := filepath.Base(filename)
	stem := strings.TrimSuffix(base, filepath.Ext(base))
	// The digits are whatever TrimRight strips off the end of the stem.
	digits := stem[len(strings.TrimRight(stem, "0123456789")):]
	if digits == "" {
		return 0
	}
	var frameNum int
	if _, err := fmt.Sscanf(digits, "%d", &frameNum); err != nil {
		return 0
	}
	return frameNum
}
// getJobStatus returns the manager's JSON description of job jobID as a
// generic map; checkAndGenerateMP4 reads its "status" key. A non-200
// response becomes an error carrying the response body.
func (c *Client) getJobStatus(jobID int64) (map[string]interface{}, error) {
	resp, err := c.doSignedRequest("GET", fmt.Sprintf("/api/runner/jobs/%d/status?runner_id=%d", jobID, c.runnerID), nil)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		msg, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("failed to get job status: %s", string(msg))
	}

	var job map[string]interface{}
	if err = json.NewDecoder(resp.Body).Decode(&job); err != nil {
		return nil, err
	}
	return job, nil
}
// getJobFiles lists the files the manager holds for job jobID. Each entry is
// a generic decoded JSON object; checkAndGenerateMP4 reads its "file_type"
// and "file_name" keys. A non-200 response becomes an error carrying the
// response body.
func (c *Client) getJobFiles(jobID int64) ([]map[string]interface{}, error) {
	endpoint := fmt.Sprintf("/api/runner/jobs/%d/files?runner_id=%d", jobID, c.runnerID)
	resp, err := c.doSignedRequest("GET", endpoint, nil)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		var files []map[string]interface{}
		if decodeErr := json.NewDecoder(resp.Body).Decode(&files); decodeErr != nil {
			return nil, decodeErr
		}
		return files, nil
	default:
		msg, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("failed to get job files: %s", string(msg))
	}
}
// downloadFrameFile downloads fileName of job jobID from the manager and
// writes it to destPath, creating or truncating that file. A non-200 response
// becomes an error carrying the response body. Errors from writing or closing
// the destination are returned, so a short write is not mistaken for success.
func (c *Client) downloadFrameFile(jobID int64, fileName, destPath string) error {
	path := fmt.Sprintf("/api/runner/files/%d/%s?runner_id=%d", jobID, fileName, c.runnerID)
	resp, err := c.doSignedRequest("GET", path, nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("download failed: %s", string(body))
	}

	file, err := os.Create(destPath)
	if err != nil {
		return err
	}
	if _, err := io.Copy(file, resp.Body); err != nil {
		file.Close()
		return err
	}
	// Close can report a deferred write failure, so its error must be checked.
	return file.Close()
}
// downloadFile fetches an input file from the manager into destDir, keeping
// its base name.
//
// filePath is the manager-side storage path, expected to look like
// "storage/jobs/{jobID}/{filename}". The job ID is the path segment after the
// first "jobs" segment that is itself followed by a file name.
func (c *Client) downloadFile(filePath, destDir string) error {
	// Split on directory separators. filepath.SplitList splits on the OS
	// path-list separator (':' on Unix) and would yield a single element,
	// rejecting every valid path.
	parts := strings.Split(filepath.ToSlash(filePath), "/")
	if len(parts) < 3 {
		return fmt.Errorf("invalid file path format: %s", filePath)
	}

	jobID := ""
	for i, part := range parts {
		if part == "jobs" && i+2 < len(parts) {
			jobID = parts[i+1]
			break
		}
	}
	if jobID == "" {
		return fmt.Errorf("could not extract job ID from path: %s", filePath)
	}
	fileName := filepath.Base(filePath)

	path := fmt.Sprintf("/api/runner/files/%s/%s?runner_id=%d", jobID, fileName, c.runnerID)
	resp, err := c.doSignedRequest("GET", path, nil)
	if err != nil {
		return fmt.Errorf("failed to download file: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("download failed: %s", string(body))
	}

	destPath := filepath.Join(destDir, fileName)
	file, err := os.Create(destPath)
	if err != nil {
		return fmt.Errorf("failed to create destination file: %w", err)
	}
	if _, err := io.Copy(file, resp.Body); err != nil {
		file.Close()
		return err
	}
	// Close can report a deferred write failure, so its error must be checked.
	return file.Close()
}
// uploadFile uploads the file at filePath to job jobID as the multipart form
// field "file" and returns the manager-side path ("file_path") from the
// response. The whole multipart body is buffered in memory because the
// request signature covers it. Anything other than 201 Created becomes an
// error carrying the response body.
func (c *Client) uploadFile(jobID int64, filePath string) (string, error) {
	// Same precondition as doSignedRequest: without a secret the signature
	// would be meaningless.
	if c.runnerSecret == "" {
		return "", fmt.Errorf("runner not authenticated")
	}

	file, err := os.Open(filePath)
	if err != nil {
		return "", fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	var buf bytes.Buffer
	formWriter := multipart.NewWriter(&buf)
	part, err := formWriter.CreateFormFile("file", filepath.Base(filePath))
	if err != nil {
		return "", fmt.Errorf("failed to create form file: %w", err)
	}
	if _, err := io.Copy(part, file); err != nil {
		return "", fmt.Errorf("failed to copy file data: %w", err)
	}
	// Close writes the terminating boundary; if it fails the body is malformed.
	if err := formWriter.Close(); err != nil {
		return "", fmt.Errorf("failed to finalize form: %w", err)
	}

	// Sign with the shared helper so the message format stays in one place.
	path := fmt.Sprintf("/api/runner/files/%d/upload?runner_id=%d", jobID, c.runnerID)
	payload := buf.Bytes()
	signature, timestamp := c.signRequest(http.MethodPost, path, payload)

	req, err := http.NewRequest(http.MethodPost, c.managerURL+path, bytes.NewReader(payload))
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", formWriter.FormDataContentType())
	req.Header.Set("X-Runner-Signature", signature)
	req.Header.Set("X-Runner-Timestamp", fmt.Sprintf("%d", timestamp.Unix()))

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to upload file: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		body, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("upload failed: %s", string(body))
	}

	var result struct {
		FilePath string `json:"file_path"`
		FileName string `json:"file_name"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}
	return result.FilePath, nil
}
// completeTask reports the outcome of task taskID to the manager. It sends
// outputPath and success, plus errorMsg under "error" only when success is
// false. Any status other than 200 OK becomes an error carrying the response
// body.
func (c *Client) completeTask(taskID int64, outputPath string, success bool, errorMsg string) error {
	payload := map[string]interface{}{"output_path": outputPath, "success": success}
	if !success {
		payload["error"] = errorMsg
	}
	// A map of strings and bools always marshals, so the error is dropped.
	body, _ := json.Marshal(payload)

	endpoint := fmt.Sprintf("/api/runner/tasks/%d/complete?runner_id=%d", taskID, c.runnerID)
	resp, err := c.doSignedRequest("POST", endpoint, body)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	msg, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("failed to complete task: %s", string(msg))
}