Files
jiggablend/internal/runner/tasks/processor.go
Justin Harms 6833bb4013 Add GPU error handling and lockout mechanism in Runner
- Introduced gpuLockedOut state in Runner to manage GPU rendering based on detected errors.
- Implemented SetGPULockedOut and IsGPULockedOut methods for controlling GPU usage.
- Enhanced Context to include GPULockedOut and OnGPUError for better error handling.
- Updated RenderProcessor to check for GPU errors in logs and trigger lockout as needed.
- Modified rendering logic to force CPU rendering when GPU lockout is active, improving stability during errors.
2026-03-13 10:01:39 -05:00

249 lines
6.4 KiB
Go

// Package tasks provides task processing implementations.
package tasks
import (
"errors"
"fmt"
"jiggablend/internal/runner/api"
"jiggablend/internal/runner/blender"
"jiggablend/internal/runner/encoding"
"jiggablend/internal/runner/workspace"
"jiggablend/pkg/executils"
"jiggablend/pkg/types"
"os/exec"
"sync"
"time"
)
// Processor handles a specific task type.
type Processor interface {
Process(ctx *Context) error
}
// Context provides task execution context.
type Context struct {
TaskID int64
JobID int64
JobName string
Frame int // frame start (inclusive); kept for backward compat
FrameEnd int // frame end (inclusive); same as Frame for single-frame
TaskType string
WorkDir string
JobToken string
Metadata *types.BlendMetadata
Manager *api.ManagerClient
JobConn *api.JobConnection
Workspace *workspace.Manager
Blender *blender.Manager
Encoder *encoding.Selector
Processes *executils.ProcessTracker
// GPULockedOut is set when the runner has detected a GPU error (e.g. HIP) and disables GPU for all jobs.
GPULockedOut bool
// OnGPUError is called when a GPU error line is seen in render logs; typically sets runner GPU lockout.
OnGPUError func()
}
// ErrJobCancelled indicates the manager-side job was cancelled during execution.
var ErrJobCancelled = errors.New("job cancelled")
// NewContext creates a new task context. frameEnd should be >= frame; if 0 or less than frame, it is treated as single-frame (frameEnd = frame).
// gpuLockedOut is the runner's current GPU lockout state; onGPUError is called when a GPU error is detected in logs (may be nil).
func NewContext(
taskID, jobID int64,
jobName string,
frameStart, frameEnd int,
taskType string,
workDir string,
jobToken string,
metadata *types.BlendMetadata,
manager *api.ManagerClient,
jobConn *api.JobConnection,
ws *workspace.Manager,
blenderMgr *blender.Manager,
encoder *encoding.Selector,
processes *executils.ProcessTracker,
gpuLockedOut bool,
onGPUError func(),
) *Context {
if frameEnd < frameStart {
frameEnd = frameStart
}
return &Context{
TaskID: taskID,
JobID: jobID,
JobName: jobName,
Frame: frameStart,
FrameEnd: frameEnd,
TaskType: taskType,
WorkDir: workDir,
JobToken: jobToken,
Metadata: metadata,
Manager: manager,
JobConn: jobConn,
Workspace: ws,
Blender: blenderMgr,
Encoder: encoder,
Processes: processes,
GPULockedOut: gpuLockedOut,
OnGPUError: onGPUError,
}
}
// Log sends a log entry to the manager.
func (c *Context) Log(level types.LogLevel, message string) {
if c.JobConn != nil {
c.JobConn.Log(c.TaskID, level, message)
}
}
// Info logs an info message.
func (c *Context) Info(message string) {
c.Log(types.LogLevelInfo, message)
}
// Warn logs a warning message.
func (c *Context) Warn(message string) {
c.Log(types.LogLevelWarn, message)
}
// Error logs an error message.
func (c *Context) Error(message string) {
c.Log(types.LogLevelError, message)
}
// Progress sends a progress update.
func (c *Context) Progress(progress float64) {
if c.JobConn != nil {
c.JobConn.Progress(c.TaskID, progress)
}
}
// OutputUploaded notifies that an output file was uploaded.
func (c *Context) OutputUploaded(fileName string) {
if c.JobConn != nil {
c.JobConn.OutputUploaded(c.TaskID, fileName)
}
}
// Complete sends task completion.
func (c *Context) Complete(success bool, errorMsg error) {
if c.JobConn != nil {
c.JobConn.Complete(c.TaskID, success, errorMsg)
}
}
// GetOutputFormat returns the output format from metadata or default.
func (c *Context) GetOutputFormat() string {
if c.Metadata != nil && c.Metadata.RenderSettings.OutputFormat != "" {
return c.Metadata.RenderSettings.OutputFormat
}
return "PNG"
}
// GetFrameRate returns the frame rate from metadata or default.
func (c *Context) GetFrameRate() float64 {
if c.Metadata != nil && c.Metadata.RenderSettings.FrameRate > 0 {
return c.Metadata.RenderSettings.FrameRate
}
return 24.0
}
// GetBlenderVersion returns the Blender version from metadata.
func (c *Context) GetBlenderVersion() string {
if c.Metadata != nil {
return c.Metadata.BlenderVersion
}
return ""
}
// ShouldUnhideObjects returns whether to unhide objects.
func (c *Context) ShouldUnhideObjects() bool {
return c.Metadata != nil && c.Metadata.UnhideObjects != nil && *c.Metadata.UnhideObjects
}
// ShouldEnableExecution returns whether to enable auto-execution.
func (c *Context) ShouldEnableExecution() bool {
return c.Metadata != nil && c.Metadata.EnableExecution != nil && *c.Metadata.EnableExecution
}
// ShouldForceCPU returns true if GPU should be disabled and CPU rendering forced
// (runner GPU lockout or metadata force_cpu in engine_settings).
func (c *Context) ShouldForceCPU() bool {
if c.GPULockedOut {
return true
}
if c.Metadata != nil && c.Metadata.RenderSettings.EngineSettings != nil {
if v, ok := c.Metadata.RenderSettings.EngineSettings["force_cpu"]; ok {
if b, ok := v.(bool); ok && b {
return true
}
}
}
return false
}
// IsJobCancelled checks whether the manager marked this job as cancelled.
func (c *Context) IsJobCancelled() (bool, error) {
if c.Manager == nil {
return false, nil
}
status, err := c.Manager.GetJobStatus(c.JobID)
if err != nil {
return false, err
}
return status == types.JobStatusCancelled, nil
}
// CheckCancelled returns ErrJobCancelled if the job was cancelled.
func (c *Context) CheckCancelled() error {
cancelled, err := c.IsJobCancelled()
if err != nil {
return fmt.Errorf("failed to check job status: %w", err)
}
if cancelled {
return ErrJobCancelled
}
return nil
}
// StartCancellationMonitor polls manager status and kills cmd if job is cancelled.
// Caller must invoke returned stop function when cmd exits.
func (c *Context) StartCancellationMonitor(cmd *exec.Cmd, taskLabel string) func() {
stop := make(chan struct{})
var once sync.Once
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
cancelled, err := c.IsJobCancelled()
if err != nil {
c.Warn(fmt.Sprintf("Could not check cancellation for %s task: %v", taskLabel, err))
continue
}
if !cancelled {
continue
}
c.Warn(fmt.Sprintf("Job %d was cancelled, stopping %s task early", c.JobID, taskLabel))
if cmd != nil && cmd.Process != nil {
_ = cmd.Process.Kill()
}
return
}
}
}()
return func() {
once.Do(func() {
close(stop)
})
}
}