board_game_scraper/pipelines.py
# -*- coding: utf-8 -*-

""" Scrapy item pipelines """

import logging
import math
import re

from itertools import islice
from urllib.parse import quote, unquote_plus
from typing import Optional

import jmespath

from itemadapter import ItemAdapter
from pytility import clear_list, take_first
from scrapy import Request
from scrapy.exceptions import DropItem, NotConfigured
from scrapy.utils.defer import defer_result
from scrapy.utils.misc import arg_to_iter
from scrapy.utils.python import flatten
from twisted.internet.defer import DeferredList

from .utils import REGEX_DBPEDIA_DOMAIN, parse_json, parse_url

LOGGER = logging.getLogger(__name__)


class DataTypePipeline:
    """convert fields to their required data type"""

    # pylint: disable=no-self-use,unused-argument
    def process_item(self, item, spider):
        """convert to data type"""

        for field in item.fields:
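            # NotImplemented serves as a sentinel for "no default configured":
            # missing fields are first filled with their default (calling it if
            # it is callable), then converted to the declared dtype below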
            dtype = item.fields[field].get("dtype")
            default = item.fields[field].get("default", NotImplemented)

            if item.get(field) is None and default is not NotImplemented:
                item[field] = default() if callable(default) else default

            if not dtype or item.get(field) is None or isinstance(item[field], dtype):
                continue

            try:
                item[field] = dtype(item[field])
            except Exception as exc:
                if default is NotImplemented:
                    raise DropItem(
                        'Could not convert field "{}" to datatype "{}" in item "{}"'.format(
                            field, dtype, item
                        )
                    ) from exc

                item[field] = default() if callable(default) else default

        return item
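
# DataTypePipeline reads "dtype" and "default" from the item's field metadata.
# A rough sketch of a matching item definition (field names are hypothetical,
# not taken from this file):
#
#     class GameItem(scrapy.Item):
#         name = scrapy.Field(dtype=str)
#         year = scrapy.Field(dtype=int, default=None)
#
# A scraped value like "2004" would then be converted to 2004; if conversion
# fails and no default is declared, the item is dropped.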


class ResolveLabelPipeline:
    """resolve labels"""

    @classmethod
    def from_crawler(cls, crawler):
        """init from crawler"""

        url = crawler.settings.get("RESOLVE_LABEL_URL")
        fields = crawler.settings.getlist("RESOLVE_LABEL_FIELDS")

        if not url or not fields:
            raise NotConfigured

        lang_priorities = crawler.settings.getlist("RESOLVE_LABEL_LANGUAGE_PRIORITIES")

        return cls(url=url, fields=fields, lang_priorities=lang_priorities)

    def __init__(self, url, fields, lang_priorities=None):
        self.url = url
        self.fields = fields
        self.lang_priorities = {
            lang: prio for prio, lang in enumerate(arg_to_iter(lang_priorities))
        }
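        # e.g. lang_priorities=["en", "de"] becomes {"en": 0, "de": 1}, so that
        # labels in preferred languages sort first in _extract_labels()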
        self.labels = {}
        self.logger = LOGGER

    def _extract_labels(self, response, value):
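        # the endpoint configured via RESOLVE_LABEL_URL is expected to return
        # Wikidata-style entity JSON, roughly (illustrative ID and values):
        #     {"entities": {"Q42": {"labels": {"en": {"language": "en", "value": "..."}}}}}
        # failures passed in by addBoth() have no "text" attribute and yield no labels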
        json_obj = parse_json(response.text) if hasattr(response, "text") else {}

        labels = take_first(jmespath.search(f"entities.{value}.labels", json_obj)) or {}
        labels = labels.values()
        labels = sorted(
            labels,
            key=lambda label: self.lang_priorities.get(label.get("language"), math.inf),
        )
        labels = clear_list(label.get("value") for label in labels)

        self.labels[value] = labels
        self.logger.debug("resolved labels for %s: %s", value, labels)

        return labels

    def _deferred_value(self, value, spider):
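        # serve repeated IDs from the in-memory cache; otherwise download the
        # entity through the crawler engine, so the request passes through the
        # regular downloader like any other request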
        labels = self.labels.get(value)
        if labels is not None:
            self.logger.debug("found labels in cache for %s: %s", value, labels)
            return defer_result(labels)

        request = Request(self.url.format(value), priority=1)
        deferred = spider.crawler.engine.download(request, spider)
        deferred.addBoth(self._extract_labels, value)
        return deferred

    def _add_value(self, result, field, item):
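        # "result" is the outcome of a DeferredList: a list of (success, value)
        # pairs, one per ID in this field, where value is the resolved label
        # list (or a Failure captured via consumeErrors)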
        labels = clear_list(flatten(r[1] for r in arg_to_iter(result))) or None
        self.logger.debug("resolved labels for %s: %s", item.get(field), labels)
        item[field] = labels
        return item

    def _deferred_field(self, field, item, spider):
        deferreds = [
            self._deferred_value(value, spider)
            for value in arg_to_iter(item.get(field))
        ]
        if not deferreds:
            item[field] = None
            return defer_result(item)
        deferred = DeferredList(deferreds, consumeErrors=True)
        deferred.addBoth(self._add_value, field, item)
        return deferred

    def process_item(self, item, spider):
        """resolve IDs to labels in specified fields"""

        if not any(item.get(field) for field in self.fields):
            return item

        deferred = DeferredList(
            [self._deferred_field(field, item, spider) for field in self.fields],
            consumeErrors=True,
        )
        deferred.addBoth(lambda _: item)
        return deferred


class ResolveImagePipeline:
    """resolve image URLs"""

    fields = ("image_url",)
    hostnames = (
        "dbpedia.org",
        "www.dbpedia.org",
        "wikidata.org",
        "www.wikidata.org",
        REGEX_DBPEDIA_DOMAIN,
    )
    regex_path = re.compile(r"^/(resource/File:|wiki/Special:EntityData/)(.+)$")
    url = "https://commons.wikimedia.org/wiki/Special:Redirect/file/{}"
    logger = LOGGER

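    # for example (hypothetical file name), a resource URL such as
    # <http://dbpedia.org/resource/File:Example_game.jpg> is rewritten to
    # <https://commons.wikimedia.org/wiki/Special:Redirect/file/Example_game.jpg>;
    # URLs on other hosts, or with other paths, are passed through unchanged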
    def _parse_url(self, url):
        parsed = parse_url(url, self.hostnames)
        if not parsed:
            return url

        match = self.regex_path.match(parsed.path)
        if not match:
            return url

        commons_id = unquote_plus(match.group(2))
        commons_id = commons_id.replace(" ", "_")
        commons_id = quote(commons_id)

        result = self.url.format(commons_id)
        self.logger.debug("converted URL <%s> to <%s>", url, result)
        return result

    # pylint: disable=unused-argument
    def process_item(self, item, spider):
        """resolve resource image URLs to actual file locations"""
        for field in self.fields:
            if item.get(field):
                item[field] = clear_list(map(self._parse_url, arg_to_iter(item[field])))
        return item


class LimitImagesPipeline:
    """Copy a limited number of image URLs to be downloaded from source to target."""

    source_field: str
    target_field: str
    limit: Optional[int] = None

    @classmethod
    def from_crawler(cls, crawler):
        """Init from crawler."""

        source_field = crawler.settings.get("LIMIT_IMAGES_URLS_FIELD")
        target_field = crawler.settings.get("IMAGES_URLS_FIELD")

        if not source_field or not target_field:
            raise NotConfigured

        limit = crawler.settings.getint("LIMIT_IMAGES_TO_DOWNLOAD", -1)
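        # getint() falls back to -1 (copy everything) when the setting is absent;
        # 0 copies nothing, and a positive value copies at most that many URLs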

        return cls(
            source_field=source_field,
            target_field=target_field,
            limit=limit,
        )

    def __init__(
        self, source_field: str, target_field: str, limit: Optional[int] = None
    ):
        self.source_field = source_field
        self.target_field = target_field
        self.limit = limit

    # pylint: disable=unused-argument
    def process_item(self, item, spider):
        """Copy a limited number of image URLs to be downloaded from source to target."""

        # if the item does not support the target field, assigning it would raise an error; return the item as-is
        if hasattr(item, "fields") and self.target_field not in item.fields:
            return item

        if self.limit is None or self.limit < 0:  # copy through everything
            item[self.target_field] = list(arg_to_iter(item.get(self.source_field)))
            return item

        if not self.limit:  # limit is zero
            item[self.target_field] = []
            return item

        # actual limit
        item[self.target_field] = list(
            islice(arg_to_iter(item.get(self.source_field)), self.limit)
        )
        return item


class CleanItemPipeline:
    """Clean up unnecessary values from an item."""

    drop_falsey: bool
    drop_values: Optional[tuple]

    @classmethod
    def from_crawler(cls, crawler):
        """Init from crawler."""

        drop_falsey = crawler.settings.getbool("CLEAN_ITEM_DROP_FALSEY")
        drop_values = (
            tuple(arg_to_iter(crawler.settings.getlist("CLEAN_ITEM_DROP_VALUES")))
            or None
        )

        if not drop_falsey and not drop_values:
            raise NotConfigured

        return cls(drop_falsey=drop_falsey, drop_values=drop_values)

    def __init__(self, drop_falsey: bool, drop_values: Optional[tuple]):
        self.drop_falsey = drop_falsey
        self.drop_values = drop_values

    # pylint: disable=unused-argument
    def process_item(self, item, spider):
        """Clean up unnecessary values from an item."""

        adapter = ItemAdapter(item)

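        # with drop_falsey=True, falsey values such as "", [], {} and 0 are removed;
        # drop_values removes exact matches, e.g. a (hypothetical) placeholder
        # like "N/A" configured via CLEAN_ITEM_DROP_VALUES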
        for key in tuple(adapter.keys()):
            if (self.drop_falsey and not adapter[key]) or (
                self.drop_values is not None and adapter[key] in self.drop_values
            ):
                del adapter[key]

        return item
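

# A minimal sketch of how these pipelines might be wired up in the project's
# settings.py; the pipeline order values, the Wikidata endpoint, and the field
# names other than "image_url" are illustrative assumptions, not taken from
# this file:
#
#     ITEM_PIPELINES = {
#         "board_game_scraper.pipelines.DataTypePipeline": 100,
#         "board_game_scraper.pipelines.ResolveLabelPipeline": 200,
#         "board_game_scraper.pipelines.ResolveImagePipeline": 300,
#         "board_game_scraper.pipelines.LimitImagesPipeline": 400,
#         "board_game_scraper.pipelines.CleanItemPipeline": 500,
#     }
#     RESOLVE_LABEL_URL = "https://www.wikidata.org/wiki/Special:EntityData/{}.json"
#     RESOLVE_LABEL_FIELDS = ["designer", "artist"]
#     RESOLVE_LABEL_LANGUAGE_PRIORITIES = ["en", "de"]
#     LIMIT_IMAGES_URLS_FIELD = "image_url"
#     IMAGES_URLS_FIELD = "image_urls"
#     LIMIT_IMAGES_TO_DOWNLOAD = 1
#     CLEAN_ITEM_DROP_FALSEY = True
#     CLEAN_ITEM_DROP_VALUES = ["N/A"]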