# link_content.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import replace
from fnmatch import fnmatch
from typing import Any, cast

import httpx
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from haystack import component, logging
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport
from haystack.version import __version__

# HTTP/2 support via lazy import
with LazyImport("Run 'pip install httpx[http2]' to use HTTP/2 support") as h2_import:
    pass  # nothing to import as we simply set the http2 attribute, library handles the rest

logger = logging.getLogger(__name__)

DEFAULT_USER_AGENT = f"haystack/LinkContentFetcher/{__version__}"

# Default request headers; the browser-like referer and accept-language values
# reduce the chance of requests being rejected as obvious bot traffic.
REQUEST_HEADERS = {
    "accept": "*/*",
    "User-Agent": DEFAULT_USER_AGENT,
    "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
    "referer": "https://www.google.com/",
}


def _merge_headers(*args: dict[str, str]) -> dict[str, str]:
    """
    Merge several header dicts case-insensitively.

    When keys collide (ignoring case), values from later dicts take precedence
    over earlier ones, and the key casing of the last occurrence is preserved.

    :param args: The dicts to merge, in increasing order of precedence.
    :returns: The merged dict.
    """
    merged: dict[str, str] = {}
    keymap: dict[str, str] = {}

    for d in args:
        for k, v in d.items():
            kl = k.lower()
            keymap[kl] = k
            merged[kl] = v

    return {keymap[kl]: v for kl, v in merged.items()}


def _text_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles text content.

    :param response: Response object from the request.
    :returns: The extracted text.
    """
    return ByteStream.from_string(response.text)


def _binary_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles binary content.

    :param response: Response object from the request.
    :returns: The extracted binary file-like object.
    """
    return ByteStream(data=response.content)


