/ PornAPI / Rule34 / client.py
client.py
  1  from __future__ import annotations
  2  import asyncio
  3  import xml.etree.ElementTree as ET
  4  from typing import Union, List, Optional, Set, Callable, Awaitable, Any, Tuple
  5  from datetime import datetime
  6  from urllib.parse import quote_plus
  7  from time import monotonic
  8  from ..base.http import AsyncHttpClient
  9  from ..base.utils import read_metadata
 10  from ..base.decorators import dual_mode
 11  from .post import Rule34Post
 12  
 13  
 14  def _parse_posts(xml_text: str) -> List[Rule34Post]:
 15      """
 16      Parse XML API response into a list of Rule34Post objects.
 17      Returns an empty list on parse errors.
 18      """
 19      try:
 20          root = ET.fromstring(xml_text)
 21      except Exception:
 22          return []
 23      return [Rule34Post(**el.attrib) for el in root.findall("post")]
 24  
 25  
 26  async def _retry(coro_factory: Callable[[], Awaitable[Any]], retries: int = 3, base_delay: float = 0.4) -> Any:
 27      """
 28      Retry an async operation with exponential backoff.
 29      """
 30      exc = None
 31      for i in range(retries):
 32          try:
 33              return await coro_factory()
 34          except Exception as e:
 35              exc = e
 36              if i + 1 == retries:
 37                  break
 38              await asyncio.sleep(base_delay * (2 ** i))
 39      raise exc
 40  
 41  
 42  async def _enrich_meta(posts: List[Rule34Post], http: AsyncHttpClient, max_concurrency: int = 12, max_bytes: int = 1_572_864, debug: bool = False) -> Tuple[int, int]:
 43      """
 44      Fetch and fill media metadata for a list of posts with bounded concurrency and byte budget.
 45      Returns a tuple of (success_count, fail_count).
 46      """
 47      sem = asyncio.Semaphore(max_concurrency)
 48      ok = 0
 49      fail = 0
 50      async def one(p: Rule34Post):
 51          nonlocal ok, fail
 52          async with sem:
 53              try:
 54                  meta = await _retry(lambda: read_metadata(http, p.file_url, max_bytes=max_bytes))
 55              except Exception:
 56                  meta = None
 57              if not meta:
 58                  fail += 1
 59                  return
 60              p.duration = meta.get("duration")
 61              p.bitrate = meta.get("bitrate")
 62              p.sample_rate = meta.get("sample_rate")
 63              p.channels = meta.get("channels")
 64              p.bits_per_sample = meta.get("bits_per_sample")
 65              p.codec = meta.get("codec")
 66              p.meta_tags = meta.get("tags", {})
 67              ok += 1
 68      if debug:
 69          print(f"[DEBUG] meta.start total={len(posts)} conc={max_concurrency} budget={max_bytes}B")
 70      t0 = monotonic()
 71      await asyncio.gather(*(one(p) for p in posts))
 72      dt = monotonic() - t0
 73      if debug:
 74          print(f"[DEBUG] meta.done ok={ok} fail={fail} took={dt:.2f}s")
 75      return ok, fail
 76  
 77  
 78  def _valid(
 79      post: Rule34Post,
 80      blocked: Set[int],
 81      min_date: Optional[datetime],
 82      max_date: Optional[datetime],
 83      min_duration: Optional[float],
 84      max_duration: Optional[float],
 85  ) -> bool:
 86      """
 87      Validate a post against filtering rules.
 88      """
 89      if post.id in blocked:
 90          return False
 91      if min_date and isinstance(post.created_at, datetime) and post.created_at < min_date:
 92          return False
 93      if max_date and isinstance(post.created_at, datetime) and post.created_at > max_date:
 94          return False
 95      if min_duration is not None and post.duration is not None and post.duration < min_duration:
 96          return False
 97      if max_duration is not None and post.duration is not None and post.duration > max_duration:
 98          return False
 99      return True
