# step.py
"""Protocol for pipeline step implementations."""

# NOTE: the module docstring must be the first statement in the file; a
# ``from __future__`` import is only allowed to follow it, not precede it,
# otherwise the string above would not be stored as the module ``__doc__``.
from __future__ import annotations

from typing import Protocol, TypeVar

from pydantic import BaseModel

# Invariant context type variable. Kept unchanged so that any code importing
# ``T`` from this module keeps working.
T = TypeVar("T", bound=BaseModel)

# Context type variable used by ``PipelineStep``. The context only ever
# appears as an *input* to ``run``, so the protocol is declared contravariant:
# a step written against a base context model is then also accepted where a
# step for a more specific context model is expected.
T_contra = TypeVar("T_contra", bound=BaseModel, contravariant=True)


class PipelineStep(Protocol[T_contra]):
    """Protocol for pipeline step implementations.

    Any class implementing the ``run`` method can be used as a pipeline step.
    This allows swapping step implementations without changing the rest of
    the code.
    """

    def run(self, context: T_contra) -> None:
        """Execute the pipeline step.

        Parameters
        ----------
        context
            The pipeline context to operate on and modify.

        Raises
        ------
        Exception
            If the step fails and cannot recover, it should raise an
            exception or mark the context as failed.
        """
        ...