/ src / python / txtai / workflow / task / stream.py
stream.py
 1  """
 2  StreamTask module
 3  """
 4  
 5  from .base import Task
 6  
 7  
 8  class StreamTask(Task):
 9      """
10      Task that calls a task action and yields results.
11      """
12  
13      def register(self, batch=False):
14          """
15          Adds stream parameters to task.
16  
17          Args:
18              batch: all elements are passed to a single action call if True, otherwise an action call is executed per element, defaults to False
19          """
20  
21          # pylint: disable=W0201
22          # All elements are passed to a single action call if True, otherwise an action call is executed per element, defaults to False
23          self.batch = batch
24  
25      def __call__(self, elements, executor=None):
26          for action in self.action:
27              if self.batch:
28                  # Single batch call
29                  yield from action(elements)
30              else:
31                  # Call action for each element
32                  for x in elements:
33                      yield from action(x)