Files
jiggablend/internal/runner/client.go

3014 lines
103 KiB
Go

package runner
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"time"
"jiggablend/pkg/types"
"github.com/gorilla/websocket"
)
// Client is the runner-side client for a jiggablend manager. It holds the
// manager endpoint, the runner's identity and credentials, the current
// WebSocket connection, and the caches built while probing the host for
// rendering/encoding capabilities.
//
// Client contains mutexes and must therefore always be used through a pointer.
type Client struct {
	// Base URL of the manager; API paths are appended to it.
	managerURL string
	// Runner name; sent at registration and used to name the workspace directory.
	name string
	// hostname and ipAddress are reported to the manager at registration.
	hostname string
	ipAddress string
	// HTTP client used for registration and authenticated requests
	// (NewClient configures a 30 second timeout).
	httpClient *http.Client
	// Runner identity; populated by SetSecrets or Register. A zero runnerID or
	// empty runnerSecret makes ConnectWebSocket refuse to connect.
	runnerID int64
	runnerSecret string
	// Stored by SetSecrets/Register; not read in the visible part of this file.
	managerSecret string
	// Current WebSocket connection, nil while disconnected.
	wsConn *websocket.Conn
	wsConnMu sync.RWMutex
	wsWriteMu sync.Mutex // Protects concurrent writes to WebSocket (WebSocket is not thread-safe)
	// Polled by ConnectWebSocketWithReconnect to leave its reconnect loop.
	stopChan chan struct{}
	// Start time of every step currently running; used by sendStepUpdate to
	// compute step durations.
	stepStartTimes map[string]time.Time // key: "taskID:stepName"
	stepTimesMu sync.RWMutex
	workspaceDir string // Persistent workspace directory for this runner
	runningProcs sync.Map // map[int64]*exec.Cmd - tracks running processes by task ID
	capabilities map[string]interface{} // Cached capabilities from initial probe (includes bools and numbers)
	capabilitiesMu sync.RWMutex // Protects capabilities
	hwAccelCache map[string]bool // Cached hardware acceleration detection results
	hwAccelCacheMu sync.RWMutex // Protects hwAccelCache
	vaapiDevices []string // Cached VAAPI device paths (all available devices)
	vaapiDevicesMu sync.RWMutex // Protects vaapiDevices
	allocatedDevices map[int64]string // map[taskID]device - tracks which device is allocated to which task
	allocatedDevicesMu sync.RWMutex // Protects allocatedDevices
}
// NewClient returns a runner client for the manager at managerURL.
//
// name, hostname and ipAddress describe this runner and are reported to the
// manager at registration. The returned client has a 30 second HTTP timeout,
// an open stop channel and an empty step-timing table; it is not yet
// authenticated (see Register and SetSecrets).
func NewClient(managerURL, name, hostname, ipAddress string) *Client {
	client := new(Client)

	client.managerURL = managerURL
	client.name = name
	client.hostname = hostname
	client.ipAddress = ipAddress

	client.httpClient = &http.Client{Timeout: 30 * time.Second}
	client.stopChan = make(chan struct{})
	client.stepStartTimes = make(map[string]time.Time)

	return client
}
// SetSecrets stores the runner ID and the runner/manager secrets on the
// client. On the first call that finds no workspace directory configured it
// also creates the runner's workspace.
func (c *Client) SetSecrets(runnerID int64, runnerSecret, managerSecret string) {
	c.runnerID, c.runnerSecret, c.managerSecret = runnerID, runnerSecret, managerSecret

	// The workspace only needs to be set up once.
	if c.workspaceDir != "" {
		return
	}
	c.initWorkspace()
}
// initWorkspace creates the persistent workspace directory for this runner and
// records its path in c.workspaceDir.
//
// The directory is named after the runner (or "runner-<id>" when the runner
// has no name), with spaces and path separators replaced by underscores.
// Locations are tried in this order:
//
//  1. <cwd>/jiggablend-workspaces/<name> (or under the temp dir when the
//     current directory cannot be determined)
//  2. <tmp>/jiggablend-workspaces/<name>
//  3. <tmp>/jiggablend-runner-<id>
//
// Failures are logged; the function never returns an error, so after a total
// failure c.workspaceDir points at the last-resort path even though it could
// not be created.
func (c *Client) initWorkspace() {
	// Use runner name if available, otherwise use runner ID.
	workspaceName := c.name
	if workspaceName == "" {
		workspaceName = fmt.Sprintf("runner-%d", c.runnerID)
	}

	// Replace characters that would split the name into several path elements
	// or are invalid in directory names on common platforms.
	for _, unsafe := range []string{" ", "/", "\\", ":"} {
		workspaceName = strings.ReplaceAll(workspaceName, unsafe, "_")
	}

	// Prefer the current directory. Writability is not checked up front; a
	// failing MkdirAll below triggers the temp-directory fallback instead.
	baseDir := os.TempDir()
	if cwd, err := os.Getwd(); err == nil {
		baseDir = cwd
	}

	c.workspaceDir = filepath.Join(baseDir, "jiggablend-workspaces", workspaceName)
	if err := os.MkdirAll(c.workspaceDir, 0755); err != nil {
		log.Printf("Warning: Failed to create workspace directory %s: %v", c.workspaceDir, err)

		// Fallback to temp directory.
		c.workspaceDir = filepath.Join(os.TempDir(), "jiggablend-workspaces", workspaceName)
		if err := os.MkdirAll(c.workspaceDir, 0755); err != nil {
			log.Printf("Error: Failed to create fallback workspace directory: %v", err)

			// Last resort: use temp directory with runner ID.
			c.workspaceDir = filepath.Join(os.TempDir(), fmt.Sprintf("jiggablend-runner-%d", c.runnerID))
			if err := os.MkdirAll(c.workspaceDir, 0755); err != nil {
				// Previously ignored; without this the runner would report a
				// workspace that does not exist and fail later with a less
				// helpful error.
				log.Printf("Error: Failed to create last-resort workspace directory %s: %v", c.workspaceDir, err)
			}
		}
	}
	log.Printf("Runner workspace initialized at: %s", c.workspaceDir)
}
// getWorkspaceDir returns the runner's workspace directory, creating it first
// when no workspace has been set up yet.
func (c *Client) getWorkspaceDir() string {
	if len(c.workspaceDir) == 0 {
		c.initWorkspace()
	}
	return c.workspaceDir
}
// probeCapabilities detects what this host can do by running
// "blender --version" and "ffmpeg -version".
//
// The returned map mixes boolean flags ("blender", "ffmpeg", "vaapi", ...)
// with integer counts ("vaapi_gpu_count", ...). When ffmpeg is present the
// GPU-related entries are filled in by probeGPUCapabilities; otherwise the
// VAAPI/NVENC flags are false and all counts are zero.
func (c *Client) probeCapabilities() map[string]interface{} {
	caps := map[string]interface{}{}

	// A tool counts as available when its version command exits successfully.
	blenderErr := exec.Command("blender", "--version").Run()
	caps["blender"] = blenderErr == nil

	if ffmpegErr := exec.Command("ffmpeg", "-version").Run(); ffmpegErr != nil {
		// Without ffmpeg there is nothing to probe: report "no hardware encoding".
		caps["ffmpeg"] = false
		caps["vaapi"] = false
		caps["vaapi_gpu_count"] = 0
		caps["nvenc"] = false
		caps["nvenc_gpu_count"] = 0
		caps["video_gpu_count"] = 0
		return caps
	}

	caps["ffmpeg"] = true
	// GPU capabilities only matter for ffmpeg, so probe them right away.
	log.Printf("FFmpeg detected, probing GPU hardware acceleration capabilities...")
	c.probeGPUCapabilities(caps)
	return caps
}
// probeGPUCapabilities fills capabilities with ffmpeg hardware-encoding
// information. For every supported encoder family it stores a boolean flag
// ("<family>") and an integer device count ("<family>_gpu_count"), and it
// finally stores "video_gpu_count", the count of the preferred family.
//
// It is called by probeCapabilities right after ffmpeg has been detected and
// writes its progress to the standard logger.
func (c *Client) probeGPUCapabilities(capabilities map[string]interface{}) {
	// Informational: list every hwaccel method ffmpeg reports.
	log.Printf("Probing all hardware acceleration methods...")
	if accels := c.probeAllHardwareAccelerators(); len(accels) > 0 {
		log.Printf("Available hardware acceleration methods: %v", getKeys(accels))
	} else {
		log.Printf("No hardware acceleration methods found")
	}

	// Informational: list every hardware encoder ffmpeg reports.
	log.Printf("Probing all hardware encoders...")
	if encoders := c.probeAllHardwareEncoders(); len(encoders) > 0 {
		log.Printf("Available hardware encoders: %v", getKeys(encoders))
	}

	// VAAPI: the encoder must be listed by ffmpeg AND at least one device
	// must be found by findVAAPIDevices.
	log.Printf("Checking for VAAPI hardware acceleration...")
	encoderList, listErr := exec.Command("ffmpeg", "-hide_banner", "-encoders").CombinedOutput()
	vaapiListed := listErr == nil && strings.Contains(string(encoderList), "h264_vaapi")
	if vaapiListed {
		log.Printf("VAAPI encoder (h264_vaapi) found in ffmpeg encoders list")
	}
	if !vaapiListed {
		capabilities["vaapi"] = false
		capabilities["vaapi_gpu_count"] = 0
		log.Printf("VAAPI encoder not available in ffmpeg")
		log.Printf(" This might indicate:")
		log.Printf(" - FFmpeg was not compiled with VAAPI support")
		log.Printf(" - Missing libva development libraries during FFmpeg compilation")
	} else {
		devices := c.findVAAPIDevices()
		capabilities["vaapi_gpu_count"] = len(devices)
		if len(devices) == 0 {
			capabilities["vaapi"] = false
			log.Printf("VAAPI encoder available but no working devices found")
			log.Printf(" This might indicate:")
			log.Printf(" - Missing or incorrect GPU drivers")
			log.Printf(" - Missing libva or mesa-va-drivers packages")
			log.Printf(" - Permission issues accessing /dev/dri devices")
			log.Printf(" - GPU not properly initialized")
		} else {
			capabilities["vaapi"] = true
			log.Printf("VAAPI detected: %d GPU device(s) available: %v", len(devices), devices)
		}
	}

	// NVENC: the only other family for which a real device count is detected.
	log.Printf("Checking for NVENC hardware acceleration...")
	if !c.checkEncoderAvailable("h264_nvenc") {
		capabilities["nvenc"] = false
		capabilities["nvenc_gpu_count"] = 0
		log.Printf("NVENC encoder not available")
	} else {
		capabilities["nvenc"] = true
		gpus := c.detectNVENCCount()
		capabilities["nvenc_gpu_count"] = gpus
		log.Printf("NVENC detected: %d GPU(s)", gpus)
	}

	// Remaining families are reported with a fixed device count of 1 when
	// their H.264 encoder is available.
	log.Printf("Checking for other hardware encoders...")
	singleDeviceFamilies := []struct {
		encoder string // ffmpeg encoder name to look for
		key     string // capability key prefix
		message string // logged when the encoder is available
	}{
		{"h264_qsv", "qsv", "Intel Quick Sync (QSV) detected"},
		{"h264_videotoolbox", "videotoolbox", "VideoToolbox (macOS) detected"},
		{"h264_amf", "amf", "AMD AMF detected"},
		{"h264_v4l2m2m", "v4l2m2m", "V4L2 M2M detected"},
		{"h264_omx", "omx", "OpenMAX detected"},
		{"h264_mediacodec", "mediacodec", "MediaCodec detected"},
	}
	for _, family := range singleDeviceFamilies {
		if c.checkEncoderAvailable(family.encoder) {
			capabilities[family.key] = true
			capabilities[family.key+"_gpu_count"] = 1
			log.Print(family.message)
		} else {
			capabilities[family.key] = false
			capabilities[family.key+"_gpu_count"] = 0
		}
	}

	// Total GPU count: the families cannot be used simultaneously, so take
	// the count of the first family (in priority order) that has any device.
	// NOTE(review): v4l2m2m, omx and mediacodec are probed above but never
	// contribute to video_gpu_count — confirm that this is intended.
	intCapability := func(key string) int {
		n, _ := capabilities[key].(int)
		return n
	}
	totalGPUs := 0
	for _, key := range []string{
		"vaapi_gpu_count",
		"nvenc_gpu_count",
		"qsv_gpu_count",
		"videotoolbox_gpu_count",
		"amf_gpu_count",
	} {
		if totalGPUs = intCapability(key); totalGPUs != 0 {
			break
		}
	}
	capabilities["video_gpu_count"] = totalGPUs

	if totalGPUs > 0 {
		log.Printf("Total video GPU count: %d", totalGPUs)
		return
	}
	log.Printf("No hardware-accelerated video encoding GPUs detected (will use software encoding)")
}
// detectNVENCCount returns the number of NVIDIA GPUs reported by
// "nvidia-smi --list-gpus".
//
// Only lines that start with "GPU " (for example "GPU 0: <name> (UUID: ...)")
// are counted. Matching the line prefix instead of any occurrence of "GPU"
// keeps indented MIG sub-device lines, whose UUIDs may contain "GPU", and
// diagnostic text in the combined output from inflating the count.
//
// It returns 1 when nvidia-smi cannot be run or lists no GPU, because the
// caller has already established that the NVENC encoder is available.
func (c *Client) detectNVENCCount() int {
	output, err := exec.Command("nvidia-smi", "--list-gpus").CombinedOutput()
	if err != nil {
		// nvidia-smi missing or failing: assume a single GPU.
		return 1
	}

	count := 0
	for _, line := range strings.Split(string(output), "\n") {
		if strings.HasPrefix(strings.TrimSpace(line), "GPU ") {
			count++
		}
	}
	if count == 0 {
		return 1
	}
	return count
}
// getKeys returns the keys of m as a slice in ascending order.
//
// Sorting makes the result independent of Go's randomized map iteration
// order, so the capability lists written to the log are identical from run to
// run. A nil or empty map yields an empty, non-nil slice.
func getKeys(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
// ProbeCapabilities probes the host and caches the result on the client.
// It is meant to be called once at startup; the probe itself runs outside the
// lock so readers are not blocked while external commands execute.
func (c *Client) ProbeCapabilities() {
	probed := c.probeCapabilities()

	c.capabilitiesMu.Lock()
	defer c.capabilitiesMu.Unlock()
	c.capabilities = probed
}
// GetCapabilities returns a copy of the cached capabilities. The copy is
// shallow and always non-nil, so callers may modify the returned map without
// affecting the client's cache.
func (c *Client) GetCapabilities() map[string]interface{} {
	c.capabilitiesMu.RLock()
	snapshot := make(map[string]interface{}, len(c.capabilities))
	for key, value := range c.capabilities {
		snapshot[key] = value
	}
	c.capabilitiesMu.RUnlock()

	return snapshot
}
// Register registers the runner with the manager using a one-time
// registration token and returns the runner ID, runner secret and manager
// secret issued by the manager. On success the same values are also stored on
// the client.
//
// The returned error message starts with:
//   - "connection error:" when the request could not be sent,
//   - "token error:" when the manager answered 400/401 with a body that
//     mentions an invalid, expired or already used token,
//   - "registration failed" for every other non-201 response.
//
// NOTE(review): the inline comments in the original suggest callers use these
// prefixes to decide whether to retry — confirm against the caller.
func (c *Client) Register(registrationToken string) (int64, string, string, error) {
	// Prefer the capabilities cached at startup; probe now only as a fallback.
	c.capabilitiesMu.RLock()
	caps := c.capabilities
	c.capabilitiesMu.RUnlock()
	if caps == nil {
		caps = c.probeCapabilities()
		c.capabilitiesMu.Lock()
		c.capabilities = caps
		c.capabilitiesMu.Unlock()
	}

	// Capabilities travel as a JSON document embedded in a string field.
	capsJSON, err := json.Marshal(caps)
	if err != nil {
		return 0, "", "", fmt.Errorf("failed to marshal capabilities: %w", err)
	}

	payload, _ := json.Marshal(map[string]interface{}{
		"name":               c.name,
		"hostname":           c.hostname,
		"ip_address":         c.ipAddress,
		"capabilities":       string(capsJSON),
		"registration_token": registrationToken,
	})

	endpoint := c.managerURL + "/api/runner/register"
	resp, err := c.httpClient.Post(endpoint, "application/json", bytes.NewReader(payload))
	if err != nil {
		// Network/connection error.
		return 0, "", "", fmt.Errorf("connection error: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		raw, _ := io.ReadAll(resp.Body)
		errorBody := string(raw)

		// A 400/401 whose body talks about the token is reported as a token error.
		switch resp.StatusCode {
		case http.StatusUnauthorized, http.StatusBadRequest:
			lowered := strings.ToLower(errorBody)
			for _, marker := range []string{"invalid", "expired", "already used", "token"} {
				if strings.Contains(lowered, marker) {
					return 0, "", "", fmt.Errorf("token error: %s", errorBody)
				}
			}
		}
		// Anything else (e.g. 500) is reported generically.
		return 0, "", "", fmt.Errorf("registration failed (status %d): %s", resp.StatusCode, errorBody)
	}

	var result struct {
		ID            int64  `json:"id"`
		RunnerSecret  string `json:"runner_secret"`
		ManagerSecret string `json:"manager_secret"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return 0, "", "", fmt.Errorf("failed to decode response: %w", err)
	}

	c.runnerID, c.runnerSecret, c.managerSecret = result.ID, result.RunnerSecret, result.ManagerSecret
	return result.ID, result.RunnerSecret, result.ManagerSecret, nil
}
// doSignedRequest sends an authenticated HTTP request to the manager.
//
// path is appended to the manager URL; queryParams are optional, already
// formatted "key=value" pairs that are joined with "&" and appended verbatim
// (they are not escaped here). Authentication is done by sending the runner
// secret in the X-Runner-Secret header. The caller must close the response
// body.
//
// It returns an error without contacting the manager when the client has no
// runner secret yet.
func (c *Client) doSignedRequest(method, path string, body []byte, queryParams ...string) (*http.Response, error) {
	if c.runnerSecret == "" {
		return nil, fmt.Errorf("runner not authenticated")
	}

	target := c.managerURL + path
	if len(queryParams) > 0 {
		target = target + "?" + strings.Join(queryParams, "&")
	}

	req, err := http.NewRequest(method, target, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Runner-Secret", c.runnerSecret)

	return c.httpClient.Do(req)
}
// ConnectWebSocket opens a WebSocket connection to the manager and installs
// it as the client's current connection, closing any previous one.
//
// The manager URL is converted from http(s) to ws(s) and the runner ID and
// secret are passed as query parameters of /api/runner/ws. The handshake is
// limited to 10 seconds. An error is returned when the runner has no ID or
// secret yet, when the URL is invalid, or when the dial fails.
func (c *Client) ConnectWebSocket() error {
	if c.runnerID == 0 || c.runnerSecret == "" {
		return fmt.Errorf("runner not authenticated")
	}

	// http:// -> ws://, https:// -> wss://
	base := strings.Replace(c.managerURL, "http://", "ws://", 1)
	base = strings.Replace(base, "https://", "wss://", 1)

	rawURL := fmt.Sprintf("%s%s?runner_id=%d&secret=%s",
		base, "/api/runner/ws", c.runnerID, url.QueryEscape(c.runnerSecret))
	parsed, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("invalid WebSocket URL: %w", err)
	}

	dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second}
	conn, _, err := dialer.Dial(parsed.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect WebSocket: %w", err)
	}

	// Swap in the new connection, closing a stale one if present.
	c.wsConnMu.Lock()
	if stale := c.wsConn; stale != nil {
		stale.Close()
	}
	c.wsConn = conn
	c.wsConnMu.Unlock()

	log.Printf("WebSocket connected to manager")
	return nil
}
// ConnectWebSocketWithReconnect keeps a WebSocket connection to the manager
// alive until the client's stop channel fires.
//
// Each successful connection is served by HandleWebSocketMessages until it
// drops. Failed connection attempts are retried with exponential backoff
// starting at 1 second and capped at 60 seconds; the backoff is reset after
// every successful connection. The backoff wait is interruptible, so a stop
// request takes effect immediately instead of after the remaining sleep.
//
// The function blocks; run it in its own goroutine.
func (c *Client) ConnectWebSocketWithReconnect() {
	const (
		initialBackoff = 1 * time.Second
		maxBackoff     = 60 * time.Second
	)

	backoff := initialBackoff
	for {
		if err := c.ConnectWebSocket(); err == nil {
			backoff = initialBackoff // Reset on success
			c.HandleWebSocketMessages()
		} else {
			log.Printf("WebSocket connection failed: %v, retrying in %v", err, backoff)

			// Wait for the backoff period, but give up at once when asked to stop.
			timer := time.NewTimer(backoff)
			select {
			case <-c.stopChan:
				timer.Stop()
				return
			case <-timer.C:
			}

			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}

		// Check if we should stop before the next attempt.
		select {
		case <-c.stopChan:
			return
		default:
		}
	}
}
// HandleWebSocketMessages reads and dispatches messages from the current
// WebSocket connection until a read fails, then closes the connection and
// returns. It returns immediately when there is no connection.
//
// Messages are JSON objects with a "type" field: "task_assignment" is handed
// to handleTaskAssignment, everything else is ignored. Incoming pings are
// answered with a pong. Every read attempt and every ping or pong received
// pushes the read deadline 90 seconds into the future, so a connection that
// stays silent for that long is treated as dead.
//
// On exit the connection is closed (previously it was only forgotten, which
// leaked the socket) and c.wsConn is cleared, but only if it still refers to
// the connection served here.
func (c *Client) HandleWebSocketMessages() {
	// A read lock is enough: the field is only read here.
	c.wsConnMu.RLock()
	conn := c.wsConn
	c.wsConnMu.RUnlock()
	if conn == nil {
		return
	}

	const readTimeout = 90 * time.Second

	defer func() {
		c.wsConnMu.Lock()
		if c.wsConn == conn {
			c.wsConn = nil
		}
		c.wsConnMu.Unlock()
		// Release the underlying socket; harmless if it is already closed.
		conn.Close()
	}()

	// Any control frame from the manager proves the connection is alive.
	conn.SetPongHandler(func(string) error {
		conn.SetReadDeadline(time.Now().Add(readTimeout))
		return nil
	})
	conn.SetPingHandler(func(string) error {
		conn.SetReadDeadline(time.Now().Add(readTimeout))
		// Respond to ping with pong - protect with write mutex.
		c.wsWriteMu.Lock()
		defer c.wsWriteMu.Unlock()
		return conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(10*time.Second))
	})

	for {
		// Fresh deadline for every read; control frames are processed while
		// ReadJSON is blocked.
		conn.SetReadDeadline(time.Now().Add(readTimeout))

		var msg map[string]interface{}
		if err := conn.ReadJSON(&msg); err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("WebSocket error: %v", err)
			}
			return
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "task_assignment":
			c.handleTaskAssignment(msg)
		case "ping":
			// Application-level ping: nothing to do here.
		}
	}
}
// handleTaskAssignment decodes a "task_assignment" message and starts the
// task in a new goroutine, so the WebSocket read loop is not blocked.
//
// msg["data"] must be an object; missing or mistyped fields inside it fall
// back to their zero values. The task type selects the processor: "metadata",
// "video_generation", or the render pipeline for everything else. Metadata
// and render tasks are rejected when no input files were supplied. Any
// failure is logged locally, sent to the manager as a log entry and reported
// through sendTaskComplete.
func (c *Client) handleTaskAssignment(msg map[string]interface{}) {
	data, ok := msg["data"].(map[string]interface{})
	if !ok {
		log.Printf("Invalid task assignment message")
		return
	}

	// JSON numbers arrive as float64.
	taskID, _ := data["task_id"].(float64)
	jobID, _ := data["job_id"].(float64)
	frameStart, _ := data["frame_start"].(float64)
	frameEnd, _ := data["frame_end"].(float64)
	jobName, _ := data["job_name"].(string)
	outputFormat, _ := data["output_format"].(string)
	taskType, _ := data["task_type"].(string)
	inputFilesRaw, _ := data["input_files"].([]interface{})

	taskIDInt := int64(taskID)
	jobIDInt := int64(jobID)

	// Acknowledge the assignment in the task log.
	received := fmt.Sprintf("Task assignment received from manager (job: %d, type: %s, frames: %d-%d)",
		jobIDInt, taskType, int(frameStart), int(frameEnd))
	c.sendLog(taskIDInt, types.LogLevelInfo, received, "")

	// The processors expect the task in map form with float64 numbers.
	taskMap := map[string]interface{}{
		"id":          taskID,
		"job_id":      jobID,
		"frame_start": frameStart,
		"frame_end":   frameEnd,
	}

	go func() {
		noInputs := len(inputFilesRaw) == 0

		var taskErr error
		switch {
		case taskType == "metadata" && noInputs:
			log.Printf("No input files for metadata task %v", taskID)
			c.sendTaskComplete(taskIDInt, "", false, "No input files")
			return
		case taskType == "metadata":
			taskErr = c.processMetadataTask(taskMap, jobIDInt, inputFilesRaw)
		case taskType == "video_generation":
			taskErr = c.processVideoGenerationTask(taskMap, jobIDInt)
		case noInputs:
			// Render task without anything to render.
			errMsg := fmt.Sprintf("No input files provided for task %d (job %d). Task assignment data: job_name=%s, output_format=%s, task_type=%s",
				taskIDInt, jobIDInt, jobName, outputFormat, taskType)
			log.Printf("ERROR: %s", errMsg)
			c.sendLog(taskIDInt, types.LogLevelError, errMsg, "")
			c.sendTaskComplete(taskIDInt, "", false, "No input files provided")
			return
		default:
			log.Printf("Processing render task %d with %d input files: %v", taskIDInt, len(inputFilesRaw), inputFilesRaw)
			taskErr = c.processTask(taskMap, jobName, outputFormat, inputFilesRaw)
		}

		if taskErr == nil {
			return
		}
		errMsg := fmt.Sprintf("Task %d failed: %v", taskIDInt, taskErr)
		log.Printf("ERROR: %s", errMsg)
		c.sendLog(taskIDInt, types.LogLevelError, errMsg, "")
		c.sendTaskComplete(taskIDInt, "", false, taskErr.Error())
	}()
}
// HeartbeatLoop sends a "heartbeat" message with the current Unix timestamp
// over the WebSocket every 30 seconds. Ticks that find no connection are
// skipped; write failures are logged and the loop continues.
//
// The function blocks forever; run it in its own goroutine.
// NOTE(review): unlike ConnectWebSocketWithReconnect this loop never checks
// stopChan — confirm whether it should end on shutdown.
func (c *Client) HeartbeatLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		<-ticker.C

		c.wsConnMu.RLock()
		conn := c.wsConn
		c.wsConnMu.RUnlock()
		if conn == nil {
			continue
		}

		// All WebSocket writes are serialized through wsWriteMu.
		c.wsWriteMu.Lock()
		err := conn.WriteJSON(map[string]interface{}{
			"type":      "heartbeat",
			"timestamp": time.Now().Unix(),
		})
		c.wsWriteMu.Unlock()

		if err != nil {
			log.Printf("Failed to send heartbeat: %v", err)
		}
	}
}
// Patterns used by shouldFilterBlenderLog. They are built once at package
// level because the filter runs for every line of Blender output.
var (
	// Separator rows made of three dash groups, e.g. "-- -- --".
	blenderTraceSeparatorRe = regexp.MustCompile(`^[-]+\s+[-]+\s+[-]+$`)
	// Trace rows of the form "<number> <word> <word>", e.g. "1 Object timer_box_franck".
	blenderTraceRowRe = regexp.MustCompile(`^\d+\s+\w+\s+\w+`)

	// Substrings that mark a line as noise.
	blenderNoiseSubstrings = []string{
		// Dependency graph messages.
		"Failed to add relation",
		"Could not find op_from",
		"OperationKey",
		"find_node_operation: Failed for",
		"BONE_DONE",
		"component name:",
		"operation code:",
		"rope_ctrl_rot_",
		// Animation system warnings (invalid drivers are common and harmless).
		"BKE_animsys_eval_driver: invalid driver",
		"bke.anim_sys",
		"rotation_quaternion[",
		"constraints[",
		".influence[0]",
		"pose.bones[",
		// Modifier warnings (common when vertices change).
		"BKE_modifier_set_error",
		"bke.modifier",
		"Vertices changed from",
		"Modifier:",
	}
)

// shouldFilterBlenderLog reports whether a line of Blender output is noise
// that should not be forwarded.
//
// It returns (true, types.LogLevelInfo) for empty lines, dash separators,
// trace headers and rows, and for lines containing one of the known
// dependency-graph, animation-system or modifier substrings. Every other line
// yields (false, types.LogLevelInfo). The second result is always
// types.LogLevelInfo in the current implementation.
//
// The accepted set of lines is the same as before; checks that were implied
// by other checks have been dropped and the regular expressions are no longer
// recompiled on every call.
func shouldFilterBlenderLog(line string) (bool, types.LogLevel) {
	trimmed := strings.TrimSpace(line)

	// Empty lines.
	if trimmed == "" {
		return true, types.LogLevelInfo
	}

	// Separator lines: anything starting with five dashes.
	if strings.HasPrefix(trimmed, "-----") {
		return true, types.LogLevelInfo
	}

	// Trace headers: "Trace:", any line mentioning depth, type and name
	// (case-insensitive, any spacing), and dash-group separators.
	upper := strings.ToUpper(trimmed)
	isColumnHeader := strings.Contains(upper, "DEPTH") &&
		strings.Contains(upper, "TYPE") &&
		strings.Contains(upper, "NAME")
	if trimmed == "Trace:" ||
		isColumnHeader ||
		strings.Contains(line, "----- ---- ----") ||
		blenderTraceSeparatorRe.MatchString(trimmed) {
		return true, types.LogLevelInfo
	}

	// Known noise from the dependency graph, animation system and modifiers.
	for _, pattern := range blenderNoiseSubstrings {
		if strings.Contains(line, pattern) {
			return true, types.LogLevelInfo
		}
	}

	// Trace rows: number, word, word.
	if blenderTraceRowRe.MatchString(trimmed) {
		return true, types.LogLevelInfo
	}

	return false, types.LogLevelInfo
}
// sendLog sends one log entry for a task to the manager over the WebSocket.
//
// stepName may be empty for entries that do not belong to a step. When there
// is no connection the entry is dropped and a note is written to the local
// log; write errors are logged locally as well. The function never fails.
func (c *Client) sendLog(taskID int64, logLevel types.LogLevel, message, stepName string) {
	c.wsConnMu.RLock()
	conn := c.wsConn
	c.wsConnMu.RUnlock()

	if conn == nil {
		log.Printf("WebSocket not connected, cannot send log")
		return
	}

	// Serialize all WebSocket writes to prevent concurrent write panics.
	c.wsWriteMu.Lock()
	defer c.wsWriteMu.Unlock()

	entry := map[string]interface{}{
		"task_id":   taskID,
		"log_level": string(logLevel),
		"message":   message,
		"step_name": stepName,
	}
	envelope := map[string]interface{}{
		"type":      "log_entry",
		"data":      entry,
		"timestamp": time.Now().Unix(),
	}

	err := conn.WriteJSON(envelope)
	if err != nil {
		log.Printf("Failed to send log: %v", err)
	}
}
// KillAllProcesses terminates every process tracked in runningProcs.
//
// Each started process first receives an interrupt signal (os.Interrupt), is
// given 100ms to clean up, and is then killed unconditionally. The VAAPI
// device allocated to the task, if any, is released whether or not the
// process had been started. Signal and kill failures are logged; only
// successful kills are counted in the final log line.
func (c *Client) KillAllProcesses() {
	log.Printf("Killing all running processes...")

	killed := 0
	c.runningProcs.Range(func(k, v interface{}) bool {
		taskID, cmd := k.(int64), v.(*exec.Cmd)

		if proc := cmd.Process; proc != nil {
			log.Printf("Killing process for task %d (PID: %d)", taskID, proc.Pid)

			// Ask politely first (interrupt signal).
			if err := proc.Signal(os.Interrupt); err != nil {
				log.Printf("Failed to send SIGINT to process %d: %v", proc.Pid, err)
			}

			// Give it a moment to clean up.
			time.Sleep(100 * time.Millisecond)

			// Then force the issue.
			if err := proc.Kill(); err == nil {
				killed++
			} else {
				log.Printf("Failed to kill process %d: %v", proc.Pid, err)
			}
		}

		// Release any allocated device for this task.
		c.releaseVAAPIDevice(taskID)
		return true
	})

	log.Printf("Killed %d process(es)", killed)
}
// sendStepUpdate reports a step status change for a task to the manager.
//
// A StepStatusRunning update records the step's start time. A
// StepStatusCompleted or StepStatusFailed update looks that start time up,
// removes it and, when it was found, adds the elapsed time to the request as
// "duration_ms". errorMsg is included as "error_message" when non-empty.
//
// The update is POSTed to /api/runner/tasks/<taskID>/steps with the runner ID
// as query parameter. Whether or not the request succeeds, a
// "Step <name>: <status>" log entry is also sent through sendLog (at error
// level for failed steps), so the step remains visible in the task log.
// Request failures are logged locally and are not returned to the caller.
func (c *Client) sendStepUpdate(taskID int64, stepName string, status types.StepStatus, errorMsg string) {
	key := fmt.Sprintf("%d:%s", taskID, stepName)

	reqBody := map[string]interface{}{
		"step_name": stepName,
		"status":    string(status),
	}

	// Track step start time.
	if status == types.StepStatusRunning {
		c.stepTimesMu.Lock()
		c.stepStartTimes[key] = time.Now()
		c.stepTimesMu.Unlock()
	}

	// Calculate duration if step is finishing. Lookup and removal happen
	// under a single lock so two concurrent updates cannot both consume the
	// same start time.
	if status == types.StepStatusCompleted || status == types.StepStatusFailed {
		c.stepTimesMu.Lock()
		startTime, exists := c.stepStartTimes[key]
		delete(c.stepStartTimes, key)
		c.stepTimesMu.Unlock()

		if exists {
			reqBody["duration_ms"] = int(time.Since(startTime).Milliseconds())
		}
	}

	if errorMsg != "" {
		reqBody["error_message"] = errorMsg
	}

	// The map only holds strings and an int, so marshalling cannot fail.
	body, _ := json.Marshal(reqBody)

	// The runner ID travels as a query parameter; authentication is handled
	// by doSignedRequest.
	path := fmt.Sprintf("/api/runner/tasks/%d/steps", taskID)
	resp, err := c.doSignedRequest("POST", path, body, fmt.Sprintf("runner_id=%d", c.runnerID))
	if err != nil {
		log.Printf("Failed to send step update: %v", err)
	} else {
		if resp.StatusCode != http.StatusOK {
			respBody, _ := io.ReadAll(resp.Body)
			log.Printf("Step update failed: %s", string(respBody))
		}
		resp.Body.Close()
	}

	// Mirror the update into the task log: it is the fallback when the HTTP
	// update failed and a debugging aid when it succeeded.
	msg := fmt.Sprintf("Step %s: %s", stepName, status)
	if errorMsg != "" {
		msg += " - " + errorMsg
	}
	logLevel := types.LogLevelInfo
	if status == types.StepStatusFailed {
		logLevel = types.LogLevelError
	}
	c.sendLog(taskID, logLevel, msg, stepName)
}
// processTask processes a single task
func (c *Client) processTask(task map[string]interface{}, jobName string, outputFormat string, inputFiles []interface{}) error {
_ = jobName
taskID := int64(task["id"].(float64))
jobID := int64(task["job_id"].(float64))
frameStart := int(task["frame_start"].(float64))
frameEnd := int(task["frame_end"].(float64))
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting task: job %d, frames %d-%d, format: %s", jobID, frameStart, frameEnd, outputFormat), "")
log.Printf("Processing task %d: job %d, frames %d-%d, format: %s", taskID, jobID, frameStart, frameEnd, outputFormat)
// Create temporary job workspace within runner workspace
workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-task-%d", jobID, taskID))
if err := os.MkdirAll(workDir, 0755); err != nil {
return fmt.Errorf("failed to create work directory: %w", err)
}
defer os.RemoveAll(workDir)
// Step: download
c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "")
c.sendLog(taskID, types.LogLevelInfo, "Downloading input files...", "download")
blendFile := ""
for _, filePath := range inputFiles {
filePathStr := filePath.(string)
// Preserve directory structure when downloading (for ZIP-extracted files)
// Extract relative path from storage path (format: storage/jobs/{jobID}/...)
relPath := filePathStr
if strings.Contains(filePathStr, "/jobs/") {
parts := strings.Split(filePathStr, "/jobs/")
if len(parts) > 1 {
// Get path after /jobs/{jobID}/
jobPathParts := strings.SplitN(parts[1], "/", 2)
if len(jobPathParts) > 1 {
relPath = jobPathParts[1]
} else {
relPath = jobPathParts[0]
}
}
}
destPath := filepath.Join(workDir, relPath)
destDir := filepath.Dir(destPath)
if err := os.MkdirAll(destDir, 0755); err != nil {
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
return fmt.Errorf("failed to create directory for file %s: %w", filePathStr, err)
}
if err := c.downloadFileToPath(filePathStr, destPath); err != nil {
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
return fmt.Errorf("failed to download file %s: %w", filePathStr, err)
}
if filepath.Ext(filePathStr) == ".blend" {
blendFile = destPath
}
}
if blendFile == "" {
err := fmt.Errorf("no .blend file found in input files")
c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
return err
}
c.sendStepUpdate(taskID, "download", types.StepStatusCompleted, "")
c.sendLog(taskID, types.LogLevelInfo, "Input files downloaded successfully", "download")
// Render frames
outputDir := filepath.Join(workDir, "output")
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("failed to create output directory: %w", err)
}
// For MP4, render as PNG first, then combine into video
renderFormat := outputFormat
if outputFormat == "MP4" {
renderFormat = "PNG"
}
// Blender uses # characters for frame number placeholders (not %04d)
// Use #### for 4-digit zero-padded frame numbers
outputPattern := filepath.Join(outputDir, fmt.Sprintf("frame_####.%s", strings.ToLower(renderFormat)))
// Step: render_blender
c.sendStepUpdate(taskID, "render_blender", types.StepStatusRunning, "")
if frameStart == frameEnd {
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting Blender render for frame %d...", frameStart), "render_blender")
} else {
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting Blender render for frames %d-%d...", frameStart, frameEnd), "render_blender")
}
// Execute Blender - use absolute path for output pattern
absOutputPattern, err := filepath.Abs(outputPattern)
if err != nil {
errMsg := fmt.Sprintf("failed to get absolute path for output: %v", err)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
// Respect blend file settings but prefer GPU if available, fallback to CPU
// This preserves the blend file's render settings (engine, samples, etc.) but optimizes device selection
scriptContent := `
import bpy
import sys
# Get current scene settings (preserve blend file preferences)
scene = bpy.context.scene
current_engine = scene.render.engine
current_device = scene.cycles.device if hasattr(scene, 'cycles') and scene.cycles else None
print(f"Blend file render engine: {current_engine}")
if current_device:
print(f"Blend file device setting: {current_device}")
# Only override device selection if using Cycles (other engines handle GPU differently)
if current_engine == 'CYCLES':
# Ensure Cycles addon is enabled
try:
if 'cycles' not in bpy.context.preferences.addons:
bpy.ops.preferences.addon_enable(module='cycles')
print("Enabled Cycles addon")
except Exception as e:
print(f"Warning: Could not enable Cycles addon: {e}")
# Access Cycles preferences
prefs = bpy.context.preferences
try:
cycles_prefs = prefs.addons['cycles'].preferences
except (KeyError, AttributeError):
try:
cycles_addon = prefs.addons.get('cycles')
if cycles_addon:
cycles_prefs = cycles_addon.preferences
else:
raise Exception("Cycles addon not found")
except Exception as e:
print(f"ERROR: Could not access Cycles preferences: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# Check all devices and choose the best GPU type
# Device type preference order (most performant first)
device_type_preference = ['OPTIX', 'CUDA', 'HIP', 'ONEAPI', 'METAL']
gpu_available = False
best_device_type = None
best_gpu_devices = []
devices_by_type = {} # {device_type: [devices]}
seen_device_ids = set() # Track device IDs to avoid duplicates
print("Checking for GPU availability...")
# Try to get all devices - try each device type to see what's available
for device_type in device_type_preference:
try:
cycles_prefs.compute_device_type = device_type
cycles_prefs.refresh_devices()
# Get devices for this type
devices = None
if hasattr(cycles_prefs, 'devices'):
try:
devices_prop = cycles_prefs.devices
if devices_prop:
devices = list(devices_prop) if hasattr(devices_prop, '__iter__') else [devices_prop]
except Exception as e:
pass
if not devices or len(devices) == 0:
try:
devices = cycles_prefs.get_devices()
except Exception as e:
pass
if devices and len(devices) > 0:
# Categorize devices by their type attribute, avoiding duplicates
for device in devices:
if hasattr(device, 'type'):
device_type_str = str(device.type).upper()
device_id = getattr(device, 'id', None)
# Use device ID to avoid duplicates (same device appears when checking different compute_device_types)
if device_id and device_id in seen_device_ids:
continue
if device_id:
seen_device_ids.add(device_id)
if device_type_str not in devices_by_type:
devices_by_type[device_type_str] = []
devices_by_type[device_type_str].append(device)
except (ValueError, AttributeError, KeyError, TypeError):
# Device type not supported, continue
continue
except Exception as e:
# Other errors - log but continue
print(f" Error checking {device_type}: {e}")
continue
# Print what we found
print(f"Found devices by type: {list(devices_by_type.keys())}")
for dev_type, dev_list in devices_by_type.items():
print(f" {dev_type}: {len(dev_list)} device(s)")
for device in dev_list:
device_name = getattr(device, 'name', 'Unknown')
print(f" - {device_name}")
# Choose the best GPU type based on preference
for preferred_type in device_type_preference:
if preferred_type in devices_by_type:
gpu_devices = [d for d in devices_by_type[preferred_type] if preferred_type in ['CUDA', 'OPENCL', 'OPTIX', 'HIP', 'METAL', 'ONEAPI']]
if gpu_devices:
best_device_type = preferred_type
best_gpu_devices = [(d, preferred_type) for d in gpu_devices]
print(f"Selected {preferred_type} as best GPU type with {len(gpu_devices)} device(s)")
break
# Second pass: Enable the best GPU we found
if best_device_type and best_gpu_devices:
print(f"\nEnabling GPU devices for {best_device_type}...")
try:
# Set the device type again
cycles_prefs.compute_device_type = best_device_type
cycles_prefs.refresh_devices()
# First, disable all CPU devices to ensure only GPU is used
print(f" Disabling CPU devices...")
all_devices = cycles_prefs.devices if hasattr(cycles_prefs, 'devices') else cycles_prefs.get_devices()
if all_devices:
for device in all_devices:
if hasattr(device, 'type') and str(device.type).upper() == 'CPU':
try:
device.use = False
device_name = getattr(device, 'name', 'Unknown')
print(f" Disabled CPU: {device_name}")
except Exception as e:
print(f" Warning: Could not disable CPU device {getattr(device, 'name', 'Unknown')}: {e}")
# Enable all GPU devices
enabled_count = 0
for device, device_type in best_gpu_devices:
try:
device.use = True
enabled_count += 1
device_name = getattr(device, 'name', 'Unknown')
print(f" Enabled: {device_name}")
except Exception as e:
print(f" Warning: Could not enable device {getattr(device, 'name', 'Unknown')}: {e}")
# Enable ray tracing acceleration for supported device types
try:
if best_device_type == 'HIP':
# HIPRT (HIP Ray Tracing) for AMD GPUs
if hasattr(cycles_prefs, 'use_hiprt'):
cycles_prefs.use_hiprt = True
print(f" Enabled HIPRT (HIP Ray Tracing) for faster rendering")
elif hasattr(scene.cycles, 'use_hiprt'):
scene.cycles.use_hiprt = True
print(f" Enabled HIPRT (HIP Ray Tracing) for faster rendering")
else:
print(f" HIPRT not available (requires Blender 4.0+)")
elif best_device_type == 'OPTIX':
# OptiX is already enabled when using OPTIX device type
# But we can check if there are any OptiX-specific settings
if hasattr(scene.cycles, 'use_optix_denoising'):
scene.cycles.use_optix_denoising = True
print(f" Enabled OptiX denoising")
print(f" OptiX ray tracing is active (using OPTIX device type)")
elif best_device_type == 'CUDA':
# CUDA can use OptiX if available, but it's usually automatic
# Check if we can prefer OptiX over CUDA
if hasattr(scene.cycles, 'use_optix_denoising'):
scene.cycles.use_optix_denoising = True
print(f" Enabled OptiX denoising (if OptiX available)")
print(f" CUDA ray tracing active")
elif best_device_type == 'METAL':
# MetalRT for Apple Silicon (if available)
if hasattr(scene.cycles, 'use_metalrt'):
scene.cycles.use_metalrt = True
print(f" Enabled MetalRT (Metal Ray Tracing) for faster rendering")
elif hasattr(cycles_prefs, 'use_metalrt'):
cycles_prefs.use_metalrt = True
print(f" Enabled MetalRT (Metal Ray Tracing) for faster rendering")
else:
print(f" MetalRT not available")
elif best_device_type == 'ONEAPI':
# Intel oneAPI - Embree might be available
if hasattr(scene.cycles, 'use_embree'):
scene.cycles.use_embree = True
print(f" Enabled Embree for faster CPU ray tracing")
print(f" oneAPI ray tracing active")
except Exception as e:
print(f" Could not enable ray tracing acceleration: {e}")
print(f"SUCCESS: Enabled {enabled_count} GPU device(s) for {best_device_type}")
gpu_available = True
except Exception as e:
print(f"ERROR: Failed to enable GPU devices: {e}")
import traceback
traceback.print_exc()
# Set device based on availability (prefer GPU, fallback to CPU)
if gpu_available:
scene.cycles.device = 'GPU'
print(f"Using GPU for rendering (blend file had: {current_device})")
else:
scene.cycles.device = 'CPU'
print(f"GPU not available, using CPU for rendering (blend file had: {current_device})")
# Verify device setting
final_device = scene.cycles.device
print(f"Final Cycles device: {final_device}")
else:
# For other engines (EEVEE, etc.), respect blend file settings
print(f"Using {current_engine} engine - respecting blend file settings")
# Enable GPU acceleration for EEVEE viewport rendering (if using EEVEE)
if current_engine == 'EEVEE' or current_engine == 'EEVEE_NEXT':
try:
if hasattr(bpy.context.preferences.system, 'gpu_backend'):
bpy.context.preferences.system.gpu_backend = 'OPENGL'
print("Enabled OpenGL GPU backend for EEVEE")
except Exception as e:
print(f"Could not set EEVEE GPU backend: {e}")
# Enable GPU acceleration for compositing (if compositing is enabled)
try:
if scene.use_nodes and hasattr(scene, 'node_tree') and scene.node_tree:
if hasattr(scene.node_tree, 'use_gpu_compositing'):
scene.node_tree.use_gpu_compositing = True
print("Enabled GPU compositing")
except Exception as e:
print(f"Could not enable GPU compositing: {e}")
print("Device configuration complete - blend file settings preserved, device optimized")
sys.stdout.flush()
`
scriptPath := filepath.Join(workDir, "enable_gpu.py")
if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
errMsg := fmt.Sprintf("failed to create GPU enable script: %v", err)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
// Run Blender with GPU enabled via Python script
// Use -s (start) and -e (end) for frame ranges, or -f for single frame
var cmd *exec.Cmd
if frameStart == frameEnd {
// Single frame
cmd = exec.Command("blender", "-b", blendFile,
"--python", scriptPath,
"-o", absOutputPattern,
"-f", fmt.Sprintf("%d", frameStart))
} else {
// Frame range
cmd = exec.Command("blender", "-b", blendFile,
"--python", scriptPath,
"-o", absOutputPattern,
"-s", fmt.Sprintf("%d", frameStart),
"-e", fmt.Sprintf("%d", frameEnd),
"-a") // -a renders animation (all frames in range)
}
cmd.Dir = workDir
// Capture stdout and stderr separately for line-by-line streaming
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
errMsg := fmt.Sprintf("failed to create stdout pipe: %v", err)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
stderrPipe, err := cmd.StderrPipe()
if err != nil {
errMsg := fmt.Sprintf("failed to create stderr pipe: %v", err)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
// Start the command
if err := cmd.Start(); err != nil {
errMsg := fmt.Sprintf("failed to start blender: %v", err)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
// Register process for cleanup on shutdown
c.runningProcs.Store(taskID, cmd)
defer c.runningProcs.Delete(taskID)
// Stream stdout line by line
stdoutDone := make(chan bool)
go func() {
defer close(stdoutDone)
scanner := bufio.NewScanner(stdoutPipe)
for scanner.Scan() {
line := scanner.Text()
if line != "" {
shouldFilter, logLevel := shouldFilterBlenderLog(line)
if !shouldFilter {
c.sendLog(taskID, logLevel, line, "render_blender")
}
}
}
}()
// Stream stderr line by line
stderrDone := make(chan bool)
go func() {
defer close(stderrDone)
scanner := bufio.NewScanner(stderrPipe)
for scanner.Scan() {
line := scanner.Text()
if line != "" {
shouldFilter, logLevel := shouldFilterBlenderLog(line)
if !shouldFilter {
// Use the filtered log level, but if it's still WARN, keep it as WARN
if logLevel == types.LogLevelInfo {
logLevel = types.LogLevelWarn
}
c.sendLog(taskID, logLevel, line, "render_blender")
}
}
}
}()
// Wait for command to complete
err = cmd.Wait()
// Wait for streaming goroutines to finish
<-stdoutDone
<-stderrDone
if err != nil {
errMsg := fmt.Sprintf("blender failed: %v", err)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
// Find rendered output file(s)
// For frame ranges, we'll find all frames in the upload step
// For single frames, we need to find the specific output file
outputFile := ""
// Only check for single output file if it's a single frame render
if frameStart == frameEnd {
// List all files in output directory to find what Blender actually created
entries, err := os.ReadDir(outputDir)
if err == nil {
c.sendLog(taskID, types.LogLevelInfo, "Checking output directory for files...", "render_blender")
// Try exact match first: frame_0155.png
expectedFile := filepath.Join(outputDir, fmt.Sprintf("frame_%04d.%s", frameStart, strings.ToLower(renderFormat)))
if _, err := os.Stat(expectedFile); err == nil {
outputFile = expectedFile
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found output file: %s", filepath.Base(expectedFile)), "render_blender")
} else {
// Try without zero padding: frame_155.png
altFile := filepath.Join(outputDir, fmt.Sprintf("frame_%d.%s", frameStart, strings.ToLower(renderFormat)))
if _, err := os.Stat(altFile); err == nil {
outputFile = altFile
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found output file: %s", filepath.Base(altFile)), "render_blender")
} else {
// Try just frame number: 0155.png or 155.png
altFile2 := filepath.Join(outputDir, fmt.Sprintf("%04d.%s", frameStart, strings.ToLower(renderFormat)))
if _, err := os.Stat(altFile2); err == nil {
outputFile = altFile2
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found output file: %s", filepath.Base(altFile2)), "render_blender")
} else {
// Search through all files for one containing the frame number
for _, entry := range entries {
if !entry.IsDir() {
fileName := entry.Name()
// Skip files that contain the literal pattern string (Blender bug)
if strings.Contains(fileName, "%04d") || strings.Contains(fileName, "%d") {
c.sendLog(taskID, types.LogLevelWarn, fmt.Sprintf("Skipping file with literal pattern: %s", fileName), "render_blender")
continue
}
// Check if filename contains the frame number (with or without padding)
frameStr := fmt.Sprintf("%d", frameStart)
frameStrPadded := fmt.Sprintf("%04d", frameStart)
if strings.Contains(fileName, frameStrPadded) ||
(strings.Contains(fileName, frameStr) && strings.HasSuffix(strings.ToLower(fileName), strings.ToLower(renderFormat))) {
outputFile = filepath.Join(outputDir, fileName)
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found output file: %s", fileName), "render_blender")
break
}
}
}
}
}
}
}
if outputFile == "" {
// List all files in output directory for debugging
entries, _ := os.ReadDir(outputDir)
fileList := []string{}
for _, entry := range entries {
if !entry.IsDir() {
fileList = append(fileList, entry.Name())
}
}
expectedFile := filepath.Join(outputDir, fmt.Sprintf("frame_%04d.%s", frameStart, strings.ToLower(renderFormat)))
errMsg := fmt.Sprintf("output file not found: %s\nFiles in output directory: %v",
expectedFile, fileList)
c.sendLog(taskID, types.LogLevelError, errMsg, "render_blender")
c.sendStepUpdate(taskID, "render_blender", types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Blender render completed for frame %d", frameStart), "render_blender")
} else {
// Frame range - Blender renders multiple frames, we'll find them all in the upload step
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Blender render completed for frames %d-%d", frameStart, frameEnd), "render_blender")
}
c.sendStepUpdate(taskID, "render_blender", types.StepStatusCompleted, "")
// Step: upload or upload_frames
uploadStepName := "upload"
if outputFormat == "MP4" {
uploadStepName = "upload_frames"
}
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusRunning, "")
var outputPath string
// If we have a frame range, find and upload all frames
if frameStart != frameEnd {
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Uploading frames %d-%d...", frameStart, frameEnd), uploadStepName)
// Find all rendered frames in the output directory
var frameFiles []string
entries, err := os.ReadDir(outputDir)
if err == nil {
for frame := frameStart; frame <= frameEnd; frame++ {
// Try different naming patterns
patterns := []string{
fmt.Sprintf("frame_%04d.%s", frame, strings.ToLower(renderFormat)),
fmt.Sprintf("frame_%d.%s", frame, strings.ToLower(renderFormat)),
fmt.Sprintf("%04d.%s", frame, strings.ToLower(renderFormat)),
fmt.Sprintf("%d.%s", frame, strings.ToLower(renderFormat)),
}
found := false
for _, pattern := range patterns {
framePath := filepath.Join(outputDir, pattern)
if _, err := os.Stat(framePath); err == nil {
frameFiles = append(frameFiles, framePath)
found = true
break
}
}
// If not found with patterns, search through entries
if !found {
frameStr := fmt.Sprintf("%d", frame)
frameStrPadded := fmt.Sprintf("%04d", frame)
for _, entry := range entries {
if entry.IsDir() {
continue
}
fileName := entry.Name()
// Skip files with literal pattern strings
if strings.Contains(fileName, "%04d") || strings.Contains(fileName, "%d") {
continue
}
// Check if filename contains the frame number
fullPath := filepath.Join(outputDir, fileName)
alreadyAdded := false
for _, existing := range frameFiles {
if existing == fullPath {
alreadyAdded = true
break
}
}
if !alreadyAdded &&
(strings.Contains(fileName, frameStrPadded) ||
(strings.Contains(fileName, frameStr) && strings.HasSuffix(strings.ToLower(fileName), strings.ToLower(renderFormat)))) {
frameFiles = append(frameFiles, fullPath)
found = true
break
}
}
}
}
}
if len(frameFiles) == 0 {
errMsg := fmt.Sprintf("no frame files found for range %d-%d", frameStart, frameEnd)
c.sendLog(taskID, types.LogLevelError, errMsg, uploadStepName)
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
// Upload all frames
uploadedCount := 0
uploadedFiles := []string{}
for i, frameFile := range frameFiles {
fileName := filepath.Base(frameFile)
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Uploading frame %d/%d: %s", i+1, len(frameFiles), fileName), uploadStepName)
uploadedPath, err := c.uploadFile(jobID, frameFile)
if err != nil {
errMsg := fmt.Sprintf("failed to upload frame %s: %v", fileName, err)
c.sendLog(taskID, types.LogLevelError, errMsg, uploadStepName)
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
uploadedCount++
uploadedFiles = append(uploadedFiles, fileName)
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Uploaded frame %d/%d: %s -> %s", i+1, len(frameFiles), fileName, uploadedPath), uploadStepName)
}
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Successfully uploaded %d frames: %v", uploadedCount, uploadedFiles), uploadStepName)
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusCompleted, "")
outputPath = "" // Not used for frame ranges, frames are uploaded individually
} else {
// Single frame upload
fileName := filepath.Base(outputFile)
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Uploading output file: %s", fileName), uploadStepName)
outputPath, err = c.uploadFile(jobID, outputFile)
if err != nil {
errMsg := fmt.Sprintf("failed to upload output file %s: %v", fileName, err)
c.sendLog(taskID, types.LogLevelError, errMsg, uploadStepName)
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusFailed, errMsg)
return errors.New(errMsg)
}
c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Output file uploaded successfully: %s -> %s", fileName, outputPath), uploadStepName)
c.sendStepUpdate(taskID, uploadStepName, types.StepStatusCompleted, "")
}
// Step: complete
c.sendStepUpdate(taskID, "complete", types.StepStatusRunning, "")
c.sendLog(taskID, types.LogLevelInfo, "Task completed successfully", "complete")
// Mark task as complete
if err := c.completeTask(taskID, outputPath, true, ""); err != nil {
c.sendStepUpdate(taskID, "complete", types.StepStatusFailed, err.Error())
return err
}
c.sendStepUpdate(taskID, "complete", types.StepStatusCompleted, "")
return nil
}
// processVideoGenerationTask builds an MP4 for a job from its uploaded PNG
// output frames and uploads the result.
//
// The steps reported to the manager are, in order: get_files,
// download_frames, generate_video and upload_video. Frames are downloaded into
// a per-job directory inside the runner workspace, which is removed when the
// function returns. Frames that fail to download are logged and skipped; the
// task only fails if no frame could be downloaded at all.
//
// ffmpeg is first run against an image-sequence pattern at 24 fps (hardware
// encoder when buildFFmpegCommand finds one, libx264 otherwise). If that run
// fails for a reason other than a size error, generateMP4WithConcat is tried.
//
// task must carry a numeric "id" entry (float64); an error is returned, rather
// than panicking, when it does not.
func (c *Client) processVideoGenerationTask(task map[string]interface{}, jobID int64) error {
	// A malformed task must not take the whole runner down with a failed
	// type assertion; without an id there is nothing to report against either.
	rawID, ok := task["id"].(float64)
	if !ok {
		return fmt.Errorf("video generation task for job %d has a missing or non-numeric id", jobID)
	}
	taskID := int64(rawID)

	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting video generation task: job %d", jobID), "")
	log.Printf("Processing video generation task %d for job %d", taskID, jobID)

	// Get all output files for this job
	files, err := c.getJobFiles(jobID)
	if err != nil {
		c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error())
		return fmt.Errorf("failed to get job files: %w", err)
	}

	// Find all PNG frame files
	var pngFiles []map[string]interface{}
	for _, file := range files {
		fileType, _ := file["file_type"].(string)
		fileName, _ := file["file_name"].(string)
		if fileType == "output" && strings.HasSuffix(fileName, ".png") {
			pngFiles = append(pngFiles, file)
		}
	}
	if len(pngFiles) == 0 {
		err := fmt.Errorf("no PNG frame files found for MP4 generation")
		c.sendStepUpdate(taskID, "get_files", types.StepStatusFailed, err.Error())
		return err
	}
	c.sendStepUpdate(taskID, "get_files", types.StepStatusCompleted, "")
	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Found %d PNG frames for video generation", len(pngFiles)), "get_files")
	log.Printf("Generating MP4 for job %d from %d PNG frames", jobID, len(pngFiles))

	// Step: download_frames
	c.sendStepUpdate(taskID, "download_frames", types.StepStatusRunning, "")
	c.sendLog(taskID, types.LogLevelInfo, "Downloading PNG frames...", "download_frames")

	// Create temporary job workspace for video generation within runner workspace
	workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-video", jobID))
	if err := os.MkdirAll(workDir, 0755); err != nil {
		c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error())
		return fmt.Errorf("failed to create work directory: %w", err)
	}
	defer os.RemoveAll(workDir)

	// Download all PNG frames (best effort: a failed frame is logged and skipped)
	var frameFiles []string
	for _, file := range pngFiles {
		fileName, _ := file["file_name"].(string)
		framePath := filepath.Join(workDir, fileName)
		if err := c.downloadFrameFile(jobID, fileName, framePath); err != nil {
			log.Printf("Failed to download frame %s: %v", fileName, err)
			continue
		}
		frameFiles = append(frameFiles, framePath)
	}
	if len(frameFiles) == 0 {
		err := fmt.Errorf("failed to download any frame files")
		c.sendStepUpdate(taskID, "download_frames", types.StepStatusFailed, err.Error())
		return err
	}

	// Matches the numeric frame index between an underscore and the extension,
	// e.g. "frame_2470.png" -> "2470".
	frameNumRe := regexp.MustCompile(`_(\d+)\.`)
	frameNumberOf := func(path string) (int, bool) {
		m := frameNumRe.FindStringSubmatch(filepath.Base(path))
		if len(m) < 2 {
			return 0, false
		}
		var n int
		if _, err := fmt.Sscanf(m[1], "%d", &n); err != nil {
			return 0, false
		}
		return n, true
	}

	// Order frames by frame number rather than by raw string: a plain string
	// sort puts "frame_10000.png" before "frame_9999.png", which would make
	// the sequence start (and the concat order) wrong once the digit count
	// grows. Names without a recognizable number sort after numbered ones, in
	// string order, so the comparison stays a consistent total order.
	type indexedFrame struct {
		path   string
		num    int
		hasNum bool
	}
	indexed := make([]indexedFrame, len(frameFiles))
	for i, p := range frameFiles {
		n, found := frameNumberOf(p)
		indexed[i] = indexedFrame{path: p, num: n, hasNum: found}
	}
	sort.SliceStable(indexed, func(i, j int) bool {
		a, b := indexed[i], indexed[j]
		if a.hasNum != b.hasNum {
			return a.hasNum
		}
		if a.hasNum && a.num != b.num {
			return a.num < b.num
		}
		return a.path < b.path
	})
	for i := range indexed {
		frameFiles[i] = indexed[i].path
	}

	c.sendStepUpdate(taskID, "download_frames", types.StepStatusCompleted, "")
	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Downloaded %d frames", len(frameFiles)), "download_frames")

	// Step: generate_video
	c.sendStepUpdate(taskID, "generate_video", types.StepStatusRunning, "")
	c.sendLog(taskID, types.LogLevelInfo, "Generating MP4 video with ffmpeg...", "generate_video")

	// Generate MP4 using ffmpeg
	outputMP4 := filepath.Join(workDir, fmt.Sprintf("output_%d.mp4", jobID))

	// Method 1: image sequence input. Derive the sequence pattern and the
	// starting number from the first (lowest-numbered) frame.
	firstFrame := frameFiles[0]
	baseName := filepath.Base(firstFrame)
	var pattern string
	var startNumber int
	if m := frameNumRe.FindStringSubmatch(baseName); len(m) > 1 {
		// Use the digit count actually present in the file name as the
		// zero-padding width (e.g. frame_0001.png -> frame_%04d.png,
		// frame_00001.png -> frame_%05d.png) so the pattern always reproduces
		// the first frame's name instead of assuming four digits.
		pattern = frameNumRe.ReplaceAllString(baseName, fmt.Sprintf("_%%0%dd.", len(m[1])))
		// m[1] consists only of digits, so this can only fail on overflow; in
		// that case startNumber stays 0, as before.
		_, _ = fmt.Sscanf(m[1], "%d", &startNumber)
	} else {
		// Fallback: try simple replacement
		startNumber = extractFrameNumber(baseName)
		pattern = strings.Replace(baseName, fmt.Sprintf("%d", startNumber), "%04d", 1)
	}
	patternPath := filepath.Join(workDir, pattern)

	// Allocate a VAAPI device for this task (if available)
	allocatedDevice := c.allocateVAAPIDevice(taskID)
	defer c.releaseVAAPIDevice(taskID) // Always release the device when done
	if allocatedDevice != "" {
		c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Using VAAPI device: %s", allocatedDevice), "generate_video")
	} else {
		c.sendLog(taskID, types.LogLevelInfo, "No VAAPI device available, will use software encoding or first available device", "generate_video")
	}

	// Run ffmpeg to combine frames into MP4 at 24 fps with hardware acceleration
	// Use -start_number to tell ffmpeg the starting frame number
	cmd, err := c.buildFFmpegCommand(allocatedDevice, "-y", "-start_number", fmt.Sprintf("%d", startNumber),
		"-framerate", "24", "-i", patternPath,
		"-r", "24", outputMP4)
	if err != nil {
		c.sendLog(taskID, types.LogLevelWarn, fmt.Sprintf("Hardware acceleration detection failed, using software encoding: %v", err), "generate_video")
		// Fallback to software encoding
		cmd = exec.Command("ffmpeg", "-y", "-start_number", fmt.Sprintf("%d", startNumber),
			"-framerate", "24", "-i", patternPath,
			"-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", outputMP4)
	}
	cmd.Dir = workDir
	output, err := cmd.CombinedOutput()
	if err != nil {
		outputStr := string(output)
		// Check for size-related errors and provide helpful messages
		if sizeErr := c.checkFFmpegSizeError(outputStr); sizeErr != nil {
			c.sendLog(taskID, types.LogLevelError, sizeErr.Error(), "generate_video")
			c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, sizeErr.Error())
			return sizeErr
		}
		// Method 2: try alternative method with concat demuxer
		log.Printf("First ffmpeg attempt failed, trying concat method: %s", outputStr)
		err = c.generateMP4WithConcat(frameFiles, outputMP4, workDir, allocatedDevice)
		if err != nil {
			// Check for size errors in concat method too
			if sizeErr := c.checkFFmpegSizeError(err.Error()); sizeErr != nil {
				c.sendLog(taskID, types.LogLevelError, sizeErr.Error(), "generate_video")
				c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, sizeErr.Error())
				return sizeErr
			}
			c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error())
			return err
		}
	}

	// Check if MP4 was created
	if _, err := os.Stat(outputMP4); os.IsNotExist(err) {
		err := fmt.Errorf("MP4 file not created: %s", outputMP4)
		c.sendStepUpdate(taskID, "generate_video", types.StepStatusFailed, err.Error())
		return err
	}
	c.sendStepUpdate(taskID, "generate_video", types.StepStatusCompleted, "")
	c.sendLog(taskID, types.LogLevelInfo, "MP4 video generated successfully", "generate_video")

	// Step: upload_video
	c.sendStepUpdate(taskID, "upload_video", types.StepStatusRunning, "")
	c.sendLog(taskID, types.LogLevelInfo, "Uploading MP4 video...", "upload_video")

	// Upload MP4 file
	mp4Path, err := c.uploadFile(jobID, outputMP4)
	if err != nil {
		c.sendStepUpdate(taskID, "upload_video", types.StepStatusFailed, err.Error())
		return fmt.Errorf("failed to upload MP4: %w", err)
	}
	c.sendStepUpdate(taskID, "upload_video", types.StepStatusCompleted, "")
	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Successfully uploaded MP4: %s", mp4Path), "upload_video")

	// Mark task as complete
	if err := c.completeTask(taskID, mp4Path, true, ""); err != nil {
		return err
	}
	log.Printf("Successfully generated and uploaded MP4 for job %d: %s", jobID, mp4Path)
	return nil
}
// buildFFmpegCommand builds an ffmpeg command that encodes H.264 with the
// first usable hardware encoder.
//
// args are the ffmpeg arguments with the output file as the LAST element; the
// encoder-specific options are spliced in immediately before it. If device is
// non-empty it is used as the VAAPI device, otherwise getVAAPIDevice is asked
// for one.
//
// Encoders are tried in this order: NVENC, VideoToolbox, VAAPI, AMF, QSV.
// An error is returned when args is empty or when no hardware encoder is
// available, in which case the caller is expected to fall back to software
// encoding.
func (c *Client) buildFFmpegCommand(device string, args ...string) (*exec.Cmd, error) {
	// Without at least an output file there is no position to insert the
	// encoder options at (and slicing with index -1 would panic).
	if len(args) == 0 {
		return nil, fmt.Errorf("no ffmpeg arguments provided")
	}
	outputIdx := len(args) - 1

	// withEncoder returns an ffmpeg command whose arguments are args with
	// hwArgs inserted just before the output file.
	withEncoder := func(hwArgs ...string) *exec.Cmd {
		newArgs := make([]string, 0, len(args)+len(hwArgs))
		newArgs = append(newArgs, args[:outputIdx]...)
		newArgs = append(newArgs, hwArgs...)
		newArgs = append(newArgs, args[outputIdx:]...)
		return exec.Command("ffmpeg", newArgs...)
	}

	// Check for NVIDIA NVENC
	if c.checkEncoderAvailable("h264_nvenc") {
		return withEncoder("-c:v", "h264_nvenc", "-preset", "p4", "-b:v", "10M", "-maxrate", "12M", "-bufsize", "20M", "-pix_fmt", "yuv420p"), nil
	}

	// Check for VideoToolbox (macOS)
	if c.checkEncoderAvailable("h264_videotoolbox") {
		return withEncoder("-c:v", "h264_videotoolbox", "-b:v", "10M", "-pix_fmt", "yuv420p"), nil
	}

	// Check for VAAPI (Intel/AMD on Linux)
	if c.checkEncoderAvailable("h264_vaapi") {
		// Use provided device if available, otherwise get the first available
		vaapiDevice := device
		if vaapiDevice == "" {
			vaapiDevice = c.getVAAPIDevice()
		}
		if vaapiDevice != "" {
			// The filter chain uploads NV12 frames to VAAPI surfaces, which is
			// what h264_vaapi consumes. No "-pix_fmt yuv420p" here: requesting
			// a software pixel format on the output makes ffmpeg try to convert
			// the hardware frames after hwupload, which fails.
			return withEncoder("-vaapi_device", vaapiDevice, "-vf", "format=nv12,hwupload", "-c:v", "h264_vaapi", "-b:v", "10M"), nil
		}
	}

	// Check for AMF (AMD on Windows)
	if c.checkEncoderAvailable("h264_amf") {
		return withEncoder("-c:v", "h264_amf", "-quality", "balanced", "-b:v", "10M", "-pix_fmt", "yuv420p"), nil
	}

	// Check for Intel Quick Sync (QSV)
	if c.checkEncoderAvailable("h264_qsv") {
		return withEncoder("-c:v", "h264_qsv", "-preset", "medium", "-b:v", "10M", "-pix_fmt", "yuv420p"), nil
	}

	// No hardware acceleration available
	return nil, fmt.Errorf("no hardware encoder available")
}
// probeAllHardwareAccelerators runs `ffmpeg -hide_banner -hwaccels` and
// collects every method name listed after the
// "Hardware acceleration methods:" header, stopping at the first blank line
// that follows it. Each name found maps to true. If ffmpeg cannot be run the
// failure is logged and an empty map is returned.
func (c *Client) probeAllHardwareAccelerators() map[string]bool {
	found := make(map[string]bool)

	raw, err := exec.Command("ffmpeg", "-hide_banner", "-hwaccels").CombinedOutput()
	if err != nil {
		log.Printf("Failed to probe hardware accelerators: %v", err)
		return found
	}

	headerSeen := false
	for _, rawLine := range strings.Split(string(raw), "\n") {
		name := strings.TrimSpace(rawLine)
		switch {
		case strings.Contains(name, "Hardware acceleration methods:"):
			// Header line: everything after it is a method name.
			headerSeen = true
		case !headerSeen:
			// Still before the header; nothing to record.
		case name == "":
			// A blank line terminates the list.
			return found
		default:
			// One method name per line.
			found[name] = true
		}
	}
	return found
}
// probeAllHardwareEncoders runs `ffmpeg -hide_banner -encoders` and returns
// the set of whitespace-separated tokens, found after the "Encoders:" (or
// "Codecs:") header, that contain one of the known hardware-encoder markers
// such as "_nvenc" or "_vaapi". Each matching token maps to true. If ffmpeg
// cannot be run the failure is logged and an empty map is returned.
func (c *Client) probeAllHardwareEncoders() map[string]bool {
	result := make(map[string]bool)

	raw, err := exec.Command("ffmpeg", "-hide_banner", "-encoders").CombinedOutput()
	if err != nil {
		log.Printf("Failed to probe encoders: %v", err)
		return result
	}

	// Substrings that mark a token as a hardware encoder name.
	markers := []string{
		"_nvenc", "_vaapi", "_qsv", "_videotoolbox", "_amf", "_v4l2m2m", "_omx", "_mediacodec",
	}
	isHardwareToken := func(token string) bool {
		for _, marker := range markers {
			if strings.Contains(token, marker) {
				return true
			}
		}
		return false
	}

	listing := false
	for _, rawLine := range strings.Split(string(raw), "\n") {
		text := strings.TrimSpace(rawLine)
		if strings.Contains(text, "Encoders:") || strings.Contains(text, "Codecs:") {
			// Header line: encoder entries follow.
			listing = true
			continue
		}
		if !listing {
			continue
		}
		// Entry lines look like: " V..... h264_nvenc H.264 / AVC / MPEG-4 AVC (NVIDIA NVENC)".
		// Every field of the line is examined, not just the name column.
		for _, token := range strings.Fields(text) {
			if isHardwareToken(token) {
				result[token] = true
			}
		}
	}
	return result
}
// checkEncoderAvailable reports whether the named ffmpeg encoder is both listed by
// ffmpeg and passes a short test encode. Results (positive and negative) are
// memoized in hwAccelCache, so each encoder is probed at most once per successful
// cache write.
func (c *Client) checkEncoderAvailable(encoder string) bool {
	// Fast path: answer from the cache.
	c.hwAccelCacheMu.RLock()
	cached, hit := c.hwAccelCache[encoder]
	c.hwAccelCacheMu.RUnlock()
	if hit {
		return cached
	}

	// Make sure the cache map exists before any result is stored.
	c.hwAccelCacheMu.Lock()
	if c.hwAccelCache == nil {
		c.hwAccelCache = make(map[string]bool)
	}
	c.hwAccelCacheMu.Unlock()

	// remember stores the verdict for this encoder and hands it back.
	remember := func(verdict bool) bool {
		c.hwAccelCacheMu.Lock()
		c.hwAccelCache[encoder] = verdict
		c.hwAccelCacheMu.Unlock()
		return verdict
	}

	// The encoder must appear in the `-encoders` listing.
	listing, err := exec.Command("ffmpeg", "-hide_banner", "-encoders").CombinedOutput()
	if err != nil {
		return remember(false)
	}
	encoderList := string(listing)

	// Prefer a whole-word match; fall back to a case-insensitive substring match.
	wholeWord := regexp.MustCompile(`\b` + regexp.QuoteMeta(encoder) + `\b`)
	if !wholeWord.MatchString(encoderList) &&
		!strings.Contains(strings.ToLower(encoderList), strings.ToLower(encoder)) {
		return remember(false)
	}

	// Hardware acceleration methods; left empty when the query fails.
	hwaccels := ""
	if out, err := exec.Command("ffmpeg", "-hide_banner", "-hwaccels").CombinedOutput(); err == nil {
		hwaccels = string(out)
	}
	supports := func(method string) bool {
		return strings.Contains(hwaccels, method)
	}

	var usable bool
	switch encoder {
	case "h264_nvenc", "hevc_nvenc":
		// NVENC is tested regardless of whether cuda/cuvid is listed,
		// since some builds ship NVENC without the CUDA hwaccel.
		usable = c.testNVENCEncoder()
	case "h264_vaapi", "hevc_vaapi":
		// VAAPI is tested even when the hwaccel method is missing from the list.
		if !supports("vaapi") {
			log.Printf("VAAPI not in hwaccels list, but encoder found - testing anyway")
		}
		usable = c.testVAAPIEncoder()
	case "h264_qsv", "hevc_qsv":
		usable = supports("qsv") && c.testQSVEncoder()
	case "h264_videotoolbox", "hevc_videotoolbox":
		usable = supports("videotoolbox") && c.testVideoToolboxEncoder()
	case "h264_amf", "hevc_amf":
		usable = (supports("d3d11va") || supports("dxva2")) && c.testAMFEncoder()
	case "h264_v4l2m2m", "hevc_v4l2m2m":
		usable = c.testV4L2M2MEncoder()
	case "h264_omx", "hevc_omx":
		usable = c.testOMXEncoder()
	case "h264_mediacodec", "hevc_mediacodec":
		usable = c.testMediaCodecEncoder()
	default:
		usable = c.testGenericEncoder(encoder)
	}

	return remember(usable)
}
// testNVENCEncoder checks NVENC by encoding one 64x64 black frame with h264_nvenc
// to ffmpeg's null muxer. Returns true when ffmpeg exits successfully.
func (c *Client) testNVENCEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_nvenc",
		"-preset", "p1",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testVAAPIEncoder reports whether at least one device returned by findVAAPIDevices
// can encode a single frame with h264_vaapi. Each device is tried with three
// synthetic inputs of decreasing size; the first success ends the search.
func (c *Client) testVAAPIEncoder() bool {
	candidates := c.findVAAPIDevices()
	if len(candidates) == 0 {
		log.Printf("VAAPI test failed: No devices found")
		return false
	}

	// Synthetic lavfi inputs, tried in this order for every device.
	inputs := []string{
		"color=c=black:s=1920x1080:d=0.1",
		"color=c=black:s=640x480:d=0.1",
		"color=c=black:s=64x64:d=0.1",
	}

	for _, device := range candidates {
		log.Printf("Testing VAAPI device: %s", device)
		for idx, input := range inputs {
			method := idx + 1

			var errOut bytes.Buffer
			probe := exec.Command("ffmpeg",
				"-vaapi_device", device,
				"-f", "lavfi", "-i", input,
				"-vf", "format=nv12,hwupload",
				"-c:v", "h264_vaapi", "-b:v", "1M",
				"-frames:v", "1",
				"-f", "null", "-",
			)
			probe.Stderr = &errOut

			runErr := probe.Run()
			if runErr == nil {
				log.Printf("VAAPI device %s works with test method %d", device, method)
				return true
			}
			// Only the first failure per device is logged; later methods fail quietly.
			if method == 1 {
				log.Printf("VAAPI device %s test failed (method %d): %v, stderr: %s", device, method, runErr, errOut.String())
			}
		}
	}

	log.Printf("VAAPI test failed: All devices failed all test methods")
	return false
}
// findVAAPIDevices returns the device paths on which a one-frame h264_vaapi test
// encode succeeds.
//
// Cached devices that still exist on disk are returned without re-testing (the
// cache is pruned when some have disappeared). Otherwise candidates are gathered
// from /dev/dri plus a fixed list of common paths and each existing candidate is
// probed with ffmpeg. Render nodes are probed before card nodes so they come
// first in the result. The (possibly empty) result replaces the cache.
func (c *Client) findVAAPIDevices() []string {
	// Check cache first. The cached length is captured while the read lock is
	// held so the comparison below never touches c.vaapiDevices unlocked.
	c.vaapiDevicesMu.RLock()
	cachedCount := len(c.vaapiDevices)
	stillPresent := make([]string, 0, cachedCount)
	for _, device := range c.vaapiDevices {
		if _, err := os.Stat(device); err == nil {
			stillPresent = append(stillPresent, device)
		}
	}
	c.vaapiDevicesMu.RUnlock()

	if len(stillPresent) > 0 {
		// Update cache if some devices were removed
		if len(stillPresent) != cachedCount {
			c.vaapiDevicesMu.Lock()
			c.vaapiDevices = stillPresent
			c.vaapiDevicesMu.Unlock()
		}
		return stillPresent
	}

	log.Printf("Discovering VAAPI devices...")

	// os.ReadDir returns entries sorted by name, which would put "card*" ahead
	// of "renderD*"; collect the two kinds separately to keep render nodes first.
	var renderNodes, cardNodes []string
	if entries, err := os.ReadDir("/dev/dri"); err == nil {
		log.Printf("Found %d entries in /dev/dri", len(entries))
		for _, entry := range entries {
			devPath := filepath.Join("/dev/dri", entry.Name())
			switch {
			case strings.HasPrefix(entry.Name(), "renderD"):
				renderNodes = append(renderNodes, devPath)
				log.Printf("Found render node: %s", devPath)
			case strings.HasPrefix(entry.Name(), "card"):
				// Card devices are only a fallback
				cardNodes = append(cardNodes, devPath)
				log.Printf("Found card device: %s", devPath)
			}
		}
	} else {
		log.Printf("Failed to read /dev/dri: %v", err)
	}

	// Also try common device paths as fallback
	commonDevices := []string{
		"/dev/dri/renderD128",
		"/dev/dri/renderD129",
		"/dev/dri/renderD130",
		"/dev/dri/renderD131",
		"/dev/dri/renderD132",
		"/dev/dri/card0",
		"/dev/dri/card1",
		"/dev/dri/card2",
	}

	// Merge the groups in priority order, dropping duplicates.
	seen := make(map[string]bool)
	var deviceCandidates []string
	for _, group := range [][]string{renderNodes, cardNodes, commonDevices} {
		for _, dev := range group {
			if seen[dev] {
				continue
			}
			seen[dev] = true
			deviceCandidates = append(deviceCandidates, dev)
		}
	}
	log.Printf("Testing %d device candidates for VAAPI", len(deviceCandidates))

	// Synthetic lavfi inputs, tried from largest to smallest frame size.
	testInputs := []string{
		"color=c=black:s=1920x1080:d=0.1",
		"color=c=black:s=640x480:d=0.1",
		"color=c=black:s=64x64:d=0.1",
	}

	// Test each device and collect working ones
	workingDevices := make([]string, 0, len(deviceCandidates))
	for _, device := range deviceCandidates {
		if _, err := os.Stat(device); err != nil {
			log.Printf("Device %s does not exist, skipping", device)
			continue
		}
		log.Printf("Testing VAAPI device: %s", device)

		deviceWorks := false
		for i, input := range testInputs {
			var stderr bytes.Buffer
			testCmd := exec.Command("ffmpeg",
				"-vaapi_device", device,
				"-f", "lavfi", "-i", input,
				"-vf", "format=nv12,hwupload",
				"-c:v", "h264_vaapi", "-b:v", "1M",
				"-frames:v", "1",
				"-f", "null", "-",
			)
			testCmd.Stderr = &stderr

			err := testCmd.Run()
			if err == nil {
				log.Printf("VAAPI device %s works (method %d)", device, i+1)
				workingDevices = append(workingDevices, device)
				deviceWorks = true
				break
			}
			if i == 0 {
				// Log first failure for debugging
				log.Printf("VAAPI device %s test failed (method %d): %v", device, i+1, err)
				if stderr.Len() > 0 {
					log.Printf(" stderr: %s", strings.TrimSpace(stderr.String()))
				}
			}
		}
		if !deviceWorks {
			log.Printf("VAAPI device %s failed all test methods", device)
		}
	}
	log.Printf("Found %d working VAAPI device(s): %v", len(workingDevices), workingDevices)

	// Cache all working devices
	c.vaapiDevicesMu.Lock()
	c.vaapiDevices = workingDevices
	c.vaapiDevicesMu.Unlock()
	return workingDevices
}
// getVAAPIDevice returns the first entry reported by findVAAPIDevices,
// or the empty string when no device is available.
func (c *Client) getVAAPIDevice() string {
	if found := c.findVAAPIDevices(); len(found) != 0 {
		return found[0]
	}
	return ""
}
// allocateVAAPIDevice reserves a VAAPI device for the given task.
// It picks the first device from findVAAPIDevices that is not recorded in
// allocatedDevices and records it under taskID.
// Returns the device path, or the empty string when no device exists or all
// devices are already reserved.
func (c *Client) allocateVAAPIDevice(taskID int64) string {
	c.allocatedDevicesMu.Lock()
	defer c.allocatedDevicesMu.Unlock()

	// Lazily create the allocation table.
	if c.allocatedDevices == nil {
		c.allocatedDevices = make(map[int64]string)
	}

	devices := c.findVAAPIDevices()
	if len(devices) == 0 {
		return ""
	}

	// Set of device paths currently reserved by any task.
	inUse := make(map[string]struct{}, len(c.allocatedDevices))
	for _, reserved := range c.allocatedDevices {
		inUse[reserved] = struct{}{}
	}

	for _, candidate := range devices {
		if _, busy := inUse[candidate]; busy {
			continue
		}
		c.allocatedDevices[taskID] = candidate
		log.Printf("Allocated VAAPI device %s to task %d", candidate, taskID)
		return candidate
	}

	// Every known device is reserved.
	log.Printf("No available VAAPI devices for task %d (all %d devices in use)", taskID, len(devices))
	return ""
}
// releaseVAAPIDevice drops the VAAPI device reservation held by the given task.
// It is a no-op when the task holds no reservation.
func (c *Client) releaseVAAPIDevice(taskID int64) {
	c.allocatedDevicesMu.Lock()
	defer c.allocatedDevicesMu.Unlock()

	// A lookup on a nil map simply misses, so no separate nil check is needed.
	device, held := c.allocatedDevices[taskID]
	if !held {
		return
	}
	delete(c.allocatedDevices, taskID)
	log.Printf("Released VAAPI device %s from task %d", device, taskID)
}
// testQSVEncoder checks Intel Quick Sync Video by encoding one 64x64 black frame
// with h264_qsv to ffmpeg's null muxer. Returns true when ffmpeg exits successfully.
func (c *Client) testQSVEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_qsv",
		"-preset", "medium",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testVideoToolboxEncoder checks VideoToolbox by encoding one 64x64 black frame
// with h264_videotoolbox to ffmpeg's null muxer. Returns true when ffmpeg exits
// successfully.
func (c *Client) testVideoToolboxEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_videotoolbox",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testAMFEncoder checks AMD AMF by encoding one 64x64 black frame with h264_amf
// to ffmpeg's null muxer. Returns true when ffmpeg exits successfully.
func (c *Client) testAMFEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_amf",
		"-quality", "balanced",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testV4L2M2MEncoder checks the V4L2 memory-to-memory encoder by encoding one
// 64x64 black frame with h264_v4l2m2m to ffmpeg's null muxer. Returns true when
// ffmpeg exits successfully.
func (c *Client) testV4L2M2MEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_v4l2m2m",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testOMXEncoder checks the OpenMAX encoder by encoding one 64x64 black frame
// with h264_omx to ffmpeg's null muxer. Returns true when ffmpeg exits successfully.
func (c *Client) testOMXEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_omx",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testMediaCodecEncoder checks the MediaCodec encoder by encoding one 64x64 black
// frame with h264_mediacodec to ffmpeg's null muxer. Returns true when ffmpeg
// exits successfully.
func (c *Client) testMediaCodecEncoder() bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", "h264_mediacodec",
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// testGenericEncoder checks an arbitrary video encoder by encoding one 64x64 black
// frame with it to ffmpeg's null muxer. Returns true when ffmpeg exits successfully.
func (c *Client) testGenericEncoder(encoder string) bool {
	args := []string{
		"-f", "lavfi",
		"-i", "color=c=black:s=64x64:d=0.1",
		"-c:v", encoder,
		"-frames:v", "1",
		"-f", "null",
		"-",
	}
	// Output is not captured; only the exit status matters.
	return exec.Command("ffmpeg", args...).Run() == nil
}
// generateMP4WithConcat builds outputMP4 from frameFiles using ffmpeg's concat
// demuxer.
//
// A list file "frames.txt" is written into workDir with one `file '<abs path>'`
// line per frame. The ffmpeg command comes from buildFFmpegCommand (device is
// passed through to it); if that fails, a software libx264 command is used.
//
// Returns an error when frameFiles is empty, a frame path cannot be made absolute,
// the list file cannot be written, ffmpeg fails (size-related failures are
// translated by checkFFmpegSizeError), or the output file does not exist afterwards.
func (c *Client) generateMP4WithConcat(frameFiles []string, outputMP4, workDir string, device string) error {
	if len(frameFiles) == 0 {
		return fmt.Errorf("no frame files provided for MP4 generation")
	}

	// Build the concat list in memory so a write failure cannot leave a
	// silently truncated list behind.
	var list strings.Builder
	for _, frameFile := range frameFiles {
		absPath, err := filepath.Abs(frameFile)
		if err != nil {
			return fmt.Errorf("failed to resolve frame path %s: %w", frameFile, err)
		}
		// Inside a single-quoted concat entry a literal quote must be written
		// as '\'' (close quote, escaped quote, reopen quote).
		fmt.Fprintf(&list, "file '%s'\n", strings.ReplaceAll(absPath, "'", `'\''`))
	}

	listFile := filepath.Join(workDir, "frames.txt")
	if err := os.WriteFile(listFile, []byte(list.String()), 0666); err != nil {
		return fmt.Errorf("failed to create list file: %w", err)
	}

	// Run ffmpeg with concat demuxer and hardware acceleration
	cmd, err := c.buildFFmpegCommand(device, "-f", "concat", "-safe", "0", "-i", listFile,
		"-r", "24", "-y", outputMP4)
	if err != nil {
		// Fallback to software encoding
		cmd = exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", listFile,
			"-c:v", "libx264", "-pix_fmt", "yuv420p", "-r", "24", "-y", outputMP4)
	}

	output, err := cmd.CombinedOutput()
	if err != nil {
		outputStr := string(output)
		// Prefer a targeted message when ffmpeg complained about frame size.
		if sizeErr := c.checkFFmpegSizeError(outputStr); sizeErr != nil {
			return sizeErr
		}
		return fmt.Errorf("ffmpeg concat failed: %w\nOutput: %s", err, outputStr)
	}

	if _, err := os.Stat(outputMP4); os.IsNotExist(err) {
		return fmt.Errorf("MP4 file not created: %s", outputMP4)
	}
	return nil
}
// Patterns used by checkFFmpegSizeError to pull numeric details out of ffmpeg
// output. They are compiled once and match case-insensitively, in line with the
// case-insensitive substring checks that guard them.
var (
	ffmpegSizeConstraintsRe = regexp.MustCompile(`(?i)constraints:\s*width\s+(\d+)-(\d+)\s+height\s+(\d+)-(\d+)`)
	ffmpegPictureSizeRe     = regexp.MustCompile(`(?i)picture size\s+(\d+)x(\d+)`)
	ffmpegEncoderAtSizeRe   = regexp.MustCompile(`(?i)at size\s+(\d+)x(\d+)`)
)

// checkFFmpegSizeError inspects ffmpeg output for frame-size related failures.
//
// It returns a descriptive error when the output mentions a hardware size limit,
// an invalid picture size, an encoder-open failure that refers to width/height/size,
// or a generic "invalid" width/height/dimension message. Numeric details are
// included when they can be extracted. It returns nil when none of these match.
func (c *Client) checkFFmpegSizeError(output string) error {
	outputLower := strings.ToLower(output)
	mentions := func(fragment string) bool {
		return strings.Contains(outputLower, fragment)
	}

	// Check for hardware encoding size constraints
	if mentions("hardware does not support encoding at size") {
		// Extract size constraints if available
		if m := ffmpegSizeConstraintsRe.FindStringSubmatch(output); len(m) == 5 {
			return fmt.Errorf("video frame size is outside hardware encoder limits. Hardware requires: width %s-%s, height %s-%s. Please adjust your render resolution to fit within these constraints",
				m[1], m[2], m[3], m[4])
		}
		return fmt.Errorf("video frame size is outside hardware encoder limits. Please adjust your render resolution")
	}

	// Check for invalid picture size
	if mentions("picture size") && mentions("is invalid") {
		if m := ffmpegPictureSizeRe.FindStringSubmatch(output); len(m) == 3 {
			return fmt.Errorf("invalid video frame size: %sx%s. Frame dimensions are too large or invalid", m[1], m[2])
		}
		return fmt.Errorf("invalid video frame size. Frame dimensions are too large or invalid")
	}

	// Check for encoder parameter errors mentioning width/height
	if mentions("error while opening encoder") &&
		(mentions("width") || mentions("height") || mentions("size")) {
		// Try to extract the actual size if mentioned
		if m := ffmpegEncoderAtSizeRe.FindStringSubmatch(output); len(m) == 3 {
			return fmt.Errorf("hardware encoder cannot encode frame size %sx%s. The frame dimensions may be too small, too large, or not supported by the hardware encoder", m[1], m[2])
		}
		return fmt.Errorf("hardware encoder error: frame size may be invalid. Common issues: frame too small (minimum usually 128x128) or too large (maximum varies by hardware)")
	}

	// Check for general size-related errors
	if mentions("invalid") &&
		(mentions("width") || mentions("height") || mentions("dimension")) {
		return fmt.Errorf("invalid frame dimensions detected. Please check your render resolution settings")
	}
	return nil
}
// extractFrameNumber parses the frame number out of a filename such as
// "frame_0001.png". The number is read from the text that follows the first
// underscore of the base name, up to the next underscore or dot.
// Returns 0 when the base name has no underscore or no leading integer is found.
func extractFrameNumber(filename string) int {
	base := filepath.Base(filename)

	sep := strings.Index(base, "_")
	if sep < 0 {
		return 0
	}

	// Keep only the segment between the first underscore and the next '_' or '.'.
	digits := base[sep+1:]
	if end := strings.IndexAny(digits, "_."); end >= 0 {
		digits = digits[:end]
	}

	n := 0
	// A failed scan leaves n at zero, which is the documented fallback.
	_, _ = fmt.Sscanf(digits, "%d", &n)
	return n
}
// getJobFiles requests the file list of a job from the manager via a signed GET
// to /api/runner/jobs/{jobID}/files and decodes the JSON array in the response.
// A non-200 status is reported as an error carrying the response body.
func (c *Client) getJobFiles(jobID int64) ([]map[string]interface{}, error) {
	endpoint := fmt.Sprintf("/api/runner/jobs/%d/files", jobID)
	query := fmt.Sprintf("runner_id=%d", c.runnerID)

	resp, err := c.doSignedRequest("GET", endpoint, nil, query)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// The body read is best-effort; it only enriches the error message.
		msg, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("failed to get job files: %s", string(msg))
	}

	var files []map[string]interface{}
	if decodeErr := json.NewDecoder(resp.Body).Decode(&files); decodeErr != nil {
		return nil, decodeErr
	}
	return files, nil
}
// downloadFrameFile downloads a single frame file of a job (used for MP4
// generation) via a signed GET to /api/runner/files/{jobID}/{fileName} and writes
// the response body to destPath.
//
// A non-200 status is reported as an error carrying the response body. If the
// copy fails, the partially written file is removed. An error from closing the
// destination file is returned, since it can mean the data was not fully written.
func (c *Client) downloadFrameFile(jobID int64, fileName, destPath string) error {
	path := fmt.Sprintf("/api/runner/files/%d/%s", jobID, fileName)
	resp, err := c.doSignedRequest("GET", path, nil, fmt.Sprintf("runner_id=%d", c.runnerID))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// The body read is best-effort; it only enriches the error message.
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("download failed: %s", string(body))
	}

	file, err := os.Create(destPath)
	if err != nil {
		return err
	}

	if _, err := io.Copy(file, resp.Body); err != nil {
		// Best-effort cleanup so a truncated frame is never mistaken for a good one.
		file.Close()
		os.Remove(destPath)
		return err
	}
	return file.Close()
}
// downloadFile downloads a manager file into destDir, keeping only the base name
// of filePath as the local file name. The transfer is done by downloadFileToPath.
func (c *Client) downloadFile(filePath, destDir string) error {
	target := filepath.Join(destDir, filepath.Base(filePath))
	return c.downloadFileToPath(filePath, target)
}
// downloadFileToPath downloads a manager file to an explicit destination path,
// creating the destination directory if needed.
//
// filePath is expected to look like storage/jobs/{jobID}/{relativePath}: the
// segment after the first "jobs" component is the job ID and everything after it
// is requested as-is below /api/runner/files/{jobID}/. When nothing follows the
// job ID, the base name of filePath is requested instead.
//
// Returns an error for a malformed path, a failed request, a non-200 status, or
// any failure writing destPath. A partially written file is removed.
func (c *Client) downloadFileToPath(filePath, destPath string) error {
	parts := strings.Split(strings.TrimPrefix(filePath, "./"), "/")
	if len(parts) < 3 {
		return fmt.Errorf("invalid file path format: %s", filePath)
	}

	// Find job ID in path (look for "jobs" directory)
	jobID := ""
	var relPathParts []string
	for i, part := range parts {
		if part == "jobs" && i+1 < len(parts) {
			jobID = parts[i+1]
			relPathParts = parts[i+2:]
			break
		}
	}
	if jobID == "" {
		return fmt.Errorf("could not extract job ID from path: %s", filePath)
	}

	// Build download path - preserve relative path structure.
	// NOTE(review): components are appended without URL escaping; whether
	// doSignedRequest expects escaped or raw paths is not visible here.
	downloadPath := fmt.Sprintf("/api/runner/files/%s", jobID)
	if len(relPathParts) > 0 {
		downloadPath += "/" + strings.Join(relPathParts, "/")
	} else {
		// Fallback to filename only
		downloadPath += "/" + filepath.Base(filePath)
	}

	resp, err := c.doSignedRequest("GET", downloadPath, nil, fmt.Sprintf("runner_id=%d", c.runnerID))
	if err != nil {
		return fmt.Errorf("failed to download file: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// The body read is best-effort; it only enriches the error message.
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("download failed: %s", string(body))
	}

	// Ensure destination directory exists
	if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
		return fmt.Errorf("failed to create destination directory: %w", err)
	}

	file, err := os.Create(destPath)
	if err != nil {
		return fmt.Errorf("failed to create destination file: %w", err)
	}

	if _, err := io.Copy(file, resp.Body); err != nil {
		// Best-effort cleanup so a truncated download is never left behind.
		file.Close()
		os.Remove(destPath)
		return err
	}
	// A close error can mean buffered data never reached the disk.
	if err := file.Close(); err != nil {
		return fmt.Errorf("failed to finalize destination file: %w", err)
	}
	return nil
}
// uploadFile uploads a local file to the manager as a multipart form field named
// "file" via POST /api/runner/files/{jobID}/upload, authenticating with the
// X-Runner-Secret header.
//
// The whole file is buffered in memory before sending. On HTTP 201 the
// "file_path" value from the JSON response is returned; any other status is
// reported as an error carrying the response body.
func (c *Client) uploadFile(jobID int64, filePath string) (string, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return "", fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Create multipart form
	var body bytes.Buffer
	formWriter := multipart.NewWriter(&body)
	part, err := formWriter.CreateFormFile("file", filepath.Base(filePath))
	if err != nil {
		return "", fmt.Errorf("failed to create form file: %w", err)
	}
	if _, err := io.Copy(part, file); err != nil {
		return "", fmt.Errorf("failed to copy file data: %w", err)
	}
	// Close writes the terminating boundary; without it the form is malformed.
	if err := formWriter.Close(); err != nil {
		return "", fmt.Errorf("failed to finalize multipart form: %w", err)
	}

	// Upload file with shared secret. The local is not called "url" so it does
	// not shadow the imported net/url package.
	uploadURL := fmt.Sprintf("%s/api/runner/files/%d/upload?runner_id=%d", c.managerURL, jobID, c.runnerID)
	req, err := http.NewRequest("POST", uploadURL, &body)
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", formWriter.FormDataContentType())
	req.Header.Set("X-Runner-Secret", c.runnerSecret)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to upload file: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		// The body read is best-effort; it only enriches the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("upload failed: %s", string(respBody))
	}

	var result struct {
		FilePath string `json:"file_path"`
		FileName string `json:"file_name"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}
	return result.FilePath, nil
}
// processMetadataTask runs a metadata extraction task for a job.
//
// Steps, each reported through sendStepUpdate/sendLog:
//  1. download: fetch every input file into a temporary work directory and
//     locate the .blend file.
//  2. extract_metadata: run Blender in background mode with a generated Python
//     script that prints the metadata as JSON on stdout; Blender's output is
//     streamed to the task log while stdout is also collected for parsing.
//  3. submit_metadata: send the parsed metadata to the manager.
//
// On success the task is marked complete via sendTaskComplete. The work directory
// is removed when the function returns. An error is returned (instead of a panic)
// when task["id"] is not a number or an input file entry is not a string.
func (c *Client) processMetadataTask(task map[string]interface{}, jobID int64, inputFiles []interface{}) error {
	rawID, ok := task["id"].(float64)
	if !ok {
		return fmt.Errorf("invalid or missing task id: %v", task["id"])
	}
	taskID := int64(rawID)

	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Starting metadata extraction task: job %d", jobID), "")
	log.Printf("Processing metadata extraction task %d for job %d", taskID, jobID)

	// failStep logs msg as an error, marks the step as failed and returns msg as an error.
	failStep := func(step, msg string) error {
		c.sendLog(taskID, types.LogLevelError, msg, step)
		c.sendStepUpdate(taskID, step, types.StepStatusFailed, msg)
		return errors.New(msg)
	}

	// Create temporary job workspace for metadata extraction within runner workspace
	workDir := filepath.Join(c.getWorkspaceDir(), fmt.Sprintf("job-%d-metadata-%d", jobID, taskID))
	if err := os.MkdirAll(workDir, 0755); err != nil {
		return fmt.Errorf("failed to create work directory: %w", err)
	}
	defer os.RemoveAll(workDir)

	// Step: download
	c.sendStepUpdate(taskID, "download", types.StepStatusRunning, "")
	c.sendLog(taskID, types.LogLevelInfo, "Downloading blend file...", "download")
	blendFile := ""
	for _, filePath := range inputFiles {
		filePathStr, ok := filePath.(string)
		if !ok {
			err := fmt.Errorf("invalid input file entry: %v", filePath)
			c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
			return err
		}
		if err := c.downloadFile(filePathStr, workDir); err != nil {
			c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
			return fmt.Errorf("failed to download file %s: %w", filePathStr, err)
		}
		if filepath.Ext(filePathStr) == ".blend" {
			blendFile = filepath.Join(workDir, filepath.Base(filePathStr))
		}
	}
	if blendFile == "" {
		err := fmt.Errorf("no .blend file found in input files")
		c.sendStepUpdate(taskID, "download", types.StepStatusFailed, err.Error())
		return err
	}
	c.sendStepUpdate(taskID, "download", types.StepStatusCompleted, "")
	c.sendLog(taskID, types.LogLevelInfo, "Blend file downloaded successfully", "download")

	// Step: extract_metadata
	c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusRunning, "")
	c.sendLog(taskID, types.LogLevelInfo, "Extracting metadata from blend file...", "extract_metadata")

	// Create Python script to extract metadata
	scriptPath := filepath.Join(workDir, "extract_metadata.py")
	scriptContent := `import bpy
import json
import sys
# Get scene
scene = bpy.context.scene
# Extract frame range from scene settings
frame_start = scene.frame_start
frame_end = scene.frame_end
# Also check for actual animation range (keyframes)
# Find the earliest and latest keyframes across all objects
animation_start = None
animation_end = None
for obj in scene.objects:
    if obj.animation_data and obj.animation_data.action:
        action = obj.animation_data.action
        if action.fcurves:
            for fcurve in action.fcurves:
                if fcurve.keyframe_points:
                    for keyframe in fcurve.keyframe_points:
                        frame = int(keyframe.co[0])
                        if animation_start is None or frame < animation_start:
                            animation_start = frame
                        if animation_end is None or frame > animation_end:
                            animation_end = frame
# Use animation range if available, otherwise use scene frame range
# If scene range seems wrong (start == end), prefer animation range
if animation_start is not None and animation_end is not None:
    if frame_start == frame_end or (animation_start < frame_start or animation_end > frame_end):
        # Use animation range if scene range is invalid or animation extends beyond it
        frame_start = animation_start
        frame_end = animation_end
# Extract render settings
render = scene.render
resolution_x = render.resolution_x
resolution_y = render.resolution_y
samples = scene.cycles.samples if scene.cycles else scene.eevee.taa_render_samples
engine = scene.render.engine.lower()
# Determine output format from file format
output_format = render.image_settings.file_format
# Extract scene info
camera_count = len([obj for obj in scene.objects if obj.type == 'CAMERA'])
object_count = len(scene.objects)
material_count = len(bpy.data.materials)
# Build metadata dictionary
metadata = {
    "frame_start": frame_start,
    "frame_end": frame_end,
    "render_settings": {
        "resolution_x": resolution_x,
        "resolution_y": resolution_y,
        "samples": samples,
        "output_format": output_format,
        "engine": engine
    },
    "scene_info": {
        "camera_count": camera_count,
        "object_count": object_count,
        "material_count": material_count
    }
}
# Output as JSON
print(json.dumps(metadata))
sys.stdout.flush()
`
	if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
		c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusFailed, err.Error())
		return fmt.Errorf("failed to create extraction script: %w", err)
	}

	// Execute Blender with Python script
	cmd := exec.Command("blender", "-b", blendFile, "--python", scriptPath)
	cmd.Dir = workDir

	// Capture stdout and stderr separately for line-by-line streaming
	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		return failStep("extract_metadata", fmt.Sprintf("failed to create stdout pipe: %v", err))
	}
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		return failStep("extract_metadata", fmt.Sprintf("failed to create stderr pipe: %v", err))
	}

	// Buffer to collect stdout for JSON parsing
	var stdoutBuffer bytes.Buffer

	// Start the command
	if err := cmd.Start(); err != nil {
		return failStep("extract_metadata", fmt.Sprintf("failed to start blender: %v", err))
	}

	// Register process for cleanup on shutdown
	c.runningProcs.Store(taskID, cmd)
	defer c.runningProcs.Delete(taskID)

	// streamLines forwards each non-empty, non-filtered line of pipe to the task
	// log. When sink is non-nil every line is also appended to it. With
	// promoteInfo set, lines classified as info are logged as warnings. The
	// returned channel is closed once the pipe has been read to the end.
	streamLines := func(pipe io.Reader, sink *bytes.Buffer, promoteInfo bool) <-chan struct{} {
		done := make(chan struct{})
		go func() {
			defer close(done)
			scanner := bufio.NewScanner(pipe)
			for scanner.Scan() {
				line := scanner.Text()
				if sink != nil {
					sink.WriteString(line)
					sink.WriteString("\n")
				}
				if line == "" {
					continue
				}
				shouldFilter, logLevel := shouldFilterBlenderLog(line)
				if shouldFilter {
					continue
				}
				if promoteInfo && logLevel == types.LogLevelInfo {
					logLevel = types.LogLevelWarn
				}
				c.sendLog(taskID, logLevel, line, "extract_metadata")
			}
			// If the scanner stopped early (e.g. an over-long line), keep draining
			// so the child process cannot block on a full pipe.
			_, _ = io.Copy(io.Discard, pipe)
		}()
		return done
	}

	stdoutDone := streamLines(stdoutPipe, &stdoutBuffer, false)
	stderrDone := streamLines(stderrPipe, nil, true)

	// All reads from the pipes must finish before Wait is called: Wait closes
	// the pipes, and calling it earlier can discard output that was not read yet.
	<-stdoutDone
	<-stderrDone
	err = cmd.Wait()
	if err != nil {
		return failStep("extract_metadata", fmt.Sprintf("blender metadata extraction failed: %v", err))
	}

	// Parse output (metadata is printed to stdout)
	metadataJSON := strings.TrimSpace(stdoutBuffer.String())
	// Extract JSON from output (Blender may print other stuff)
	jsonStart := strings.Index(metadataJSON, "{")
	jsonEnd := strings.LastIndex(metadataJSON, "}")
	if jsonStart == -1 || jsonEnd == -1 || jsonEnd <= jsonStart {
		return failStep("extract_metadata", "failed to extract JSON from Blender output")
	}
	metadataJSON = metadataJSON[jsonStart : jsonEnd+1]

	var metadata types.BlendMetadata
	if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil {
		return failStep("extract_metadata", fmt.Sprintf("Failed to parse metadata JSON: %v", err))
	}
	c.sendLog(taskID, types.LogLevelInfo, fmt.Sprintf("Metadata extracted: frames %d-%d, resolution %dx%d",
		metadata.FrameStart, metadata.FrameEnd, metadata.RenderSettings.ResolutionX, metadata.RenderSettings.ResolutionY), "extract_metadata")
	c.sendStepUpdate(taskID, "extract_metadata", types.StepStatusCompleted, "")

	// Step: submit_metadata
	c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusRunning, "")
	c.sendLog(taskID, types.LogLevelInfo, "Submitting metadata to manager...", "submit_metadata")
	if err := c.submitMetadata(jobID, metadata); err != nil {
		return failStep("submit_metadata", fmt.Sprintf("Failed to submit metadata: %v", err))
	}
	c.sendStepUpdate(taskID, "submit_metadata", types.StepStatusCompleted, "")
	c.sendLog(taskID, types.LogLevelInfo, "Metadata extraction completed successfully", "")

	// Mark task as complete. The metadata is already submitted at this point, so
	// a failure to report completion is logged rather than failing the task.
	if err := c.sendTaskComplete(taskID, "", true, ""); err != nil {
		log.Printf("Failed to report completion of metadata task %d: %v", taskID, err)
	}
	return nil
}
// submitMetadata sends the extracted metadata as JSON to the manager via
// POST /api/runner/jobs/{jobID}/metadata, authenticating with the
// X-Runner-Secret header. Any status other than 200 is reported as an error
// carrying the response body.
func (c *Client) submitMetadata(jobID int64, metadata types.BlendMetadata) error {
	payload, err := json.Marshal(metadata)
	if err != nil {
		return fmt.Errorf("failed to marshal metadata: %w", err)
	}

	endpoint := c.managerURL + fmt.Sprintf("/api/runner/jobs/%d/metadata?runner_id=%d", jobID, c.runnerID)
	req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Runner-Secret", c.runnerSecret)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to submit metadata: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	// The body read is best-effort; it only enriches the error message.
	reason, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("metadata submission failed: %s", string(reason))
}
// completeTask reports the outcome of a task to the manager.
// It delegates to sendTaskComplete and returns whatever error that produces.
func (c *Client) completeTask(taskID int64, outputPath string, success bool, errorMsg string) error {
	if err := c.sendTaskComplete(taskID, outputPath, success, errorMsg); err != nil {
		return err
	}
	return nil
}
// sendTaskComplete sends a "task_complete" message for the task over the
// WebSocket connection. Returns an error when no connection is established or
// when the write fails.
func (c *Client) sendTaskComplete(taskID int64, outputPath string, success bool, errorMsg string) error {
	c.wsConnMu.RLock()
	conn := c.wsConn
	c.wsConnMu.RUnlock()

	if conn == nil {
		return fmt.Errorf("WebSocket not connected, cannot complete task")
	}

	// All WebSocket writes go through wsWriteMu so they never run concurrently.
	c.wsWriteMu.Lock()
	defer c.wsWriteMu.Unlock()

	result := map[string]interface{}{
		"task_id":     taskID,
		"output_path": outputPath,
		"success":     success,
		"error":       errorMsg,
	}
	envelope := map[string]interface{}{
		"type":      "task_complete",
		"data":      result,
		"timestamp": time.Now().Unix(),
	}

	if err := conn.WriteJSON(envelope); err != nil {
		return fmt.Errorf("failed to send task completion: %w", err)
	}
	return nil
}