# board_game_scraper/pipelines.py
# -*- coding: utf-8 -*-

""" Scrapy item pipelines """

import logging
import math
import re

from itertools import islice
from urllib.parse import quote, unquote_plus
from typing import Optional

import jmespath

from itemadapter import ItemAdapter
from pytility import clear_list, take_first
from scrapy import Request
from scrapy.exceptions import DropItem, NotConfigured
from scrapy.utils.defer import defer_result
from scrapy.utils.misc import arg_to_iter
from scrapy.utils.python import flatten
from twisted.internet.defer import DeferredList

from .utils import REGEX_DBPEDIA_DOMAIN, parse_json, parse_url

LOGGER = logging.getLogger(__name__)


class DataTypePipeline:
    """convert fields to their required data type"""

    # pylint: disable=no-self-use,unused-argument
    def process_item(self, item, spider):
        """convert to data type"""

        for field in item.fields:
            dtype = item.fields[field].get("dtype")
            default = item.fields[field].get("default", NotImplemented)

            if item.get(field) is None and default is not NotImplemented:
                item[field] = default() if callable(default) else default

            if not dtype or item.get(field) is None or isinstance(item[field], dtype):
                continue

            try:
                item[field] = dtype(item[field])
            except Exception as exc:
                if default is NotImplemented:
                    raise DropItem(
                        'Could not convert field "{}" to datatype "{}" in item "{}"'.format(
                            field, dtype, item
                        )
                    ) from exc

                item[field] = default() if callable(default) else default

        return item


class ResolveLabelPipeline:
    """resolve labels"""

    @classmethod
    def from_crawler(cls, crawler):
        """init from crawler"""

        url = crawler.settings.get("RESOLVE_LABEL_URL")
        fields = crawler.settings.getlist("RESOLVE_LABEL_FIELDS")

        if not url or not fields:
            raise NotConfigured

        lang_priorities = crawler.settings.getlist("RESOLVE_LABEL_LANGUAGE_PRIORITIES")

        return cls(url=url, fields=fields, lang_priorities=lang_priorities)

    def __init__(self, url, fields, lang_priorities=None):
        self.url = url
        self.fields = fields
        self.lang_priorities = {
            lang: prio for prio, lang in enumerate(arg_to_iter(lang_priorities))
        }
        self.labels = {}
        self.logger = LOGGER

    def _extract_labels(self, response, value):
        json_obj = parse_json(response.text) if hasattr(response, "text") else {}

        labels = take_first(jmespath.search(f"entities.{value}.labels", json_obj)) or {}
        labels = labels.values()
        labels = sorted(
            labels,
            key=lambda label: self.lang_priorities.get(label.get("language"), math.inf),
        )
        labels = clear_list(label.get("value") for label in labels)

        self.labels[value] = labels
        self.logger.debug("resolved labels for %s: %s", value, labels)

        return labels

    def _deferred_value(self, value, spider):
        labels = self.labels.get(value)
        if labels is not None:
            self.logger.debug("found labels in cache for %s: %s", value, labels)
            return defer_result(labels)

        request = Request(self.url.format(value), priority=1)
        deferred = spider.crawler.engine.download(request, spider)
        deferred.addBoth(self._extract_labels, value)
        return deferred

    def _add_value(self, result, field, item):
        labels = clear_list(flatten(r[1] for r in arg_to_iter(result))) or None
        self.logger.debug("resolved labels for %s: %s", item.get(field), labels)
        item[field] = labels
        return item

    def _deferred_field(self, field, item, spider):
        deferreds = [
            self._deferred_value(value, spider)
            for value in arg_to_iter(item.get(field))
        ]
        if not deferreds:
            item[field] = None
            return defer_result(item)
        deferred = DeferredList(deferreds, consumeErrors=True)
        deferred.addBoth(self._add_value, field, item)
        return deferred

    def process_item(self, item, spider):
        """resolve IDs to labels in specified fields"""

        if not any(item.get(field) for field in self.fields):
            return item

        deferred = DeferredList(
            [self._deferred_field(field, item, spider) for field in self.fields],
            consumeErrors=True,
        )
        deferred.addBoth(lambda _: item)
        return deferred
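

# Illustrative example: ResolveLabelPipeline assumes that RESOLVE_LABEL_URL
# points at a JSON endpoint whose response nests labels under
# "entities.<ID>.labels", e.g. a Wikidata-style EntityData document. The
# payload below is an assumed sample, not actual scraper output:
#
#     {"entities": {"Q12345": {"labels": {
#         "de": {"language": "de", "value": "Beispielspiel"},
#         "en": {"language": "en", "value": "Example game"},
#     }}}}
#
# With RESOLVE_LABEL_LANGUAGE_PRIORITIES = ["en", "de"], the ID "Q12345" would
# resolve to ["Example game", "Beispielspiel"]: labels are sorted by language
# priority and then reduced to their values via clear_list().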


class ResolveImagePipeline:
    """resolve image URLs"""

    fields = ("image_url",)
    hostnames = (
        "dbpedia.org",
        "www.dbpedia.org",
        "wikidata.org",
        "www.wikidata.org",
        REGEX_DBPEDIA_DOMAIN,
    )
    regex_path = re.compile(r"^/(resource/File:|wiki/Special:EntityData/)(.+)$")
    url = "https://commons.wikimedia.org/wiki/Special:Redirect/file/{}"
    logger = LOGGER

    def _parse_url(self, url):
        parsed = parse_url(url, self.hostnames)
        if not parsed:
            return url

        match = self.regex_path.match(parsed.path)
        if not match:
            return url

        commons_id = unquote_plus(match.group(2))
        commons_id = commons_id.replace(" ", "_")
        commons_id = quote(commons_id)

        result = self.url.format(commons_id)
        self.logger.debug("converted URL <%s> to <%s>", url, result)
        return result

    # pylint: disable=unused-argument
    def process_item(self, item, spider):
        """resolve resource image URLs to actual file locations"""
        for field in self.fields:
            if item.get(field):
                item[field] = clear_list(map(self._parse_url, arg_to_iter(item[field])))
        return item


class LimitImagesPipeline:
    """Copy a limited number of image URLs to be downloaded from source to target."""

    source_field: str
    target_field: str
    limit: Optional[int] = None

    @classmethod
    def from_crawler(cls, crawler):
        """Init from crawler."""

        source_field = crawler.settings.get("LIMIT_IMAGES_URLS_FIELD")
        target_field = crawler.settings.get("IMAGES_URLS_FIELD")

        if not source_field or not target_field:
            raise NotConfigured

        limit = crawler.settings.getint("LIMIT_IMAGES_TO_DOWNLOAD", -1)

        return cls(
            source_field=source_field,
            target_field=target_field,
            limit=limit,
        )

    def __init__(
        self, source_field: str, target_field: str, limit: Optional[int] = None
    ):
        self.source_field = source_field
        self.target_field = target_field
        self.limit = limit

    # pylint: disable=unused-argument
    def process_item(self, item, spider):
        """Copy a limited number of image URLs to be downloaded from source to target."""

        # adding target field would result in error; return item as-is
        if hasattr(item, "fields") and self.target_field not in item.fields:
            return item

        if self.limit is None or self.limit < 0:  # copy through everything
            item[self.target_field] = list(arg_to_iter(item.get(self.source_field)))
            return item

        if not self.limit:  # limit is zero
            item[self.target_field] = []
            return item

        # actual limit
        item[self.target_field] = list(
            islice(arg_to_iter(item.get(self.source_field)), self.limit)
        )
        return item
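

# Illustrative example: with the assumed settings
# LIMIT_IMAGES_URLS_FIELD = "image_url", IMAGES_URLS_FIELD = "image_urls" and
# LIMIT_IMAGES_TO_DOWNLOAD = 1, an item whose "image_url" field holds
# ["a.jpg", "b.jpg"] leaves LimitImagesPipeline with item["image_urls"] ==
# ["a.jpg"]. A negative limit copies all URLs through, a limit of 0 always
# yields an empty list, and items whose declared fields do not include the
# target field pass through untouched.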


class CleanItemPipeline:
    """Clean up unnecessary values from an item."""

    drop_falsey: bool
    drop_values: Optional[tuple]

    @classmethod
    def from_crawler(cls, crawler):
        """Init from crawler."""

        drop_falsey = crawler.settings.getbool("CLEAN_ITEM_DROP_FALSEY")
        drop_values = (
            tuple(arg_to_iter(crawler.settings.getlist("CLEAN_ITEM_DROP_VALUES")))
            or None
        )

        if not drop_falsey and not drop_values:
            raise NotConfigured

        return cls(drop_falsey=drop_falsey, drop_values=drop_values)

    def __init__(self, drop_falsey: bool, drop_values: Optional[tuple]):
        self.drop_falsey = drop_falsey
        self.drop_values = drop_values

    # pylint: disable=unused-argument
    def process_item(self, item, spider):
        """Clean up unnecessary values from an item."""

        adapter = ItemAdapter(item)

        for key in tuple(adapter.keys()):
            if (self.drop_falsey and not adapter[key]) or (
                self.drop_values is not None and adapter[key] in self.drop_values
            ):
                del adapter[key]

        return item
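

# Example Scrapy settings wiring up these pipelines (illustrative sketch).
# The setting names are the ones read in the from_crawler() methods above;
# the pipeline priorities, field names, drop values and the label URL are
# assumed values rather than the project's actual configuration.
EXAMPLE_SETTINGS = {
    "ITEM_PIPELINES": {
        "board_game_scraper.pipelines.DataTypePipeline": 100,
        "board_game_scraper.pipelines.ResolveLabelPipeline": 200,
        "board_game_scraper.pipelines.ResolveImagePipeline": 300,
        "board_game_scraper.pipelines.LimitImagesPipeline": 400,
        "board_game_scraper.pipelines.CleanItemPipeline": 500,
    },
    # ResolveLabelPipeline: "{}" is filled with the ID to resolve; a
    # Wikidata-style EntityData endpoint is assumed here.
    "RESOLVE_LABEL_URL": "https://www.wikidata.org/wiki/Special:EntityData/{}.json",
    "RESOLVE_LABEL_FIELDS": ["designer", "artist"],
    "RESOLVE_LABEL_LANGUAGE_PRIORITIES": ["en", "de"],
    # LimitImagesPipeline: copy at most one URL from the source to the target field.
    "LIMIT_IMAGES_URLS_FIELD": "image_url",
    "IMAGES_URLS_FIELD": "image_urls",
    "LIMIT_IMAGES_TO_DOWNLOAD": 1,
    # CleanItemPipeline: drop empty values and the placeholder string below.
    "CLEAN_ITEM_DROP_FALSEY": True,
    "CLEAN_ITEM_DROP_VALUES": ["(unknown)"],
}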