factory.py
1 """ 2 Workflow factory module 3 """ 4 5 from .base import Workflow 6 from .task import TaskFactory 7 8 9 class WorkflowFactory: 10 """ 11 Workflow factory. Creates new Workflow instances. 12 """ 13 14 @staticmethod 15 def create(config, name): 16 """ 17 Creates a new Workflow instance. 18 19 Args: 20 config: Workflow configuration 21 name: Workflow name 22 23 Returns: 24 Workflow 25 """ 26 27 # Resolve workflow tasks 28 tasks = [] 29 for tconfig in config["tasks"]: 30 task = tconfig.pop("task") if "task" in tconfig else "" 31 tasks.append(TaskFactory.create(tconfig, task)) 32 33 config["tasks"] = tasks 34 35 if "stream" in config: 36 sconfig = config["stream"] 37 task = sconfig.pop("task") if "task" in sconfig else "stream" 38 39 config["stream"] = TaskFactory.create(sconfig, task) 40 41 # Create workflow 42 return Workflow(**config, name=name)