diff --git a/internal/runner/runner.go b/internal/runner/runner.go
index 66ed2e9..6a46fea 100644
--- a/internal/runner/runner.go
+++ b/internal/runner/runner.go
@@ -40,6 +40,11 @@ type Runner struct {
 
 	fingerprint   string
 	fingerprintMu sync.RWMutex
+
+	// gpuLockedOut is set when logs indicate a GPU error (e.g. HIP "Illegal address");
+	// when true, the runner forces CPU rendering for all subsequent jobs.
+	gpuLockedOut   bool
+	gpuLockedOutMu sync.RWMutex
 }
 
 // New creates a new runner.
@@ -238,6 +243,8 @@ func (r *Runner) executeJob(job *api.NextJobResponse) (err error) {
 		r.blender,
 		r.encoder,
 		r.processes,
+		r.IsGPULockedOut(),
+		func() { r.SetGPULockedOut(true) },
 	)
 
 	ctx.Info(fmt.Sprintf("Task assignment received (job: %d, type: %s)",
@@ -388,3 +395,21 @@ func (r *Runner) GetFingerprint() string {
 func (r *Runner) GetID() int64 {
 	return r.id
 }
+
+// SetGPULockedOut sets whether GPU use is locked out due to a detected GPU error.
+// When true, the runner will force CPU rendering for all jobs.
+func (r *Runner) SetGPULockedOut(locked bool) {
+	r.gpuLockedOutMu.Lock()
+	defer r.gpuLockedOutMu.Unlock()
+	r.gpuLockedOut = locked
+	if locked {
+		log.Printf("GPU lockout enabled: GPU rendering disabled for subsequent jobs (CPU only)")
+	}
+}
+
+// IsGPULockedOut returns whether GPU use is currently locked out.
+func (r *Runner) IsGPULockedOut() bool {
+	r.gpuLockedOutMu.RLock()
+	defer r.gpuLockedOutMu.RUnlock()
+	return r.gpuLockedOut
+}
diff --git a/internal/runner/tasks/processor.go b/internal/runner/tasks/processor.go
index 83d68ca..b607bbe 100644
--- a/internal/runner/tasks/processor.go
+++ b/internal/runner/tasks/processor.go
@@ -38,12 +38,18 @@ type Context struct {
 	Blender   *blender.Manager
 	Encoder   *encoding.Selector
 	Processes *executils.ProcessTracker
+
+	// GPULockedOut is set when the runner has detected a GPU error (e.g. HIP) and disables GPU for all jobs.
+	GPULockedOut bool
+	// OnGPUError is called when a GPU error line is seen in render logs; typically sets runner GPU lockout.
+	OnGPUError func()
 }
 
 // ErrJobCancelled indicates the manager-side job was cancelled during execution.
 var ErrJobCancelled = errors.New("job cancelled")
 
 // NewContext creates a new task context. frameEnd should be >= frame; if 0 or less than frame, it is treated as single-frame (frameEnd = frame).
+// gpuLockedOut is the runner's current GPU lockout state; onGPUError is called when a GPU error is detected in logs (may be nil).
 func NewContext(
 	taskID, jobID int64,
 	jobName string,
@@ -58,26 +64,30 @@ func NewContext(
 	blenderMgr *blender.Manager,
 	encoder *encoding.Selector,
 	processes *executils.ProcessTracker,
+	gpuLockedOut bool,
+	onGPUError func(),
 ) *Context {
 	if frameEnd < frameStart {
 		frameEnd = frameStart
 	}
 	return &Context{
-		TaskID:    taskID,
-		JobID:     jobID,
-		JobName:   jobName,
-		Frame:     frameStart,
-		FrameEnd:  frameEnd,
-		TaskType:  taskType,
-		WorkDir:   workDir,
-		JobToken:  jobToken,
-		Metadata:  metadata,
-		Manager:   manager,
-		JobConn:   jobConn,
-		Workspace: ws,
-		Blender:   blenderMgr,
-		Encoder:   encoder,
-		Processes: processes,
+		TaskID:       taskID,
+		JobID:        jobID,
+		JobName:      jobName,
+		Frame:        frameStart,
+		FrameEnd:     frameEnd,
+		TaskType:     taskType,
+		WorkDir:      workDir,
+		JobToken:     jobToken,
+		Metadata:     metadata,
+		Manager:      manager,
+		JobConn:      jobConn,
+		Workspace:    ws,
+		Blender:      blenderMgr,
+		Encoder:      encoder,
+		Processes:    processes,
+		GPULockedOut: gpuLockedOut,
+		OnGPUError:   onGPUError,
 	}
 }
 
@@ -158,6 +168,22 @@ func (c *Context) ShouldEnableExecution() bool {
 	return c.Metadata != nil && c.Metadata.EnableExecution != nil && *c.Metadata.EnableExecution
 }
 
+// ShouldForceCPU returns true if GPU should be disabled and CPU rendering forced
+// (runner GPU lockout or metadata force_cpu in engine_settings).
+func (c *Context) ShouldForceCPU() bool {
+	if c.GPULockedOut {
+		return true
+	}
+	if c.Metadata != nil && c.Metadata.RenderSettings.EngineSettings != nil {
+		if v, ok := c.Metadata.RenderSettings.EngineSettings["force_cpu"]; ok {
+			if b, ok := v.(bool); ok && b {
+				return true
+			}
+		}
+	}
+	return false
+}
+
 // IsJobCancelled checks whether the manager marked this job as cancelled.
 func (c *Context) IsJobCancelled() (bool, error) {
 	if c.Manager == nil {
diff --git a/internal/runner/tasks/render.go b/internal/runner/tasks/render.go
index 9de80fb..c4447ff 100644
--- a/internal/runner/tasks/render.go
+++ b/internal/runner/tasks/render.go
@@ -25,6 +25,24 @@ func NewRenderProcessor() *RenderProcessor {
 	return &RenderProcessor{}
 }
 
+// gpuErrorSubstrings are log line substrings that indicate a GPU backend error; any match triggers full GPU lockout.
+var gpuErrorSubstrings = []string{
+	"Illegal address in hip", // HIP (AMD) backend
+}
+
+// checkGPUErrorLine checks a log line for GPU error indicators and triggers runner GPU lockout if found.
+func (p *RenderProcessor) checkGPUErrorLine(ctx *Context, line string) {
+	for _, sub := range gpuErrorSubstrings {
+		if strings.Contains(line, sub) {
+			if ctx.OnGPUError != nil {
+				ctx.OnGPUError()
+			}
+			ctx.Warn(fmt.Sprintf("GPU error detected in log (%q); GPU disabled for subsequent jobs", sub))
+			return
+		}
+	}
+}
+
 // Process executes a render task.
 func (p *RenderProcessor) Process(ctx *Context) error {
 	if err := ctx.CheckCancelled(); err != nil {
@@ -77,6 +95,10 @@ func (p *RenderProcessor) Process(ctx *Context) error {
 	// We always render EXR (linear) for VFX accuracy; job output_format is the deliverable (EXR sequence or video).
 	renderFormat := "EXR"
 
+	if ctx.ShouldForceCPU() {
+		ctx.Info("GPU lockout active: using CPU rendering only")
+	}
+
 	// Create render script
 	if err := p.createRenderScript(ctx, renderFormat); err != nil {
 		return err
@@ -142,13 +164,22 @@ func (p *RenderProcessor) createRenderScript(ctx *Context, renderFormat string)
 		return errors.New(errMsg)
 	}
 
-	// Write render settings if available
+	// Write render settings: merge job metadata with runner force_cpu (GPU lockout)
+	var settingsMap map[string]interface{}
 	if ctx.Metadata != nil && ctx.Metadata.RenderSettings.EngineSettings != nil {
-		settingsJSON, err := json.Marshal(ctx.Metadata.RenderSettings)
+		raw, err := json.Marshal(ctx.Metadata.RenderSettings)
 		if err == nil {
-			if err := os.WriteFile(renderSettingsFilePath, settingsJSON, 0644); err != nil {
-				ctx.Warn(fmt.Sprintf("Failed to write render settings file: %v", err))
-			}
+			_ = json.Unmarshal(raw, &settingsMap)
+		}
+	}
+	if settingsMap == nil {
+		settingsMap = make(map[string]interface{})
+	}
+	settingsMap["force_cpu"] = ctx.ShouldForceCPU()
+	settingsJSON, err := json.Marshal(settingsMap)
+	if err == nil {
+		if err := os.WriteFile(renderSettingsFilePath, settingsJSON, 0644); err != nil {
+			ctx.Warn(fmt.Sprintf("Failed to write render settings file: %v", err))
 		}
 	}
 
@@ -211,7 +242,7 @@ func (p *RenderProcessor) runBlender(ctx *Context, blenderBinary, blendFile, out
 	ctx.Processes.Track(ctx.TaskID, cmd)
 	defer ctx.Processes.Untrack(ctx.TaskID)
 
-	// Stream stdout
+	// Stream stdout and watch for GPU error lines (lock out all GPU on any backend error)
 	stdoutDone := make(chan bool)
 	go func() {
 		defer close(stdoutDone)
@@ -219,6 +250,7 @@ func (p *RenderProcessor) runBlender(ctx *Context, blenderBinary, blendFile, out
 		for scanner.Scan() {
 			line := scanner.Text()
 			if line != "" {
+				p.checkGPUErrorLine(ctx, line)
 				shouldFilter, logLevel := blender.FilterLog(line)
 				if !shouldFilter {
 					ctx.Log(logLevel, line)
@@ -227,7 +259,7 @@ func (p *RenderProcessor) runBlender(ctx *Context, blenderBinary, blendFile, out
 		}
 	}()
 
-	// Stream stderr
+	// Stream stderr and watch for GPU error lines
 	stderrDone := make(chan bool)
 	go func() {
 		defer close(stderrDone)
@@ -235,6 +267,7 @@ func (p *RenderProcessor) runBlender(ctx *Context, blenderBinary, blendFile, out
 		for scanner.Scan() {
 			line := scanner.Text()
 			if line != "" {
+				p.checkGPUErrorLine(ctx, line)
 				shouldFilter, logLevel := blender.FilterLog(line)
 				if !shouldFilter {
 					if logLevel == types.LogLevelInfo {