100  
101  
@dual_mode
async def search(
    tags: Union[str, List[str]],
    limit: int = 10,
    blocked_ids: Optional[List[int]] = None,
    min_date: Optional[datetime] = None,
    max_date: Optional[datetime] = None,
    min_duration: Optional[float] = None,
    max_duration: Optional[float] = None,
    debug: bool = False
) -> List[Rule34Post]:
    """
    Search posts from rule34.xxx API with adaptive paging and optional metadata filtering.

    ``tags`` may be a single query string or a list of tags, which is joined
    with spaces. Result pages of 100 posts are fetched in concurrent "waves"
    (4 pages at first). The wave grows to 8 pages while nothing has been
    collected yet and shrinks to 2 once a wave yields fewer new posts than
    one page. Paging stops when ``limit`` posts are collected or a wave
    contains no unseen posts.

    If duration constraints are provided, metadata probing is performed with
    bounded concurrency and byte budget. Only posts that already pass the
    blocked-id and date filters are probed. Posts whose duration stays
    unknown after probing are not rejected by the duration filters.

    Set debug=True for verbose progress output with per-wave statistics and timings.

    Returns at most ``limit`` posts, in the order they were accepted.
    Exceptions from page fetches that still fail after retrying propagate
    to the caller.
    """
    if limit <= 0:
        # Nothing can be collected; skip opening an HTTP client altogether.
        if debug:
            print("[DEBUG] done return=0")
        return []
    if isinstance(tags, list):
        tags = " ".join(tags)
    q = quote_plus(tags.strip())
    blocked: Set[int] = set(blocked_ids or [])
    collected: List[Rule34Post] = []
    seen: Set[int] = set()
    page_size = 100
    wave_pages = 4
    pid = 0
    need_meta = min_duration is not None or max_duration is not None
    async with AsyncHttpClient() as http:
        async def fetch_pid(p: int) -> List[Rule34Post]:
            # One API page; an unparsable response is treated as an empty page.
            url = f"https://rule34.xxx/index.php?page=dapi&s=post&q=index&tags={q}&limit={page_size}&pid={p}"
            text = await _retry(lambda: http.read_text(url))
            return _parse_posts(text)
        while len(collected) < limit:
            t0 = monotonic()
            if debug:
                print(f"[DEBUG] wave.fetch start_pid={pid} pages={wave_pages}")
            groups = await asyncio.gather(*(fetch_pid(pid + i) for i in range(wave_pages)))
            fetched = sum(len(g) for g in groups)
            new_batch = []
            dup = 0
            # Drop posts already seen in an earlier page or wave.
            for g in groups:
                for p in g:
                    if p.id in seen:
                        dup += 1
                        continue
                    seen.add(p.id)
                    new_batch.append(p)
            if debug:
                dt = monotonic() - t0
                print(f"[DEBUG] wave.fetched posts_total={fetched} new={len(new_batch)} dup={dup} took={dt:.2f}s")
            if not new_batch:
                if debug:
                    print("[DEBUG] wave.empty no-more-posts")
                break
            if need_meta:
                # Apply the cheap id/date filters first so no metadata is
                # downloaded for posts that can never be returned.
                candidates = [
                    p for p in new_batch
                    if _valid(p, blocked, min_date, max_date, None, None)
                ]
                ok, fail = await _enrich_meta(candidates, http, max_concurrency=12, max_bytes=1_572_864, debug=debug)
                if debug:
                    print(f"[DEBUG] wave.meta ok={ok} fail={fail}")
            else:
                candidates = new_batch
            added = 0
            for p in candidates:
                if len(collected) >= limit:
                    break
                if _valid(p, blocked, min_date, max_date, min_duration, max_duration):
                    collected.append(p)
                    added += 1
            if debug:
                print(f"[DEBUG] wave.collect added={added} total={len(collected)}/{limit}")
            if len(collected) >= limit:
                break
            pid += wave_pages
            # Wave sizing is based on the unfiltered batch: few new posts
            # means the listing is running out, so fetch less next time;
            # nothing accepted so far means the filters are strict, so fetch more.
            if len(new_batch) < page_size and wave_pages > 1:
                wave_pages = 2
            elif len(collected) == 0 and wave_pages < 8:
                wave_pages = 8
    if debug:
        print(f"[DEBUG] done return={len(collected)}")
    return collected[:limit]