- Added functionality to detect GPU backends (HIP and NVIDIA) during runner registration, improving compatibility for Blender versions below 4.x.
- Introduced a new method, DetectAndStoreGPUBackends, that downloads the latest Blender, runs a detection script, and stores the results for future rendering decisions.
- Updated rendering logic to force CPU rendering when HIP is detected on systems running Blender < 4.x, ensuring stability and compatibility.
- Extended the Context structure with flags for GPU detection status, improving error handling and rendering decisions based on GPU availability.
// Package runner provides the Jiggablend render runner.
package runner

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	"jiggablend/internal/runner/api"
	"jiggablend/internal/runner/blender"
	"jiggablend/internal/runner/encoding"
	"jiggablend/internal/runner/tasks"
	"jiggablend/internal/runner/workspace"
	"jiggablend/pkg/executils"
	"jiggablend/pkg/types"
)

// Runner is the main render runner.
type Runner struct {
	id       int64
	name     string
	hostname string

	manager   *api.ManagerClient
	workspace *workspace.Manager
	blender   *blender.Manager
	encoder   *encoding.Selector
	processes *executils.ProcessTracker

	processors map[string]tasks.Processor
	stopChan   chan struct{}

	fingerprint   string
	fingerprintMu sync.RWMutex

	// gpuLockedOut is set when logs indicate a GPU error (e.g. HIP "Illegal address");
	// when true, the runner forces CPU rendering for all subsequent jobs.
	gpuLockedOut   bool
	gpuLockedOutMu sync.RWMutex
	// hasHIP/hasNVIDIA are set at startup by running the latest Blender to detect
	// available GPU backends. They are used to force CPU rendering for Blender < 4.x
	// when HIP is present (no official HIP support pre-4); NVIDIA is unaffected.
	// gpuDetectionFailed is true when detection could not run; in that case CPU is
	// forced for every Blender version, since we could not determine HIP vs NVIDIA.
	gpuBackendMu       sync.RWMutex
	hasHIP             bool
	hasNVIDIA          bool
	gpuBackendProbed   bool
	gpuDetectionFailed bool
}

// New creates a new runner.
func New(managerURL, name, hostname string) *Runner {
	manager := api.NewManagerClient(managerURL)

	r := &Runner{
		name:       name,
		hostname:   hostname,
		manager:    manager,
		processes:  executils.NewProcessTracker(),
		stopChan:   make(chan struct{}),
		processors: make(map[string]tasks.Processor),
	}

	// Generate fingerprint
	r.generateFingerprint()

	return r
}

// CheckRequiredTools verifies that required external tools are available.
func (r *Runner) CheckRequiredTools() error {
	if err := exec.Command("zstd", "--version").Run(); err != nil {
		return errors.New("zstd not found - required for compressed blend file support. Install with: apt install zstd")
	}
	log.Printf("Found zstd for compressed blend file support")

	return nil
}

// cachedCapabilities memoizes the result of ProbeCapabilities. No lock is taken:
// it is expected to run once (from Register) before the polling loop starts.
var cachedCapabilities map[string]interface{}

// ProbeCapabilities detects hardware capabilities.
func (r *Runner) ProbeCapabilities() map[string]interface{} {
	if cachedCapabilities != nil {
		return cachedCapabilities
	}

	caps := make(map[string]interface{})

	// Check for ffmpeg and record whether video encoding is available.
	caps["ffmpeg"] = exec.Command("ffmpeg", "-version").Run() == nil

	cachedCapabilities = caps
	return caps
}

// Register registers the runner with the manager.
func (r *Runner) Register(apiKey string) (int64, error) {
	caps := r.ProbeCapabilities()

	id, err := r.manager.Register(r.name, r.hostname, caps, apiKey, r.GetFingerprint())
	if err != nil {
		return 0, err
	}

	r.id = id

	// Initialize workspace after registration
	r.workspace = workspace.NewManager(r.name)

	// Initialize blender manager
	r.blender = blender.NewManager(r.manager, r.workspace.BaseDir())

	// Initialize encoder selector
	r.encoder = encoding.NewSelector()

	// Register task processors
	r.processors["render"] = tasks.NewRenderProcessor()
	r.processors["encode"] = tasks.NewEncodeProcessor()

	return id, nil
}
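
// Typical wiring, for orientation (a sketch, assuming an entrypoint along these
// lines; the real cmd/runner main is not part of this file, and names such as
// managerURL and apiKey are illustrative):
//
//	r := runner.New(managerURL, name, hostname)
//	if err := r.CheckRequiredTools(); err != nil {
//		log.Fatal(err)
//	}
//	if _, err := r.Register(apiKey); err != nil {
//		log.Fatal(err)
//	}
//	r.DetectAndStoreGPUBackends() // after Register: needs the manager client and workspace
//	r.Start(10 * time.Second)     // blocks until Stop is called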

// DetectAndStoreGPUBackends downloads the latest Blender from the manager (if
// needed), runs a detection script to determine whether HIP (AMD) and/or NVIDIA
// devices are available, and stores the result. Call it after Register. The
// result lets us force CPU for Blender < 4.x only when the runner has HIP (no
// official HIP support pre-4); NVIDIA runners keep GPU rendering.
func (r *Runner) DetectAndStoreGPUBackends() {
	r.gpuBackendMu.Lock()
	defer r.gpuBackendMu.Unlock()
	if r.gpuBackendProbed {
		return
	}

	// On any failure we mark detection as done-but-failed, which forces CPU for
	// all versions, since we could not determine HIP vs NVIDIA.
	markFailed := func(reason string, err error) {
		log.Printf("GPU backend detection failed (%s: %v). All jobs will use CPU because we could not determine HIP vs NVIDIA.", reason, err)
		r.gpuBackendProbed = true
		r.gpuDetectionFailed = true
	}

	latestVer, err := r.manager.GetLatestBlenderVersion()
	if err != nil {
		markFailed("could not get latest Blender version", err)
		return
	}
	binaryPath, err := r.blender.GetBinaryPath(latestVer)
	if err != nil {
		markFailed("could not get Blender binary", err)
		return
	}
	hasHIP, hasNVIDIA, err := blender.DetectGPUBackends(binaryPath, r.workspace.BaseDir())
	if err != nil {
		markFailed("script error", err)
		return
	}
	r.hasHIP = hasHIP
	r.hasNVIDIA = hasNVIDIA
	r.gpuBackendProbed = true
	r.gpuDetectionFailed = false
	log.Printf("GPU backend detection: HIP=%v NVIDIA=%v (Blender < 4.x will force CPU only when HIP is present)", hasHIP, hasNVIDIA)
}
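
// For reference, the detection inside blender.DetectGPUBackends could look
// roughly like the following (a sketch only, assuming the script queries Cycles
// device preferences via bpy; the real script lives in the blender package and
// may differ):
//
//	blender --background --factory-startup --python-expr "
//	import bpy
//	prefs = bpy.context.preferences.addons['cycles'].preferences
//	prefs.get_devices()
//	types = {d.type for d in prefs.devices}
//	print('HIP:', 'HIP' in types, 'NVIDIA:', bool({'CUDA', 'OPTIX'} & types))"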

// HasHIP reports whether the runner detected HIP (AMD) devices. It is used to
// force CPU rendering for Blender < 4.x only when HIP is present.
func (r *Runner) HasHIP() bool {
	r.gpuBackendMu.RLock()
	defer r.gpuBackendMu.RUnlock()
	return r.hasHIP
}

// GPUDetectionFailed reports whether startup GPU backend detection could not run
// or failed. When true, all jobs use CPU because we could not determine HIP vs NVIDIA.
func (r *Runner) GPUDetectionFailed() bool {
	r.gpuBackendMu.RLock()
	defer r.gpuBackendMu.RUnlock()
	return r.gpuDetectionFailed
}
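
// Taken together, the flags imply a CPU/GPU decision along these lines (a sketch;
// the actual choice is made in the render processor via tasks.Context, and the
// helper below is illustrative, not part of this package's API):
//
//	func shouldForceCPU(r *Runner, blenderMajor int) bool {
//		switch {
//		case r.IsGPULockedOut(): // a previous job hit a GPU fault (e.g. HIP "Illegal address")
//			return true
//		case r.GPUDetectionFailed(): // cannot tell HIP from NVIDIA, so play it safe
//			return true
//		case r.HasHIP() && blenderMajor < 4: // no official HIP support pre-4.x
//			return true
//		default:
//			return false
//		}
//	}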

// Start starts the job polling loop.
func (r *Runner) Start(pollInterval time.Duration) {
	log.Printf("Starting job polling loop (interval: %v)", pollInterval)

	for {
		select {
		case <-r.stopChan:
			log.Printf("Stopping job polling loop")
			return
		default:
		}

		log.Printf("Polling for next job (runner ID: %d)", r.id)
		job, err := r.manager.PollNextJob()
		if err != nil {
			log.Printf("Error polling for job: %v", err)
			time.Sleep(pollInterval)
			continue
		}

		if job == nil {
			log.Printf("No job available, sleeping for %v", pollInterval)
			time.Sleep(pollInterval)
			continue
		}

		log.Printf("Received job assignment: task=%d, job=%d, type=%s",
			job.Task.TaskID, job.Task.JobID, job.Task.TaskType)

		if err := r.executeJob(job); err != nil {
			log.Printf("Error processing job: %v", err)
		}
	}
}

// Stop stops the runner.
func (r *Runner) Stop() {
	close(r.stopChan)
}

// KillAllProcesses kills all running processes.
func (r *Runner) KillAllProcesses() {
	log.Printf("Killing all running processes...")
	killedCount := r.processes.KillAll()

	// Allocated encoder devices are released by the encoder's own device pool;
	// nothing to do here beyond killing the processes.

	log.Printf("Killed %d process(es)", killedCount)
}

// Cleanup removes the workspace directory.
func (r *Runner) Cleanup() {
	if r.workspace != nil {
		r.workspace.Cleanup()
	}
}

// withJobWorkspace creates the per-job working directory, runs fn inside it,
// and cleans up the job and encode workspaces afterwards.
func (r *Runner) withJobWorkspace(jobID int64, fn func(workDir string) error) error {
	workDir, err := r.workspace.CreateJobDir(jobID)
	if err != nil {
		return fmt.Errorf("failed to create job workspace: %w", err)
	}

	defer func() {
		if cleanupErr := r.workspace.CleanupJobDir(jobID); cleanupErr != nil {
			log.Printf("Warning: failed to cleanup job workspace for job %d: %v", jobID, cleanupErr)
		}
		if cleanupErr := r.workspace.CleanupVideoDir(jobID); cleanupErr != nil {
			log.Printf("Warning: failed to cleanup encode workspace for job %d: %v", jobID, cleanupErr)
		}
	}()

	return fn(workDir)
}

// executeJob handles a job using a per-job WebSocket connection.
func (r *Runner) executeJob(job *api.NextJobResponse) (err error) {
	// Recover from panics to prevent runner process crashes during task execution.
	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("Task execution panicked: %v", rec)
			err = fmt.Errorf("task execution panicked: %v", rec)
		}
	}()

	return r.withJobWorkspace(job.Task.JobID, func(workDir string) error {
		// Connect to the job WebSocket (no runnerID needed - authentication handles it).
		jobConn := api.NewJobConnection()
		if err := jobConn.Connect(r.manager.GetBaseURL(), job.JobPath, job.JobToken); err != nil {
			return fmt.Errorf("failed to connect job WebSocket: %w", err)
		}
		defer jobConn.Close()

		log.Printf("Job WebSocket authenticated for task %d", job.Task.TaskID)

		// Create the task context (frame range: Frame = start, FrameEnd = end; 0 or missing = single frame).
		frameEnd := job.Task.FrameEnd
		if frameEnd < job.Task.Frame {
			frameEnd = job.Task.Frame
		}
		ctx := tasks.NewContext(
			job.Task.TaskID,
			job.Task.JobID,
			job.Task.JobName,
			job.Task.Frame,
			frameEnd,
			job.Task.TaskType,
			workDir,
			job.JobToken,
			job.Task.Metadata,
			r.manager,
			jobConn,
			r.workspace,
			r.blender,
			r.encoder,
			r.processes,
			r.IsGPULockedOut(),     // GPU lockout from a previous job's GPU fault
			r.HasHIP(),             // HIP present: force CPU for Blender < 4.x
			r.GPUDetectionFailed(), // detection failed: force CPU for all versions
			func() { r.SetGPULockedOut(true) },
		)

		ctx.Info(fmt.Sprintf("Task assignment received (job: %d, type: %s)",
			job.Task.JobID, job.Task.TaskType))

		// Get the processor for this task type.
		processor, ok := r.processors[job.Task.TaskType]
		if !ok {
			return fmt.Errorf("unknown task type: %s", job.Task.TaskType)
		}

		// Process the task.
		var processErr error
		switch job.Task.TaskType {
		case "render":
			// Render tasks need an explicit output upload step: frames are not
			// uploaded by the render task itself, so we do it manually here.
			// TODO: maybe make this work like the encode task, which uploads its own output.

			// Download context
			contextPath := job.JobPath + "/context.tar"
			if err := r.downloadContext(job.Task.JobID, contextPath, job.JobToken); err != nil {
				jobConn.Log(job.Task.TaskID, types.LogLevelError, fmt.Sprintf("Failed to download context: %v", err))
				jobConn.Complete(job.Task.TaskID, false, fmt.Errorf("failed to download context: %v", err))
				return fmt.Errorf("failed to download context: %w", err)
			}
			processErr = processor.Process(ctx)
			if processErr == nil {
				processErr = r.uploadOutputs(ctx, job)
			}
		case "encode":
			// Encode tasks upload the finished video themselves; no upload step here.
			processErr = processor.Process(ctx)
		default:
			return fmt.Errorf("unknown task type: %s", job.Task.TaskType)
		}

		if processErr != nil {
			if errors.Is(processErr, tasks.ErrJobCancelled) {
				ctx.Warn("Stopping task early because the job was cancelled")
				return nil
			}
			ctx.Error(fmt.Sprintf("Task failed: %v", processErr))
			ctx.Complete(false, processErr)
			return processErr
		}

		ctx.Complete(true, nil)
		return nil
	})
}

func (r *Runner) downloadContext(jobID int64, contextPath, jobToken string) error {
	reader, err := r.manager.DownloadContext(contextPath, jobToken)
	if err != nil {
		return err
	}
	defer reader.Close()

	jobDir := r.workspace.JobDir(jobID)
	return workspace.ExtractTar(reader, jobDir)
}

func (r *Runner) uploadOutputs(ctx *tasks.Context, job *api.NextJobResponse) error {
	outputDir := ctx.WorkDir + "/output"
	uploadPath := fmt.Sprintf("/api/runner/jobs/%d/upload", job.Task.JobID)

	entries, err := os.ReadDir(outputDir)
	if err != nil {
		return fmt.Errorf("failed to read output directory: %w", err)
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		filePath := outputDir + "/" + entry.Name()
		if err := r.manager.UploadFile(uploadPath, job.JobToken, filePath); err != nil {
			log.Printf("Failed to upload %s: %v", filePath, err)
		} else {
			ctx.OutputUploaded(entry.Name())
			// Delete the file after a successful upload to prevent duplicate uploads.
			if err := os.Remove(filePath); err != nil {
				log.Printf("Warning: Failed to delete file %s after upload: %v", filePath, err)
			}
		}
	}

	return nil
}

// generateFingerprint creates a unique hardware fingerprint from the hostname,
// machine ID, DMI product UUID, and a MAC address, falling back to PID and
// timestamp when no stable identifiers are available.
func (r *Runner) generateFingerprint() {
	r.fingerprintMu.Lock()
	defer r.fingerprintMu.Unlock()

	var components []string
	components = append(components, r.hostname)

	if machineID, err := os.ReadFile("/etc/machine-id"); err == nil {
		components = append(components, strings.TrimSpace(string(machineID)))
	}

	if productUUID, err := os.ReadFile("/sys/class/dmi/id/product_uuid"); err == nil {
		components = append(components, strings.TrimSpace(string(productUUID)))
	}

	if macAddr, err := r.getMACAddress(); err == nil {
		components = append(components, macAddr)
	}

	// Only the hostname was collected: add PID and timestamp so the fingerprint
	// is still unlikely to collide, at the cost of stability across restarts.
	if len(components) <= 1 {
		components = append(components, fmt.Sprintf("%d", os.Getpid()))
		components = append(components, fmt.Sprintf("%d", time.Now().Unix()))
	}

	h := sha256.New()
	for _, comp := range components {
		h.Write([]byte(comp))
		h.Write([]byte{0}) // NUL separator keeps component boundaries unambiguous
	}

	r.fingerprint = hex.EncodeToString(h.Sum(nil))
}

// getMACAddress returns the hardware address of the first up, non-loopback
// interface that has one.
func (r *Runner) getMACAddress() (string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	for _, iface := range interfaces {
		if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
			continue
		}
		if len(iface.HardwareAddr) == 0 {
			continue
		}
		return iface.HardwareAddr.String(), nil
	}

	return "", errors.New("no suitable network interface found")
}

// GetFingerprint returns the runner's hardware fingerprint.
func (r *Runner) GetFingerprint() string {
	r.fingerprintMu.RLock()
	defer r.fingerprintMu.RUnlock()
	return r.fingerprint
}

// GetID returns the runner ID.
func (r *Runner) GetID() int64 {
	return r.id
}

// SetGPULockedOut sets whether GPU use is locked out due to a detected GPU error.
// When true, the runner will force CPU rendering for all jobs.
func (r *Runner) SetGPULockedOut(locked bool) {
	r.gpuLockedOutMu.Lock()
	defer r.gpuLockedOutMu.Unlock()
	r.gpuLockedOut = locked
	if locked {
		log.Printf("GPU lockout enabled: GPU rendering disabled for subsequent jobs (CPU only)")
	}
}

// IsGPULockedOut returns whether GPU use is currently locked out.
func (r *Runner) IsGPULockedOut() bool {
	r.gpuLockedOutMu.RLock()
	defer r.gpuLockedOutMu.RUnlock()
	return r.gpuLockedOut
}