/ src / python / txtai / workflow / base.py
base.py
  1  """
  2  Workflow module
  3  """
  4  
  5  import logging
  6  import time
  7  import traceback
  8  
  9  from datetime import datetime
 10  
 11  # Conditional import
 12  try:
 13      from croniter import croniter
 14  
 15      CRONITER = True
 16  except ImportError:
 17      CRONITER = False
 18  
 19  from .execute import Execute
 20  
 21  # Logging configuration
 22  logger = logging.getLogger(__name__)
 23  
 24  
 25  class Workflow:
 26      """
 27      Base class for all workflows.
 28      """
 29  
 30      def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
 31          """
 32          Creates a new workflow. Workflows are lists of tasks to execute.
 33  
 34          Args:
 35              tasks: list of workflow tasks
 36              batch: how many items to process at a time, defaults to 100
 37              workers: number of concurrent workers
 38              name: workflow name
 39              stream: workflow stream processor
 40          """
 41  
 42          self.tasks = tasks
 43          self.batch = batch
 44          self.workers = workers
 45          self.name = name
 46          self.stream = stream
 47  
 48          # Set default number of executor workers to max number of actions in a task
 49          self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers
 50  
 51      def __call__(self, elements):
 52          """
 53          Executes a workflow for input elements. This method returns a generator that yields transformed
 54          data elements.
 55  
 56          Args:
 57              elements: iterable data elements
 58  
 59          Returns:
 60              generator that yields transformed data elements
 61          """
 62  
 63          # Create execute instance for this run
 64          with Execute(self.workers) as executor:
 65              # Run task initializers
 66              self.initialize()
 67  
 68              # Process elements with stream processor, if available
 69              elements = self.stream(elements) if self.stream else elements
 70  
 71              # Process elements in batches
 72              for batch in self.chunk(elements):
 73                  yield from self.process(batch, executor)
 74  
 75              # Run task finalizers
 76              self.finalize()
 77  
 78      def schedule(self, cron, elements, iterations=None):
 79          """
 80          Schedules a workflow using a cron expression and elements.
 81  
 82          Args:
 83              cron: cron expression
 84              elements: iterable data elements passed to workflow each call
 85              iterations: number of times to run workflow, defaults to run indefinitely
 86          """
 87  
 88          # Check that croniter is installed
 89          if not CRONITER:
 90              raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')
 91  
 92          logger.info("'%s' scheduler started with schedule %s", self.name, cron)
 93  
 94          maxiterations = iterations
 95          while iterations is None or iterations > 0:
 96              # Schedule using localtime
 97              schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
 98              logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
 99              time.sleep(schedule.timestamp() - time.time())
100  
101              # Run workflow
102              # pylint: disable=W0703
103              try:
104                  for _ in self(elements):
105                      pass
106              except Exception:
107                  logger.error(traceback.format_exc())
108  
109              # Decrement iterations remaining, if necessary
110              if iterations is not None:
111                  iterations -= 1
112  
113          logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
114  
115      def initialize(self):
116          """
117          Runs task initializer methods (if any) before processing data.
118          """
119  
120          # Run task initializers
121          for task in self.tasks:
122              if task.initialize:
123                  task.initialize()
124  
125      def chunk(self, elements):
126          """
127          Splits elements into batches. This method efficiently processes both fixed size inputs and
128          dynamically generated inputs.
129  
130          Args:
131              elements: iterable data elements
132  
133          Returns:
134              evenly sized batches with the last batch having the remaining elements
135          """
136  
137          # Build batches by slicing elements, more efficient for fixed sized inputs
138          if hasattr(elements, "__len__") and hasattr(elements, "__getitem__"):
139              for x in range(0, len(elements), self.batch):
140                  yield elements[x : x + self.batch]
141  
142          # Build batches by iterating over elements when inputs are dynamically generated (i.e. generators)
143          else:
144              batch = []
145              for x in elements:
146                  batch.append(x)
147  
148                  if len(batch) == self.batch:
149                      yield batch
150                      batch = []
151  
152              # Final batch
153              if batch:
154                  yield batch
155  
156      def process(self, elements, executor):
157          """
158          Processes a batch of data elements.
159  
160          Args:
161              elements: iterable data elements
162              executor: execute instance, enables concurrent task actions
163  
164          Returns:
165              transformed data elements
166          """
167  
168          # Run elements through each task
169          for x, task in enumerate(self.tasks):
170              logger.debug("Running Task #%d", x)
171              elements = task(elements, executor)
172  
173          # Yield results processed by all tasks
174          yield from elements
175  
176      def finalize(self):
177          """
178          Runs task finalizer methods (if any) after all data processed.
179          """
180  
181          # Run task finalizers
182          for task in self.tasks:
183              if task.finalize:
184                  task.finalize()