context.py
1 from __future__ import annotations 2 3 """Base context for pipeline execution.""" 4 5 6 7 from pydantic import BaseModel 8 9 from .status import PipelineStatus 10 11 12 class PipelineContext(BaseModel): 13 """Base context for pipeline execution.""" 14 15 status: PipelineStatus = PipelineStatus.PENDING 16 error: str | None = None 17 18 def mark_failed(self, error: str) -> None: 19 """Mark the context as failed with an error message.""" 20 self.status = PipelineStatus.FAILED 21 self.error = error 22 23 def mark_completed(self) -> None: 24 """Mark the context as completed.""" 25 self.status = PipelineStatus.COMPLETED 26 27 def mark_running(self) -> None: 28 """Mark the context as running.""" 29 self.status = PipelineStatus.RUNNING