/ src / python / txtai / workflow / task / service.py
service.py
  1  """
  2  ServiceTask module
  3  """
  4  
  5  # Conditional import
  6  try:
  7      import requests
  8      import xmltodict
  9  
 10      XML_TO_DICT = True
 11  except ImportError:
 12      XML_TO_DICT = False
 13  
 14  from .base import Task
 15  
 16  
 17  class ServiceTask(Task):
 18      """
 19      Task to runs requests against remote service urls.
 20      """
 21  
 22      def register(self, url=None, method=None, params=None, batch=True, extract=None):
 23          """
 24          Adds service parameters to task. Checks if required dependencies are installed.
 25  
 26          Args:
 27              url: url to connect to
 28              method: http method, GET or POST
 29              params: default query parameters
 30              batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
 31              extract: list of sections to extract from response
 32          """
 33  
 34          if not XML_TO_DICT:
 35              raise ImportError('ServiceTask is not available - install "workflow" extra to enable')
 36  
 37          # pylint: disable=W0201
 38          # Save URL, method and parameter defaults
 39          self.url = url
 40          self.method = method
 41          self.params = params
 42  
 43          # If True, all elements are passed in a single batch request, otherwise a service call is executed per element
 44          self.batch = batch
 45  
 46          # Save sections to extract. Supports both a single string and a hierarchical list of sections.
 47          self.extract = extract
 48          if self.extract:
 49              self.extract = [self.extract] if isinstance(self.extract, str) else self.extract
 50  
 51      def execute(self, elements, executor=None):
 52          if self.batch:
 53              elements = self.request(elements)
 54          else:
 55              elements = [self.request(element) for element in elements]
 56  
 57          return super().execute(elements, executor)
 58  
 59      def request(self, data):
 60          """
 61          Execute service request.
 62  
 63          Args:
 64              url: service url
 65              method: method (get or post)
 66              params: dict of constant parameters to pass to request
 67              data: dynamic data for this specific request
 68  
 69          Returns:
 70              response as JSON
 71          """
 72  
 73          if not self.params:
 74              params = data
 75          else:
 76              # Create copy of parameters
 77              params = self.params.copy()
 78  
 79              # Add data to parameters
 80              for key in params:
 81                  if not params[key]:
 82                      params[key] = data
 83  
 84          # Run request
 85          if self.method and self.method.lower() == "get":
 86              response = requests.get(self.url, params=params)
 87          else:
 88              response = requests.post(self.url, json=params)
 89  
 90          # Parse data based on content-type
 91          mimetype = response.headers["Content-Type"].split(";")[0]
 92          if mimetype.lower().endswith("xml"):
 93              data = xmltodict.parse(response.text)
 94          else:
 95              data = response.json()
 96  
 97          # Extract content from response, if necessary
 98          if self.extract:
 99              for tag in self.extract:
100                  data = data[tag]
101  
102          return data