@component
class LinkContentFetcher:
    """
    Fetches and extracts content from URLs.

    It supports various content types, retries on failures, and automatic user-agent rotation for failed web
    requests. Use it as the data-fetching step in your pipelines.

    You may need to convert LinkContentFetcher's output into a list of documents. Use HTMLToDocument
    converter to do this.

    ### Usage example

    ```python
    from haystack.components.fetchers.link_content import LinkContentFetcher

    fetcher = LinkContentFetcher()
    streams = fetcher.run(urls=["https://www.google.com"])["streams"]

    assert len(streams) == 1
    assert streams[0].meta == {'content_type': 'text/html', 'url': 'https://www.google.com'}
    assert streams[0].data
    ```

    For async usage:

    ```python
    import asyncio
    from haystack.components.fetchers import LinkContentFetcher

    async def fetch_async():
        fetcher = LinkContentFetcher()
        result = await fetcher.run_async(urls=["https://www.google.com"])
        return result["streams"]

    streams = asyncio.run(fetch_async())
    ```
    """

    def __init__(
        self,
        raise_on_failure: bool = True,
        user_agents: list[str] | None = None,
        retry_attempts: int = 2,
        timeout: int = 3,
        http2: bool = False,
        client_kwargs: dict | None = None,
        request_headers: dict[str, str] | None = None,
    ) -> None:
        """
        Initializes the component.

        :param raise_on_failure: If `True`, raises an exception if it fails to fetch a single URL.
            For multiple URLs, it logs errors and returns the content it successfully fetched.
        :param user_agents: [User agents](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent)
            for fetching content. If `None`, a default user agent is used.
        :param retry_attempts: The number of times to retry to fetch the URL's content.
        :param timeout: Timeout in seconds for the request.
        :param http2: Whether to enable HTTP/2 support for requests. Defaults to False.
            Requires the 'h2' package to be installed (via `pip install httpx[http2]`).
        :param client_kwargs: Additional keyword arguments to pass to the httpx client.
            If `None`, default values are used.
        :param request_headers: Extra headers to send with every request. Merged case-insensitively over
            the component's default headers; the rotating User-Agent still takes precedence over them.
        """
        self.raise_on_failure = raise_on_failure
        self.user_agents = user_agents or [DEFAULT_USER_AGENT]
        self.current_user_agent_idx: int = 0
        self.retry_attempts = retry_attempts
        self.timeout = timeout
        self.http2 = http2
        self.client_kwargs = client_kwargs or {}
        self.request_headers = request_headers or {}

        # Configure default client settings
        self.client_kwargs.setdefault("timeout", timeout)
        self.client_kwargs.setdefault("follow_redirects", True)

        # Work on a copy so the optional http2 flag doesn't leak into self.client_kwargs
        client_kwargs = {**self.client_kwargs}

        # Optional HTTP/2 support
        if http2:
            try:
                h2_import.check()
                client_kwargs["http2"] = True
            except ImportError:
                logger.warning(
                    "HTTP/2 support requested but 'h2' package is not installed. "
                    "Falling back to HTTP/1.1. Install with `pip install httpx[http2]` to enable HTTP/2 support."
                )
                self.http2 = False  # Update the setting to match actual capability

        # Initialize synchronous client
        self._client = httpx.Client(**client_kwargs)

        # Initialize asynchronous client
        self._async_client = httpx.AsyncClient(**client_kwargs)

        # register default content handlers that extract data from the response;
        # unknown content types fall back to the text handler via the defaultdict factory.
        # note: "text/html" is deliberately mapped to the binary handler so raw bytes are
        # preserved — presumably for downstream HTML converters; confirm before changing.
        self.handlers: dict[str, Callable[[httpx.Response], ByteStream]] = defaultdict(lambda: _text_content_handler)
        self.handlers["text/*"] = _text_content_handler
        self.handlers["text/html"] = _binary_content_handler
        self.handlers["application/json"] = _text_content_handler
        self.handlers["application/*"] = _binary_content_handler
        self.handlers["image/*"] = _binary_content_handler
        self.handlers["audio/*"] = _binary_content_handler
        self.handlers["video/*"] = _binary_content_handler

        # The retry decorator needs instance state (retry_attempts, _switch_user_agent),
        # so the sync fetcher is built as a closure here rather than a plain method.
        @retry(
            reraise=True,
            stop=stop_after_attempt(self.retry_attempts),
            wait=wait_exponential(multiplier=1, min=2, max=10),
            retry=(retry_if_exception_type((httpx.HTTPStatusError, httpx.RequestError))),
            # This method is invoked only after failed requests (exception raised)
            after=self._switch_user_agent,
        )
        def get_response(url: str) -> httpx.Response:
            response = self._client.get(url, headers=self._get_headers())
            response.raise_for_status()
            return response

        self._get_response: Callable = get_response

    def _get_headers(self) -> dict[str, str]:
        """
        Build headers with precedence

        client defaults -> component defaults -> user-provided -> rotating UA
        """
        base = dict(self._client.headers)
        return _merge_headers(
            base, REQUEST_HEADERS, self.request_headers, {"User-Agent": self.user_agents[self.current_user_agent_idx]}
        )

    def __del__(self) -> None:
        """
        Clean up resources when the component is deleted.

        Closes both the synchronous and asynchronous HTTP clients to prevent
        resource leaks.
        """
        try:
            # Close the synchronous client if it exists
            if hasattr(self, "_client"):
                self._client.close()

            # There is no way to close the async client without await
        except Exception:
            # Suppress any exceptions during cleanup
            pass

    @component.output_types(streams=list[ByteStream])
    def run(self, urls: list[str]) -> dict[str, Any]:
        """
        Fetches content from a list of URLs and returns a list of extracted content streams.

        Each content stream is a `ByteStream` object containing the extracted content as binary data.
        Each ByteStream object in the returned list corresponds to the contents of a single URL.
        The content type of each stream is stored in the metadata of the ByteStream object under
        the key "content_type". The URL of the fetched content is stored under the key "url".

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.

        :raises Exception: If the provided list of URLs contains only a single URL, and `raise_on_failure` is set to
            `True`, an exception will be raised in case of an error during content retrieval.
            In all other scenarios, any retrieval errors are logged, and a list of successfully retrieved `ByteStream`
            objects is returned.
        """
        streams: list[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # don't use multithreading if there's only one URL
        if len(urls) == 1:
            stream_metadata, stream = self._fetch(urls[0])
            stream.meta.update(stream_metadata)
            stream = replace(stream, mime_type=stream.meta.get("content_type", None))
            streams.append(stream)
        else:
            with ThreadPoolExecutor() as executor:
                results = executor.map(self._fetch_with_exception_suppression, urls)

            for stream_metadata, stream in results:  # type: ignore
                # a (metadata, None) pair marks a suppressed failure — skip it
                if stream_metadata is not None and stream is not None:
                    stream.meta.update(stream_metadata)
                    stream = replace(stream, mime_type=stream.meta.get("content_type", None))
                    streams.append(stream)

        return {"streams": streams}

    @component.output_types(streams=list[ByteStream])
    async def run_async(self, urls: list[str]) -> dict[str, Any]:
        """
        Asynchronously fetches content from a list of URLs and returns a list of extracted content streams.

        This is the asynchronous version of the `run` method with the same parameters and return values.

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.
        """
        streams: list[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # Create tasks for all URLs using _fetch_async directly
        tasks = [self._fetch_async(url, self._async_client) for url in urls]

        # Only capture exceptions when we have multiple URLs or raise_on_failure=False
        # This ensures errors propagate appropriately for single URLs with raise_on_failure=True
        return_exceptions = not (len(urls) == 1 and self.raise_on_failure)
        results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)

        # Process results
        for i, result in enumerate(results):
            # Handle exception results (only happens when return_exceptions=True)
            if isinstance(result, Exception):
                logger.warning("Error fetching {url}: {error}", url=urls[i], error=str(result))
                # Add an empty result for failed URLs when raise_on_failure=False
                if not self.raise_on_failure:
                    streams.append(ByteStream(data=b"", meta={"content_type": "Unknown", "url": urls[i]}))
                continue

            # Past the `continue` above, result cannot be an exception; cast to the
            # tuple type returned by _fetch_async for mypy's benefit.
            stream_metadata, stream = cast(tuple[dict[str, str] | None, ByteStream | None], result)
            if stream_metadata is not None and stream is not None:
                stream.meta.update(stream_metadata)
                stream = replace(stream, mime_type=stream.meta.get("content_type", None))
                streams.append(stream)

        return {"streams": streams}

    def _fetch(self, url: str) -> tuple[dict[str, str], ByteStream]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
            ByteStream metadata contains the URL and the content type of the fetched content.
            The content type is a string indicating the type of content fetched (for example, "text/html",
            "application/pdf"). The ByteStream object contains the fetched content as binary data.

        :raises Exception: If an error occurs during content retrieval and `raise_on_failure` is set to True,
            this method will raise an exception. Otherwise, all fetching errors are logged, and an empty
            ByteStream is returned.
        """
        content_type: str = "text/html"
        stream: ByteStream = ByteStream(data=b"")
        try:
            response = self._get_response(url)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
        except Exception as e:
            if self.raise_on_failure:
                raise e
            # less verbose log as this is expected to happen often (requests failing, blocked, etc.)
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))

        finally:
            # reset user-agent rotation so the next fetch starts from the first UA
            self.current_user_agent_idx = 0

        return {"content_type": content_type, "url": url}, stream

    async def _fetch_async(
        self, url: str, client: httpx.AsyncClient
    ) -> tuple[dict[str, str] | None, ByteStream | None]:
        """
        Asynchronously fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :param client: The async httpx client to use for making requests.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        :raises Exception: If content retrieval fails and `raise_on_failure` is True; otherwise the error is
            logged and an empty ByteStream is returned.
        """
        content_type: str = "text/html"
        stream: ByteStream | None = None
        metadata: dict[str, str] | None = None

        try:
            response = await self._get_response_async(url, client)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
            metadata = {"content_type": content_type, "url": url}
        except Exception as e:
            if self.raise_on_failure:
                raise e
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
            # Create an empty ByteStream for failed requests when raise_on_failure is False
            stream = ByteStream(data=b"")
            metadata = {"content_type": content_type, "url": url}
        finally:
            # reset user-agent rotation so the next fetch starts from the first UA
            self.current_user_agent_idx = 0

        return metadata, stream

    def _fetch_with_exception_suppression(self, url: str) -> tuple[dict[str, str] | None, ByteStream | None]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        If `raise_on_failure` is set to True, this method will wrap the fetch() method and catch any exceptions
        (used in multi-URL mode so one failing URL doesn't abort the whole batch).
        Otherwise, it will simply call the fetch() method, which already suppresses errors itself.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream,
            or `(metadata, None)` on a suppressed failure.
        """
        if self.raise_on_failure:
            try:
                return self._fetch(url)
            except Exception as e:
                logger.warning("Error fetching {url}: {error}", url=url, error=str(e))
                return {"content_type": "Unknown", "url": url}, None
        else:
            return self._fetch(url)

    async def _get_response_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:
        """
        Asynchronously gets a response from a URL with retry logic.

        NOTE(review): this loop performs `retry_attempts + 1` total attempts, while the sync path's
        `stop_after_attempt(retry_attempts)` performs `retry_attempts` total attempts — confirm which
        count is intended before unifying them.

        :param url: The URL to fetch.
        :param client: The async httpx client to use for making requests.
        :returns: The httpx Response object.
        :raises httpx.HTTPStatusError | httpx.RequestError: The last error seen once retries are exhausted.
        """
        attempt = 0
        last_exception = None

        while attempt <= self.retry_attempts:
            try:
                response = await client.get(url, headers=self._get_headers())
                response.raise_for_status()
                return response
            except (httpx.HTTPStatusError, httpx.RequestError) as e:
                last_exception = e
                attempt += 1
                if attempt <= self.retry_attempts:
                    self._switch_user_agent(None)  # Switch user agent for next retry
                    # Wait before retry using exponential backoff (2, 4, 8... capped at 10 seconds)
                    await asyncio.sleep(min(2 * 2 ** (attempt - 1), 10))
                else:
                    break

        # If we've exhausted all retries, raise the last exception
        if last_exception:
            raise last_exception

        # This should never happen, but just in case
        raise httpx.RequestError("Failed to get response after retries", request=None)

    def _get_content_type(self, response: httpx.Response) -> str:
        """
        Get the content type of the response.

        :param response: The response object.
        :returns: The content type of the response, without any parameters (e.g. charset).
        """
        content_type = response.headers.get("Content-Type", "")
        return content_type.split(";")[0]

    def _resolve_handler(self, content_type: str) -> Callable[[httpx.Response], ByteStream]:
        """
        Resolves the handler for the given content type.

        First, it tries to find a direct match for the content type in the handlers dictionary.
        If no direct match is found, it tries to find a pattern match using the fnmatch function.
        If no pattern match is found, it returns the default handler for text/plain.

        :param content_type: The content type to resolve the handler for.
        :returns: The handler for the given content type, if found. Otherwise, the default handler for text/plain.
        """
        # direct match
        if content_type in self.handlers:
            return self.handlers[content_type]

        # pattern matches
        for pattern, handler in self.handlers.items():
            if fnmatch(content_type, pattern):
                return handler

        # default handler (defaultdict factory: the text handler)
        return self.handlers["text/plain"]

    def _switch_user_agent(self, retry_state: RetryCallState | None = None) -> None:  # noqa: ARG002
        """
        Switches the User-Agent for this LinkContentRetriever to the next one in the list of user agents.

        Used by tenacity to retry the requests with a different user agent.

        :param retry_state: The retry state (unused, required by tenacity).
        """
        self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
        logger.debug("Switched user agent to {user_agent}", user_agent=self.user_agents[self.current_user_agent_idx])