Files
jiggablend/internal/runner/runner.go
Justin Harms dc525fbaa4 Add hardware compatibility flags for CPU rendering and HIPRT control
- Introduced `--force-cpu-rendering` and `--disable-hiprt` flags to the runner command, allowing users to enforce CPU rendering and disable HIPRT acceleration.
- Updated the runner initialization and context structures to accommodate the new flags, enhancing flexibility in rendering configurations.
- Modified the rendering logic to respect these flags, improving compatibility and user control over rendering behavior in Blender.
2026-03-13 21:15:44 -05:00

489 lines
14 KiB
Go

// Package runner provides the Jiggablend render runner.
package runner
import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"jiggablend/internal/runner/api"
	"jiggablend/internal/runner/blender"
	"jiggablend/internal/runner/encoding"
	"jiggablend/internal/runner/tasks"
	"jiggablend/internal/runner/workspace"
	"jiggablend/pkg/executils"
	"jiggablend/pkg/types"
)
// Runner is the main render runner.
//
// A Runner registers with a manager, polls it for tasks, and dispatches each
// task to a processor registered by type ("render", "encode"). Several fields
// are nil until Register succeeds (workspace, blender, encoder); mutex-guarded
// fields note which lock protects them.
type Runner struct {
	id        int64  // runner ID assigned by the manager in Register
	name      string // runner name reported to the manager
	hostname  string
	manager   *api.ManagerClient // client for manager HTTP/WebSocket APIs
	workspace *workspace.Manager // per-runner scratch dirs; set in Register
	blender   *blender.Manager   // Blender binary manager; set in Register
	encoder   *encoding.Selector // encoder selection; set in Register
	processes *executils.ProcessTracker
	processors map[string]tasks.Processor // task type -> processor; populated in Register
	stopChan  chan struct{} // closed by Stop to end the Start polling loop

	fingerprint   string // hardware fingerprint; guarded by fingerprintMu
	fingerprintMu sync.RWMutex

	// gpuLockedOut is set when logs indicate a GPU error (e.g. HIP "Illegal address");
	// when true, the runner forces CPU rendering for all subsequent jobs.
	// Guarded by gpuLockedOutMu.
	gpuLockedOut   bool
	gpuLockedOutMu sync.RWMutex

	// hasHIP/hasNVIDIA are set at startup by running latest Blender to detect GPU backends.
	// Used to force CPU only for Blender < 4.x when HIP is present (no official HIP support pre-4).
	// gpuDetectionFailed is true when detection could not run; we then force CPU for all versions
	// (we could not determine HIP vs NVIDIA). All four fields are guarded by gpuBackendMu.
	gpuBackendMu       sync.RWMutex
	hasHIP             bool
	hasNVIDIA          bool
	gpuBackendProbed   bool // ensures detection runs at most once
	gpuDetectionFailed bool

	// forceCPURendering forces CPU rendering for all jobs regardless of metadata/backend detection.
	forceCPURendering bool
	// disableHIPRT disables HIPRT acceleration when configuring Cycles HIP devices.
	disableHIPRT bool
}
// New creates a new runner.
//
// The returned Runner is not yet registered with the manager; call Register
// before Start. forceCPURendering and disableHIPRT correspond to the
// --force-cpu-rendering and --disable-hiprt command-line flags.
func New(managerURL, name, hostname string, forceCPURendering, disableHIPRT bool) *Runner {
	runner := &Runner{
		name:              name,
		hostname:          hostname,
		manager:           api.NewManagerClient(managerURL),
		processes:         executils.NewProcessTracker(),
		stopChan:          make(chan struct{}),
		processors:        make(map[string]tasks.Processor),
		forceCPURendering: forceCPURendering,
		disableHIPRT:      disableHIPRT,
	}
	// Compute the hardware fingerprint up front so Register can report it.
	runner.generateFingerprint()
	return runner
}
// CheckRequiredTools verifies that required external tools are available.
//
// Currently only zstd is required (for decompressing compressed .blend
// files). Returns a descriptive error when zstd cannot be executed.
func (r *Runner) CheckRequiredTools() error {
	if err := exec.Command("zstd", "--version").Run(); err != nil {
		// errors.New instead of fmt.Errorf: the message has no format verbs.
		return errors.New("zstd not found - required for compressed blend file support. Install with: apt install zstd")
	}
	log.Printf("Found zstd for compressed blend file support")
	return nil
}
// cachedCapabilities memoizes the result of ProbeCapabilities so hardware is
// probed only once per process. The zero value (nil) means "not probed yet".
// NOTE(review): access is not synchronized — assumes ProbeCapabilities is
// only called from the main goroutine before the polling loop starts.
var cachedCapabilities map[string]interface{}
// ProbeCapabilities detects hardware capabilities.
//
// The result is cached in cachedCapabilities; subsequent calls return the
// cached map without re-probing.
func (r *Runner) ProbeCapabilities() map[string]interface{} {
	if cachedCapabilities != nil {
		return cachedCapabilities
	}
	// ffmpeg availability gates encode work: true iff the binary runs.
	caps := map[string]interface{}{
		"ffmpeg": exec.Command("ffmpeg", "-version").Run() == nil,
	}
	cachedCapabilities = caps
	return caps
}
// Register registers the runner with the manager.
//
// On success it stores the assigned runner ID, initializes the workspace,
// Blender manager, and encoder selector, and wires up the task processors.
func (r *Runner) Register(apiKey string) (int64, error) {
	runnerID, err := r.manager.Register(r.name, r.hostname, r.ProbeCapabilities(), apiKey, r.GetFingerprint())
	if err != nil {
		return 0, err
	}
	r.id = runnerID

	// Registration succeeded: set up per-runner state that depends on it.
	r.workspace = workspace.NewManager(r.name)
	r.blender = blender.NewManager(r.manager, r.workspace.BaseDir())
	r.encoder = encoding.NewSelector()

	// One processor per supported task type.
	r.processors["render"] = tasks.NewRenderProcessor()
	r.processors["encode"] = tasks.NewEncodeProcessor()

	return runnerID, nil
}
// DetectAndStoreGPUBackends downloads the latest Blender from the manager (if needed),
// runs a detection script to see if HIP (AMD) and/or NVIDIA devices are available,
// and stores the result. Call after Register. Used so we only force CPU for Blender < 4.x
// when the runner has HIP (no official HIP support pre-4); NVIDIA is allowed.
//
// Detection runs at most once; failures are recorded in gpuDetectionFailed,
// which forces CPU rendering for every job (we could not tell HIP from NVIDIA).
func (r *Runner) DetectAndStoreGPUBackends() {
	r.gpuBackendMu.Lock()
	defer r.gpuBackendMu.Unlock()
	if r.gpuBackendProbed {
		return
	}

	// markFailed records a detection failure; log output is identical to the
	// previous per-branch messages.
	markFailed := func(reason string, err error) {
		log.Printf("GPU backend detection failed (%s: %v). All jobs will use CPU because we could not determine HIP vs NVIDIA.", reason, err)
		r.gpuBackendProbed = true
		r.gpuDetectionFailed = true
	}

	latestVer, err := r.manager.GetLatestBlenderVersion()
	if err != nil {
		markFailed("could not get latest Blender version", err)
		return
	}
	binaryPath, err := r.blender.GetBinaryPath(latestVer)
	if err != nil {
		markFailed("could not get Blender binary", err)
		return
	}
	hasHIP, hasNVIDIA, err := blender.DetectGPUBackends(binaryPath, r.workspace.BaseDir())
	if err != nil {
		markFailed("script error", err)
		return
	}

	r.hasHIP = hasHIP
	r.hasNVIDIA = hasNVIDIA
	r.gpuBackendProbed = true
	r.gpuDetectionFailed = false
	log.Printf("GPU backend detection: HIP=%v NVIDIA=%v (Blender < 4.x will force CPU only when HIP is present)", hasHIP, hasNVIDIA)
}
// HasHIP returns whether the runner detected HIP (AMD) devices. Used to force CPU for Blender < 4.x only when HIP is present.
func (r *Runner) HasHIP() bool {
	r.gpuBackendMu.RLock()
	hip := r.hasHIP
	r.gpuBackendMu.RUnlock()
	return hip
}
// GPUDetectionFailed returns true when startup GPU backend detection could not run or failed. When true, all jobs use CPU because we could not determine HIP vs NVIDIA.
func (r *Runner) GPUDetectionFailed() bool {
	r.gpuBackendMu.RLock()
	failed := r.gpuDetectionFailed
	r.gpuBackendMu.RUnlock()
	return failed
}
// Start starts the job polling loop.
//
// The loop polls the manager for the next job, executes it, and sleeps for
// pollInterval between empty/failed polls. It exits when Stop closes
// stopChan. Unlike the previous implementation, the inter-poll sleep is also
// interruptible by Stop, so shutdown is not delayed by up to pollInterval.
func (r *Runner) Start(pollInterval time.Duration) {
	log.Printf("Starting job polling loop (interval: %v)", pollInterval)
	for {
		select {
		case <-r.stopChan:
			log.Printf("Stopping job polling loop")
			return
		default:
		}
		log.Printf("Polling for next job (runner ID: %d)", r.id)
		job, err := r.manager.PollNextJob()
		if err != nil {
			log.Printf("Error polling for job: %v", err)
			if r.waitOrStop(pollInterval) {
				return
			}
			continue
		}
		if job == nil {
			log.Printf("No job available, sleeping for %v", pollInterval)
			if r.waitOrStop(pollInterval) {
				return
			}
			continue
		}
		log.Printf("Received job assignment: task=%d, job=%d, type=%s",
			job.Task.TaskID, job.Task.JobID, job.Task.TaskType)
		if err := r.executeJob(job); err != nil {
			log.Printf("Error processing job: %v", err)
		}
	}
}

