# base.py
"""
Workflow module
"""

import logging
import time
import traceback

from datetime import datetime

# Conditional import: scheduling support requires croniter (shipped with the "workflow" extra)
try:
    from croniter import croniter

    CRONITER = True
except ImportError:
    CRONITER = False

from .execute import Execute

# Logging configuration
logger = logging.getLogger(__name__)


class Workflow:
    """
    Base class for all workflows.
    """

    def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
        """
        Creates a new workflow. Workflows are lists of tasks to execute.

        Args:
            tasks: list of workflow tasks
            batch: how many items to process at a time, defaults to 100
            workers: number of concurrent workers
            name: workflow name
            stream: workflow stream processor
        """

        self.tasks = tasks
        self.batch = batch
        self.name = name
        self.stream = stream

        # Default the number of executor workers to the max number of actions in a task.
        # default=1 guards against an empty task list (previously raised ValueError).
        self.workers = workers if workers else max((len(task.action) for task in tasks), default=1)

    def __call__(self, elements):
        """
        Executes a workflow for input elements. This method returns a generator that yields transformed
        data elements.

        Args:
            elements: iterable data elements

        Returns:
            generator that yields transformed data elements
        """

        # Create execute instance for this run; context manager releases executor resources
        with Execute(self.workers) as executor:
            # Run task initializers
            self.initialize()

            # Process elements with stream processor, if available
            elements = self.stream(elements) if self.stream else elements

            # Process elements in batches
            for batch in self.chunk(elements):
                yield from self.process(batch, executor)

            # Run task finalizers
            self.finalize()

    def schedule(self, cron, elements, iterations=None):
        """
        Schedules a workflow using a cron expression and elements.

        Args:
            cron: cron expression
            elements: iterable data elements passed to workflow each call
            iterations: number of times to run workflow, defaults to run indefinitely

        Raises:
            ImportError: if croniter is not installed
        """

        # Check that croniter is installed
        if not CRONITER:
            raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

        logger.info("'%s' scheduler started with schedule %s", self.name, cron)

        maxiterations = iterations
        while iterations is None or iterations > 0:
            # Schedule using localtime
            schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
            logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())

            # Clamp to 0: a negative delta (scheduled time already passed by the time we
            # compute it) would raise ValueError in time.sleep
            time.sleep(max(0, schedule.timestamp() - time.time()))

            # Run workflow, logging (not propagating) failures so the scheduler keeps running
            # pylint: disable=W0703
            try:
                for _ in self(elements):
                    pass
            except Exception:
                logger.error(traceback.format_exc())

            # Decrement iterations remaining, if necessary
            if iterations is not None:
                iterations -= 1

        # Only reachable when a finite iteration count was given
        logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)

    def initialize(self):
        """
        Runs task initializer methods (if any) before processing data.
        """

        # Run task initializers
        for task in self.tasks:
            if task.initialize:
                task.initialize()

    def chunk(self, elements):
        """
        Splits elements into batches. This method efficiently processes both fixed size inputs and
        dynamically generated inputs.

        Args:
            elements: iterable data elements

        Returns:
            evenly sized batches with the last batch having the remaining elements
        """

        # Build batches by slicing elements, more efficient for fixed sized inputs
        if hasattr(elements, "__len__") and hasattr(elements, "__getitem__"):
            for x in range(0, len(elements), self.batch):
                yield elements[x : x + self.batch]

        # Build batches by iterating over elements when inputs are dynamically generated (i.e. generators)
        else:
            batch = []
            for x in elements:
                batch.append(x)

                if len(batch) == self.batch:
                    yield batch
                    batch = []

            # Final batch
            if batch:
                yield batch

    def process(self, elements, executor):
        """
        Processes a batch of data elements.

        Args:
            elements: iterable data elements
            executor: execute instance, enables concurrent task actions

        Returns:
            transformed data elements
        """

        # Run elements through each task
        for x, task in enumerate(self.tasks):
            logger.debug("Running Task #%d", x)
            elements = task(elements, executor)

        # Yield results processed by all tasks
        yield from elements

    def finalize(self):
        """
        Runs task finalizer methods (if any) after all data processed.
        """

        # Run task finalizers
        for task in self.tasks:
            if task.finalize:
                task.finalize()