client.py
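"""
Async search client for the rule34.xxx posts API: adaptive concurrent
paging, cross-wave deduplication, optional media-metadata enrichment,
and date/duration filtering.
"""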
from __future__ import annotations

import asyncio
import xml.etree.ElementTree as ET
from typing import Union, List, Optional, Set, Callable, Awaitable, Any, Tuple
from datetime import datetime
from urllib.parse import quote_plus
from time import monotonic

from ..base.http import AsyncHttpClient
from ..base.utils import read_metadata
from ..base.decorators import dual_mode
from .post import Rule34Post


def _parse_posts(xml_text: str) -> List[Rule34Post]:
    """
    Parse an XML API response into a list of Rule34Post objects.
    Returns an empty list on parse errors.
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []
    return [Rule34Post(**el.attrib) for el in root.findall("post")]


async def _retry(
    coro_factory: Callable[[], Awaitable[Any]],
    retries: int = 3,
    base_delay: float = 0.4,
) -> Any:
    """
    Retry an async operation with exponential backoff
    (base_delay, doubled after each failure).
    """
    exc: Optional[Exception] = None
    for i in range(retries):
        try:
            return await coro_factory()
        except Exception as e:
            exc = e
            if i + 1 == retries:
                break
            await asyncio.sleep(base_delay * (2 ** i))
    if exc is None:
        raise ValueError("retries must be >= 1")
    raise exc


async def _enrich_meta(
    posts: List[Rule34Post],
    http: AsyncHttpClient,
    max_concurrency: int = 12,
    max_bytes: int = 1_572_864,
    debug: bool = False,
) -> Tuple[int, int]:
    """
    Fetch and fill media metadata for a list of posts with bounded
    concurrency and a per-file byte budget.
    Returns a tuple of (success_count, fail_count).
    """
    sem = asyncio.Semaphore(max_concurrency)
    ok = 0
    fail = 0

    async def one(p: Rule34Post) -> None:
        nonlocal ok, fail
        async with sem:
            try:
                meta = await _retry(lambda: read_metadata(http, p.file_url, max_bytes=max_bytes))
            except Exception:
                meta = None
            if not meta:
                fail += 1
                return
            p.duration = meta.get("duration")
            p.bitrate = meta.get("bitrate")
            p.sample_rate = meta.get("sample_rate")
            p.channels = meta.get("channels")
            p.bits_per_sample = meta.get("bits_per_sample")
            p.codec = meta.get("codec")
            p.meta_tags = meta.get("tags", {})
            ok += 1

    if debug:
        print(f"[DEBUG] meta.start total={len(posts)} conc={max_concurrency} budget={max_bytes}B")
    t0 = monotonic()
    await asyncio.gather(*(one(p) for p in posts))
    dt = monotonic() - t0
    if debug:
        print(f"[DEBUG] meta.done ok={ok} fail={fail} took={dt:.2f}s")
    return ok, fail


def _valid(
    post: Rule34Post,
    blocked: Set[int],
    min_date: Optional[datetime],
    max_date: Optional[datetime],
    min_duration: Optional[float],
    max_duration: Optional[float],
) -> bool:
    """
    Validate a post against the filtering rules.
    Posts with an unknown duration pass the duration filters.
    """
    if post.id in blocked:
        return False
    if min_date and isinstance(post.created_at, datetime) and post.created_at < min_date:
        return False
    if max_date and isinstance(post.created_at, datetime) and post.created_at > max_date:
        return False
    if min_duration is not None and post.duration is not None and post.duration < min_duration:
        return False
    if max_duration is not None and post.duration is not None and post.duration > max_duration:
        return False
    return True


@dual_mode
async def search(
    tags: Union[str, List[str]],
    limit: int = 10,
    blocked_ids: Optional[List[int]] = None,
    min_date: Optional[datetime] = None,
    max_date: Optional[datetime] = None,
    min_duration: Optional[float] = None,
    max_duration: Optional[float] = None,
    debug: bool = False,
) -> List[Rule34Post]:
    """
    Search posts from the rule34.xxx API with adaptive paging and optional
    metadata-based filtering. If duration constraints are given, metadata
    probing runs with bounded concurrency and a per-file byte budget.
    Set debug=True for per-wave statistics and timings.
    """
    if isinstance(tags, list):
        tags = " ".join(tags)
    q = quote_plus(tags.strip())
    blocked: Set[int] = set(blocked_ids or [])
    collected: List[Rule34Post] = []
    seen: Set[int] = set()
    page_size = 100
    wave_pages = 4  # pages fetched concurrently per wave; adapted below
    pid = 0
    need_meta = min_duration is not None or max_duration is not None

    async with AsyncHttpClient() as http:

        async def fetch_pid(p: int) -> List[Rule34Post]:
            url = (
                "https://rule34.xxx/index.php?page=dapi&s=post&q=index"
                f"&tags={q}&limit={page_size}&pid={p}"
            )
            text = await _retry(lambda: http.read_text(url))
            return _parse_posts(text)

        while len(collected) < limit:
            t0 = monotonic()
            if debug:
                print(f"[DEBUG] wave.fetch start_pid={pid} pages={wave_pages}")
            groups = await asyncio.gather(*(fetch_pid(pid + i) for i in range(wave_pages)))
            fetched = sum(len(g) for g in groups)

            # Deduplicate across waves: the API can repeat posts between pages.
            new_batch: List[Rule34Post] = []
            dup = 0
            for g in groups:
                for p in g:
                    if p.id in seen:
                        dup += 1
                        continue
                    seen.add(p.id)
                    new_batch.append(p)
            if debug:
                dt = monotonic() - t0
                print(f"[DEBUG] wave.fetched posts_total={fetched} new={len(new_batch)} dup={dup} took={dt:.2f}s")
            if not new_batch:
                if debug:
                    print("[DEBUG] wave.empty no-more-posts")
                break

            if need_meta:
                ok, fail = await _enrich_meta(new_batch, http, max_concurrency=12, max_bytes=1_572_864, debug=debug)
                if debug:
                    print(f"[DEBUG] wave.meta ok={ok} fail={fail}")

            added = 0
            for p in new_batch:
                if len(collected) >= limit:
                    break
                if _valid(p, blocked, min_date, max_date, min_duration, max_duration):
                    collected.append(p)
                    added += 1
            if debug:
                print(f"[DEBUG] wave.collect added={added} total={len(collected)}/{limit}")

            if len(collected) >= limit:
                break
            pid += wave_pages
            # Adapt the wave size: shrink near the end of the result set,
            # widen when the filters have rejected everything fetched so far.
            if len(new_batch) < page_size and wave_pages > 1:
                wave_pages = 2
            elif len(collected) == 0 and wave_pages < 8:
                wave_pages = 8

    if debug:
        print(f"[DEBUG] done return={len(collected)}")
    return collected[:limit]