// waitOrStop sleeps for d but wakes immediately if Stop is called.
// It returns true when the runner is stopping.
func (r *Runner) waitOrStop(d time.Duration) bool {
	select {
	case <-r.stopChan:
		log.Printf("Stopping job polling loop")
		return true
	case <-time.After(d):
		return false
	}
}
// Stop stops the runner.
//
// Closing stopChan signals the Start polling loop to exit after its current
// iteration. Stop must be called at most once: closing an already-closed
// channel panics.
func (r *Runner) Stop() {
	close(r.stopChan)
}
// KillAllProcesses kills all running processes.
//
// All spawned subprocesses are owned by the process tracker; encoder device
// pool cleanup is handled internally by the encoding package, so nothing
// extra is needed here. (A previous empty `if r.encoder != nil` branch was
// dead code and has been removed.)
func (r *Runner) KillAllProcesses() {
	log.Printf("Killing all running processes...")
	killedCount := r.processes.KillAll()
	log.Printf("Killed %d process(es)", killedCount)
}
// Cleanup removes the workspace directory.
// It is a no-op when the runner never registered (workspace is nil).
func (r *Runner) Cleanup() {
	if r.workspace == nil {
		return
	}
	r.workspace.Cleanup()
}
// withJobWorkspace creates a scratch directory for jobID, runs fn with it,
// and always removes both the job and encode workspaces afterwards.
// Cleanup failures are logged but do not affect fn's return value.
func (r *Runner) withJobWorkspace(jobID int64, fn func(workDir string) error) error {
	dir, err := r.workspace.CreateJobDir(jobID)
	if err != nil {
		return fmt.Errorf("failed to create job workspace: %w", err)
	}
	cleanup := func() {
		if e := r.workspace.CleanupJobDir(jobID); e != nil {
			log.Printf("Warning: failed to cleanup job workspace for job %d: %v", jobID, e)
		}
		if e := r.workspace.CleanupVideoDir(jobID); e != nil {
			log.Printf("Warning: failed to cleanup encode workspace for job %d: %v", jobID, e)
		}
	}
	defer cleanup()
	return fn(dir)
}
// executeJob handles a job using per-job WebSocket connection.
//
// The task runs inside a per-job workspace that withJobWorkspace removes on
// return. A deferred recover converts a panic during task execution into an
// error so a single bad task cannot crash the runner process. The named
// return (err) is what lets the deferred function overwrite the result.
func (r *Runner) executeJob(job *api.NextJobResponse) (err error) {
	// Recover from panics to prevent runner process crashes during task execution.
	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("Task execution panicked: %v", rec)
			err = fmt.Errorf("task execution panicked: %v", rec)
		}
	}()
	return r.withJobWorkspace(job.Task.JobID, func(workDir string) error {
		// Connect to job WebSocket (no runnerID needed - authentication handles it).
		jobConn := api.NewJobConnection()
		if err := jobConn.Connect(r.manager.GetBaseURL(), job.JobPath, job.JobToken); err != nil {
			return fmt.Errorf("failed to connect job WebSocket: %w", err)
		}
		defer jobConn.Close()
		log.Printf("Job WebSocket authenticated for task %d", job.Task.TaskID)
		// Create task context (frame range: Frame = start, FrameEnd = end; 0 or missing = single frame).
		// Clamp FrameEnd so a zero/missing value degenerates to a single frame.
		frameEnd := job.Task.FrameEnd
		if frameEnd < job.Task.Frame {
			frameEnd = job.Task.Frame
		}
		ctx := tasks.NewContext(
			job.Task.TaskID,
			job.Task.JobID,
			job.Task.JobName,
			job.Task.Frame,
			frameEnd,
			job.Task.TaskType,
			workDir,
			job.JobToken,
			job.Task.Metadata,
			r.manager,
			jobConn,
			r.workspace,
			r.blender,
			r.encoder,
			r.processes,
			r.IsGPULockedOut(),    // GPU previously faulted: force CPU
			r.HasHIP(),            // HIP present: CPU-only for Blender < 4.x
			r.GPUDetectionFailed(), // detection failed: force CPU for all jobs
			r.forceCPURendering,   // --force-cpu-rendering flag
			r.disableHIPRT,        // --disable-hiprt flag
			func() { r.SetGPULockedOut(true) }, // invoked when logs show a GPU error
		)
		ctx.Info(fmt.Sprintf("Task assignment received (job: %d, type: %s)",
			job.Task.JobID, job.Task.TaskType))
		// Get processor for task type
		processor, ok := r.processors[job.Task.TaskType]
		if !ok {
			return fmt.Errorf("unknown task type: %s", job.Task.TaskType)
		}
		// Process the task
		var processErr error
		switch job.Task.TaskType {
		case "render": // this task has a upload outputs step because the frames are not uploaded by the render task directly we have to do it manually here TODO: maybe we should make it work like the encode task
			// Download context: the render input (blend file etc.) arrives as a
			// tar that downloadContext extracts into the job workspace.
			contextPath := job.JobPath + "/context.tar"
			if err := r.downloadContext(job.Task.JobID, contextPath, job.JobToken); err != nil {
				// Report the failure over the job WebSocket before returning.
				jobConn.Log(job.Task.TaskID, types.LogLevelError, fmt.Sprintf("Failed to download context: %v", err))
				jobConn.Complete(job.Task.TaskID, false, fmt.Errorf("failed to download context: %v", err))
				return fmt.Errorf("failed to download context: %w", err)
			}
			processErr = processor.Process(ctx)
			if processErr == nil {
				// Frames land in the workspace; push them to the manager.
				processErr = r.uploadOutputs(ctx, job)
			}
		case "encode": // this task doesn't have a upload outputs step because the video is already uploaded by the encode task
			processErr = processor.Process(ctx)
		default:
			// Defensive: unreachable in practice because the processor lookup
			// above already rejects unknown types.
			return fmt.Errorf("unknown task type: %s", job.Task.TaskType)
		}
		if processErr != nil {
			// A cancelled job is not a failure: stop quietly without reporting
			// Complete(false, ...) to the manager.
			if errors.Is(processErr, tasks.ErrJobCancelled) {
				ctx.Warn("Stopping task early because the job was cancelled")
				return nil
			}
			ctx.Error(fmt.Sprintf("Task failed: %v", processErr))
			ctx.Complete(false, processErr)
			return processErr
		}
		ctx.Complete(true, nil)
		return nil
	})
}
// downloadContext fetches the job's context tar from the manager and
// extracts it into the job's workspace directory.
func (r *Runner) downloadContext(jobID int64, contextPath, jobToken string) error {
	body, err := r.manager.DownloadContext(contextPath, jobToken)
	if err != nil {
		return err
	}
	defer body.Close()
	return workspace.ExtractTar(body, r.workspace.JobDir(jobID))
}
// uploadOutputs uploads every regular file in the task's output directory to
// the manager, deleting each file after a successful upload so a retry cannot
// re-upload it. Upload failures are logged and skipped (best-effort, matching
// previous behavior); only a failure to read the output directory is returned.
func (r *Runner) uploadOutputs(ctx *tasks.Context, job *api.NextJobResponse) error {
	outputDir := filepath.Join(ctx.WorkDir, "output")
	uploadPath := fmt.Sprintf("/api/runner/jobs/%d/upload", job.Task.JobID)
	entries, err := os.ReadDir(outputDir)
	if err != nil {
		return fmt.Errorf("failed to read output directory: %w", err)
	}
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		// filepath.Join instead of manual "/" concatenation for OS-correct paths.
		filePath := filepath.Join(outputDir, entry.Name())
		if err := r.manager.UploadFile(uploadPath, job.JobToken, filePath); err != nil {
			log.Printf("Failed to upload %s: %v", filePath, err)
			continue
		}
		ctx.OutputUploaded(entry.Name())
		// Delete file after successful upload to prevent duplicate uploads
		if err := os.Remove(filePath); err != nil {
			log.Printf("Warning: Failed to delete file %s after upload: %v", filePath, err)
		}
	}
	return nil
}
// generateFingerprint creates a unique hardware fingerprint.
//
// It hashes the hostname plus whichever stable machine identifiers are
// readable (/etc/machine-id, DMI product UUID, first suitable MAC address).
// If none of those are available it falls back to PID + current time, which
// is unique per process but not stable across restarts.
func (r *Runner) generateFingerprint() {
	r.fingerprintMu.Lock()
	defer r.fingerprintMu.Unlock()

	parts := []string{r.hostname}
	// Stable identifiers, in fixed order so the hash is reproducible.
	for _, p := range []string{"/etc/machine-id", "/sys/class/dmi/id/product_uuid"} {
		if data, err := os.ReadFile(p); err == nil {
			parts = append(parts, strings.TrimSpace(string(data)))
		}
	}
	if mac, err := r.getMACAddress(); err == nil {
		parts = append(parts, mac)
	}
	// Only the hostname was collected: add process-unique fallback values.
	if len(parts) <= 1 {
		parts = append(parts,
			fmt.Sprintf("%d", os.Getpid()),
			fmt.Sprintf("%d", time.Now().Unix()))
	}

	digest := sha256.New()
	for _, part := range parts {
		digest.Write([]byte(part))
		digest.Write([]byte{0}) // separator so adjacent parts cannot collide
	}
	r.fingerprint = hex.EncodeToString(digest.Sum(nil))
}
// getMACAddress returns the hardware address of the first network interface
// that is up, is not a loopback, and has a non-empty MAC.
func (r *Runner) getMACAddress() (string, error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}
	for _, nic := range ifaces {
		isLoopback := nic.Flags&net.FlagLoopback != 0
		isUp := nic.Flags&net.FlagUp != 0
		if isLoopback || !isUp || len(nic.HardwareAddr) == 0 {
			continue
		}
		return nic.HardwareAddr.String(), nil
	}
	return "", fmt.Errorf("no suitable network interface found")
}
// GetFingerprint returns the runner's hardware fingerprint.
func (r *Runner) GetFingerprint() string {
	r.fingerprintMu.RLock()
	fp := r.fingerprint
	r.fingerprintMu.RUnlock()
	return fp
}
// GetID returns the runner ID.
// The ID is zero until Register succeeds. Read without locking: it is only
// written once during registration, before the polling loop starts.
func (r *Runner) GetID() int64 {
	return r.id
}
// SetGPULockedOut sets whether GPU use is locked out due to a detected GPU error.
// When true, the runner will force CPU rendering for all jobs.
func (r *Runner) SetGPULockedOut(locked bool) {
	r.gpuLockedOutMu.Lock()
	defer r.gpuLockedOutMu.Unlock()
	if r.gpuLockedOut = locked; locked {
		log.Printf("GPU lockout enabled: GPU rendering disabled for subsequent jobs (CPU only)")
	}
}
// IsGPULockedOut returns whether GPU use is currently locked out.
func (r *Runner) IsGPULockedOut() bool {
	r.gpuLockedOutMu.RLock()
	locked := r.gpuLockedOut
	r.gpuLockedOutMu.RUnlock()
	return locked
}