// Package tasks provides task processing implementations. package tasks import ( "errors" "fmt" "jiggablend/internal/runner/api" "jiggablend/internal/runner/blender" "jiggablend/internal/runner/encoding" "jiggablend/internal/runner/workspace" "jiggablend/pkg/executils" "jiggablend/pkg/types" "os/exec" "sync" "time" ) // Processor handles a specific task type. type Processor interface { Process(ctx *Context) error } // Context provides task execution context. type Context struct { TaskID int64 JobID int64 JobName string Frame int // frame start (inclusive); kept for backward compat FrameEnd int // frame end (inclusive); same as Frame for single-frame TaskType string WorkDir string JobToken string Metadata *types.BlendMetadata Manager *api.ManagerClient JobConn *api.JobConnection Workspace *workspace.Manager Blender *blender.Manager Encoder *encoding.Selector Processes *executils.ProcessTracker } // ErrJobCancelled indicates the manager-side job was cancelled during execution. var ErrJobCancelled = errors.New("job cancelled") // NewContext creates a new task context. frameEnd should be >= frame; if 0 or less than frame, it is treated as single-frame (frameEnd = frame). func NewContext( taskID, jobID int64, jobName string, frameStart, frameEnd int, taskType string, workDir string, jobToken string, metadata *types.BlendMetadata, manager *api.ManagerClient, jobConn *api.JobConnection, ws *workspace.Manager, blenderMgr *blender.Manager, encoder *encoding.Selector, processes *executils.ProcessTracker, ) *Context { if frameEnd < frameStart { frameEnd = frameStart } return &Context{ TaskID: taskID, JobID: jobID, JobName: jobName, Frame: frameStart, FrameEnd: frameEnd, TaskType: taskType, WorkDir: workDir, JobToken: jobToken, Metadata: metadata, Manager: manager, JobConn: jobConn, Workspace: ws, Blender: blenderMgr, Encoder: encoder, Processes: processes, } } // Log sends a log entry to the manager. func (c *Context) Log(level types.LogLevel, message string) { if c.JobConn != nil { c.JobConn.Log(c.TaskID, level, message) } } // Info logs an info message. func (c *Context) Info(message string) { c.Log(types.LogLevelInfo, message) } // Warn logs a warning message. func (c *Context) Warn(message string) { c.Log(types.LogLevelWarn, message) } // Error logs an error message. func (c *Context) Error(message string) { c.Log(types.LogLevelError, message) } // Progress sends a progress update. func (c *Context) Progress(progress float64) { if c.JobConn != nil { c.JobConn.Progress(c.TaskID, progress) } } // OutputUploaded notifies that an output file was uploaded. func (c *Context) OutputUploaded(fileName string) { if c.JobConn != nil { c.JobConn.OutputUploaded(c.TaskID, fileName) } } // Complete sends task completion. func (c *Context) Complete(success bool, errorMsg error) { if c.JobConn != nil { c.JobConn.Complete(c.TaskID, success, errorMsg) } } // GetOutputFormat returns the output format from metadata or default. func (c *Context) GetOutputFormat() string { if c.Metadata != nil && c.Metadata.RenderSettings.OutputFormat != "" { return c.Metadata.RenderSettings.OutputFormat } return "PNG" } // GetFrameRate returns the frame rate from metadata or default. func (c *Context) GetFrameRate() float64 { if c.Metadata != nil && c.Metadata.RenderSettings.FrameRate > 0 { return c.Metadata.RenderSettings.FrameRate } return 24.0 } // GetBlenderVersion returns the Blender version from metadata. func (c *Context) GetBlenderVersion() string { if c.Metadata != nil { return c.Metadata.BlenderVersion } return "" } // ShouldUnhideObjects returns whether to unhide objects. func (c *Context) ShouldUnhideObjects() bool { return c.Metadata != nil && c.Metadata.UnhideObjects != nil && *c.Metadata.UnhideObjects } // ShouldEnableExecution returns whether to enable auto-execution. func (c *Context) ShouldEnableExecution() bool { return c.Metadata != nil && c.Metadata.EnableExecution != nil && *c.Metadata.EnableExecution } // IsJobCancelled checks whether the manager marked this job as cancelled. func (c *Context) IsJobCancelled() (bool, error) { if c.Manager == nil { return false, nil } status, err := c.Manager.GetJobStatus(c.JobID) if err != nil { return false, err } return status == types.JobStatusCancelled, nil } // CheckCancelled returns ErrJobCancelled if the job was cancelled. func (c *Context) CheckCancelled() error { cancelled, err := c.IsJobCancelled() if err != nil { return fmt.Errorf("failed to check job status: %w", err) } if cancelled { return ErrJobCancelled } return nil } // StartCancellationMonitor polls manager status and kills cmd if job is cancelled. // Caller must invoke returned stop function when cmd exits. func (c *Context) StartCancellationMonitor(cmd *exec.Cmd, taskLabel string) func() { stop := make(chan struct{}) var once sync.Once go func() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-stop: return case <-ticker.C: cancelled, err := c.IsJobCancelled() if err != nil { c.Warn(fmt.Sprintf("Could not check cancellation for %s task: %v", taskLabel, err)) continue } if !cancelled { continue } c.Warn(fmt.Sprintf("Job %d was cancelled, stopping %s task early", c.JobID, taskLabel)) if cmd != nil && cmd.Process != nil { _ = cmd.Process.Kill() } return } } }() return func() { once.Do(func() { close(stop) }) } }