# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import replace
from fnmatch import fnmatch
from typing import Any, cast

import httpx
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from haystack import component, logging
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport
from haystack.version import __version__

# HTTP/2 support via lazy import
with LazyImport("Run 'pip install httpx[http2]' to use HTTP/2 support") as h2_import:
    pass  # nothing to import; we only set the http2 flag on the client and httpx handles the rest

logger = logging.getLogger(__name__)

DEFAULT_USER_AGENT = f"haystack/LinkContentFetcher/{__version__}"

REQUEST_HEADERS = {
    "accept": "*/*",
    "User-Agent": DEFAULT_USER_AGENT,
    "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
    "referer": "https://www.google.com/",
}


def _merge_headers(*args: dict[str, str]) -> dict[str, str]:
    """
    Merge dictionaries case-insensitively.

    Later dictionaries take precedence, and the key casing from the last occurrence is kept.

    :param args: The dictionaries to merge.
    :returns: The merged dictionary.
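
    Illustrative example (hypothetical values):

    ```python
    _merge_headers({"User-Agent": "a"}, {"user-agent": "b"})
    # -> {"user-agent": "b"}
    ```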
    """
    merged: dict[str, str] = {}
    keymap: dict[str, str] = {}

    for d in args:
        for k, v in d.items():
            kl = k.lower()
            keymap[kl] = k
            merged[kl] = v

    return {keymap[kl]: v for kl, v in merged.items()}


def _text_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles text content.

    :param response: Response object from the request.
    :returns: A ByteStream created from the decoded response text.
    """
    return ByteStream.from_string(response.text)


def _binary_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles binary content.

    :param response: Response object from the request.
    :returns: A ByteStream wrapping the raw response bytes.
    """
    return ByteStream(data=response.content)


@component
class LinkContentFetcher:
    """
    Fetches and extracts content from URLs.

    It supports various content types, retries on failures, and automatic user-agent rotation for failed web
    requests. Use it as the data-fetching step in your pipelines.

    You may need to convert LinkContentFetcher's output into a list of documents. Use the HTMLToDocument
    converter to do this.

    ### Usage example

    ```python
    from haystack.components.fetchers.link_content import LinkContentFetcher

    fetcher = LinkContentFetcher()
    streams = fetcher.run(urls=["https://www.google.com"])["streams"]

    assert len(streams) == 1
    assert streams[0].meta == {'content_type': 'text/html', 'url': 'https://www.google.com'}
    assert streams[0].data
    ```

    For async usage:

    ```python
    import asyncio
    from haystack.components.fetchers import LinkContentFetcher

    async def fetch_async():
        fetcher = LinkContentFetcher()
        result = await fetcher.run_async(urls=["https://www.google.com"])
        return result["streams"]

    streams = asyncio.run(fetch_async())
    ```
    """

    def __init__(
        self,
        raise_on_failure: bool = True,
        user_agents: list[str] | None = None,
        retry_attempts: int = 2,
        timeout: int = 3,
        http2: bool = False,
        client_kwargs: dict | None = None,
        request_headers: dict[str, str] | None = None,
    ) -> None:
        """
        Initializes the component.

        :param raise_on_failure: If `True`, raises an exception if it fails to fetch a single URL.
            For multiple URLs, it logs errors and returns the content it successfully fetched.
        :param user_agents: [User agents](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent)
            for fetching content. If `None`, a default user agent is used.
        :param retry_attempts: The number of times to retry fetching a URL's content.
        :param timeout: Timeout in seconds for the request.
        :param http2: Whether to enable HTTP/2 support for requests. Defaults to `False`.
            Requires the 'h2' package to be installed (via `pip install httpx[http2]`).
        :param client_kwargs: Additional keyword arguments to pass to the httpx client.
            If `None`, default values are used.
        :param request_headers: Additional request headers merged on top of the component defaults.
            They take precedence over the built-in defaults but are overridden by the rotating User-Agent.
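
        Illustrative construction (all parameters are real; the values are arbitrary):

        ```python
        fetcher = LinkContentFetcher(
            raise_on_failure=False,
            user_agents=["my-agent/1.0", "my-agent-fallback/1.0"],
            retry_attempts=3,
            timeout=10,
            request_headers={"Accept-Language": "en-US"},
        )
        ```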
        """
        self.raise_on_failure = raise_on_failure
        self.user_agents = user_agents or [DEFAULT_USER_AGENT]
        self.current_user_agent_idx: int = 0
        self.retry_attempts = retry_attempts
        self.timeout = timeout
        self.http2 = http2
        self.client_kwargs = client_kwargs or {}
        self.request_headers = request_headers or {}

        # Configure default client settings
        self.client_kwargs.setdefault("timeout", timeout)
        self.client_kwargs.setdefault("follow_redirects", True)

        # Create httpx clients
        client_kwargs = {**self.client_kwargs}

        # Optional HTTP/2 support
        if http2:
            try:
                h2_import.check()
                client_kwargs["http2"] = True
            except ImportError:
                logger.warning(
                    "HTTP/2 support requested but 'h2' package is not installed. "
                    "Falling back to HTTP/1.1. Install with `pip install httpx[http2]` to enable HTTP/2 support."
                )
                self.http2 = False  # Update the setting to match actual capability

        # Initialize synchronous client
        self._client = httpx.Client(**client_kwargs)

        # Initialize asynchronous client
        self._async_client = httpx.AsyncClient(**client_kwargs)

        # register default content handlers that extract data from the response
        self.handlers: dict[str, Callable[[httpx.Response], ByteStream]] = defaultdict(lambda: _text_content_handler)
        self.handlers["text/*"] = _text_content_handler
        self.handlers["text/html"] = _binary_content_handler  # HTML is kept as raw bytes rather than decoded text
        self.handlers["application/json"] = _text_content_handler
        self.handlers["application/*"] = _binary_content_handler
        self.handlers["image/*"] = _binary_content_handler
        self.handlers["audio/*"] = _binary_content_handler
        self.handlers["video/*"] = _binary_content_handler

        @retry(
            reraise=True,
            stop=stop_after_attempt(self.retry_attempts),
            wait=wait_exponential(multiplier=1, min=2, max=10),
            retry=(retry_if_exception_type((httpx.HTTPStatusError, httpx.RequestError))),
            # This method is invoked only after failed requests (exception raised)
            after=self._switch_user_agent,
        )
        def get_response(url: str) -> httpx.Response:
            response = self._client.get(url, headers=self._get_headers())
            response.raise_for_status()
            return response

        self._get_response: Callable = get_response

    def _get_headers(self) -> dict[str, str]:
        """
        Build request headers with the following precedence (lowest to highest):

        client defaults -> component defaults -> user-provided request_headers -> rotating User-Agent
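
        For example (hypothetical values), request_headers={"Accept": "text/html"} overrides the component
        default "accept: */*", while the rotating User-Agent is merged last and therefore always wins.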
        """
        base = dict(self._client.headers)
        return _merge_headers(
            base, REQUEST_HEADERS, self.request_headers, {"User-Agent": self.user_agents[self.current_user_agent_idx]}
        )

    def __del__(self) -> None:
        """
        Clean up resources when the component is deleted.

        Closes the synchronous HTTP client to prevent resource leaks.
        """
        try:
            # Close the synchronous client if it exists
            if hasattr(self, "_client"):
                self._client.close()

            # The async client cannot be closed here because its aclose() coroutine must be awaited
        except Exception:
            # Suppress any exceptions during cleanup
            pass

    @component.output_types(streams=list[ByteStream])
    def run(self, urls: list[str]) -> dict[str, Any]:
        """
        Fetches content from a list of URLs and returns a list of extracted content streams.

        Each content stream is a `ByteStream` object containing the extracted content as binary data.
        Each ByteStream object in the returned list corresponds to the contents of a single URL.
        The content type of each stream is stored in the metadata of the ByteStream object under
        the key "content_type". The URL of the fetched content is stored under the key "url".

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.

        :raises Exception: If the provided list contains a single URL and `raise_on_failure` is set to
            `True`, an exception is raised in case of an error during content retrieval.
            In all other scenarios, any retrieval errors are logged, and a list of successfully retrieved
            `ByteStream` objects is returned.
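
        Illustrative multi-URL usage (hypothetical URLs):

        ```python
        fetcher = LinkContentFetcher(raise_on_failure=False)
        streams = fetcher.run(urls=["https://example.com", "https://example.org"])["streams"]
        ```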
        """
        streams: list[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # don't use multithreading if there's only one URL
        if len(urls) == 1:
            stream_metadata, stream = self._fetch(urls[0])
            stream.meta.update(stream_metadata)
            # propagate the detected content type into the ByteStream's mime_type field
            stream = replace(stream, mime_type=stream.meta.get("content_type", None))
            streams.append(stream)
        else:
            with ThreadPoolExecutor() as executor:
                results = executor.map(self._fetch_with_exception_suppression, urls)

            for stream_metadata, stream in results:  # type: ignore
                if stream_metadata is not None and stream is not None:
                    stream.meta.update(stream_metadata)
                    stream = replace(stream, mime_type=stream.meta.get("content_type", None))
                    streams.append(stream)

        return {"streams": streams}

    @component.output_types(streams=list[ByteStream])
    async def run_async(self, urls: list[str]) -> dict[str, Any]:
        """
        Asynchronously fetches content from a list of URLs and returns a list of extracted content streams.

        This is the asynchronous version of the `run` method with the same parameters and return values.

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.
        """
        streams: list[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # Create tasks for all URLs using _fetch_async directly
        tasks = [self._fetch_async(url, self._async_client) for url in urls]

        # Only capture exceptions when we have multiple URLs or raise_on_failure=False.
        # This ensures errors propagate appropriately for single URLs with raise_on_failure=True.
        return_exceptions = not (len(urls) == 1 and self.raise_on_failure)
        results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)

        # Process results
        for i, result in enumerate(results):
            # Handle exception results (only happens when return_exceptions=True)
            if isinstance(result, Exception):
                logger.warning("Error fetching {url}: {error}", url=urls[i], error=str(result))
                # Add an empty result for failed URLs when raise_on_failure=False
                if not self.raise_on_failure:
                    streams.append(ByteStream(data=b"", meta={"content_type": "Unknown", "url": urls[i]}))
                continue

            # Process successful results. The cast tells mypy that result is the tuple type
            # returned by _fetch_async.
            stream_metadata, stream = cast(tuple[dict[str, str] | None, ByteStream | None], result)
            if stream_metadata is not None and stream is not None:
                stream.meta.update(stream_metadata)
                stream = replace(stream, mime_type=stream.meta.get("content_type", None))
                streams.append(stream)

        return {"streams": streams}

    def _fetch(self, url: str) -> tuple[dict[str, str], ByteStream]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
            ByteStream metadata contains the URL and the content type of the fetched content.
            The content type is a string indicating the type of content fetched (for example, "text/html",
            "application/pdf"). The ByteStream object contains the fetched content as binary data.
        :raises Exception: If an error occurs during content retrieval and `raise_on_failure` is set to `True`.
            Otherwise, all fetching errors are logged, and an empty ByteStream is returned.
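
        For example, a successful HTML fetch returns ({"content_type": "text/html", "url": url}, stream),
        where stream holds the raw page bytes.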
        """
        content_type: str = "text/html"
        stream: ByteStream = ByteStream(data=b"")
        try:
            response = self._get_response(url)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
        except Exception as e:
            if self.raise_on_failure:
                raise e
            # less verbose log as this is expected to happen often (requests failing, blocked, etc.)
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))

        finally:
            # reset the user-agent rotation so the next fetch starts from the first user agent
            self.current_user_agent_idx = 0

        return {"content_type": content_type, "url": url}, stream

    async def _fetch_async(
        self, url: str, client: httpx.AsyncClient
    ) -> tuple[dict[str, str] | None, ByteStream | None]:
        """
        Asynchronously fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :param client: The async httpx client to use for making requests.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        """
        content_type: str = "text/html"
        stream: ByteStream | None = None
        metadata: dict[str, str] | None = None

        try:
            response = await self._get_response_async(url, client)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
            metadata = {"content_type": content_type, "url": url}
        except Exception as e:
            if self.raise_on_failure:
                raise e
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
            # Create an empty ByteStream for failed requests when raise_on_failure is False
            stream = ByteStream(data=b"")
            metadata = {"content_type": content_type, "url": url}
        finally:
            self.current_user_agent_idx = 0

        return metadata, stream

    def _fetch_with_exception_suppression(self, url: str) -> tuple[dict[str, str] | None, ByteStream | None]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        If `raise_on_failure` is set to `True`, this method wraps `_fetch()` and catches any exceptions, so that
        one failing URL does not abort a multi-URL run. Otherwise, it simply calls `_fetch()`, which already
        suppresses errors internally.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        """
        if self.raise_on_failure:
            try:
                return self._fetch(url)
            except Exception as e:
                logger.warning("Error fetching {url}: {error}", url=url, error=str(e))
                return {"content_type": "Unknown", "url": url}, None
        else:
            return self._fetch(url)

    async def _get_response_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:
        """
        Asynchronously gets a response from a URL with retry logic.

        :param url: The URL to fetch.
        :param client: The async httpx client to use for making requests.
        :returns: The httpx Response object.
        """
        attempt = 0
        last_exception = None

        while attempt <= self.retry_attempts:
            try:
                response = await client.get(url, headers=self._get_headers())
                response.raise_for_status()
                return response
            except (httpx.HTTPStatusError, httpx.RequestError) as e:
                last_exception = e
                attempt += 1
                if attempt <= self.retry_attempts:
                    self._switch_user_agent(None)  # Switch user agent for next retry
                    # Wait before retry using exponential backoff
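                    # (sleeps 2s, 4s, 8s, then capped at 10s, mirroring the sync wait_exponential settings)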
                    await asyncio.sleep(min(2 * 2 ** (attempt - 1), 10))
                else:
                    break

        # If we've exhausted all retries, raise the last exception
        if last_exception:
            raise last_exception

        # This should never happen, but just in case
        raise httpx.RequestError("Failed to get response after retries", request=None)

    def _get_content_type(self, response: httpx.Response) -> str:
        """
        Get the content type of the response.

        :param response: The response object.
        :returns: The content type of the response, with any parameters (for example, "charset") stripped.
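
        For example, a "Content-Type" header of "text/html; charset=utf-8" yields "text/html".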
        """
        content_type = response.headers.get("Content-Type", "")
        return content_type.split(";")[0]

    def _resolve_handler(self, content_type: str) -> Callable[[httpx.Response], ByteStream]:
        """
        Resolves the handler for the given content type.

        First, it tries to find a direct match for the content type in the handlers dictionary.
        If no direct match is found, it tries to find a pattern match using the fnmatch function.
        If no pattern match is found, it returns the default handler for text/plain.

        :param content_type: The content type to resolve the handler for.
        :returns: The handler for the given content type, if found. Otherwise, the default handler for text/plain.
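
        For example, "image/png" has no direct entry but matches the registered "image/*" pattern,
        so the binary content handler is returned.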
        """
        # direct match
        if content_type in self.handlers:
            return self.handlers[content_type]

        # pattern matches
        for pattern, handler in self.handlers.items():
            if fnmatch(content_type, pattern):
                return handler

        # default handler
        return self.handlers["text/plain"]

    def _switch_user_agent(self, retry_state: RetryCallState | None = None) -> None:  # noqa: ARG002
        """
        Switches the User-Agent for this LinkContentFetcher to the next one in the list of user agents.

        Used by tenacity to retry the requests with a different user agent.

        :param retry_state: The retry state (unused, required by tenacity).
        """
        self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
        logger.debug("Switched user agent to {user_agent}", user_agent=self.user_agents[self.current_user_agent_idx])