scraper.py
#!/usr/bin/env python3
"""
News Scraper for Hong Kong Fire Documentary
Extracts URLs from markdown files, deduplicates, and archives HTML content.
Now with PARALLEL scraping across different domains!
"""

import argparse
import asyncio
import json
import random
import re
import unicodedata
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse

import yaml
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeout


# Project paths
SCRIPT_DIR = Path(__file__).parent.resolve()
PROJECT_ROOT = SCRIPT_DIR.parent.parent
NEWS_DIR = PROJECT_ROOT / "content" / "news"
CONFIG_FILE = SCRIPT_DIR / "config.yml"
REGISTRY_FILE = SCRIPT_DIR / "scraped_urls.json"

# Concurrency settings
MAX_CONCURRENT_DOMAINS = 5  # Scrape up to 5 different domains at once
MAX_CONCURRENT_PER_DOMAIN = 1  # Only 1 request per domain at a time (be nice);
                               # enforced implicitly: each domain queue is sequential


def log(msg: str, level: str = "INFO"):
    """Print a timestamped log message."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] {level}: {msg}", flush=True)


def load_config() -> dict:
    """Load configuration from config.yml, falling back to built-in defaults."""
    if CONFIG_FILE.exists():
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)
    return {
        "rate_limit": {
            "delay_seconds": 3,
            "max_retries": 3,
            "timeout_seconds": 60,
        },
        "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "sites": {},
    }


def load_registry() -> dict:
    """Load the registry of previously scraped URLs."""
    if REGISTRY_FILE.exists():
        with open(REGISTRY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {"scraped_urls": {}, "last_updated": None}


def save_registry(registry: dict):
    """Save the registry of scraped URLs."""
    registry["last_updated"] = datetime.now().isoformat()
    with open(REGISTRY_FILE, "w", encoding="utf-8") as f:
        json.dump(registry, f, indent=2, ensure_ascii=False)


def slugify(text: str, max_length: int = 80) -> str:
    """Convert text to a filesystem-safe slug."""
    text = unicodedata.normalize("NFKD", text)
    text = text.lower()
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"[-\s]+", "-", text)
    text = text.strip("-")
    if len(text) > max_length:
        # Truncate at the last hyphen so a word isn't cut in half
        text = text[:max_length].rsplit("-", 1)[0]
    return text or "untitled"

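# Illustration (inputs are made-up examples): slugify uses Unicode-aware \w,
# so punctuation is dropped but CJK characters survive intact:
#   slugify("Hello, World!")  ->  "hello-world"
#   slugify("香港新聞報道")     ->  "香港新聞報道"
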
def extract_urls_from_markdown(filepath: Path) -> list[dict]:
    """Extract URLs and titles from a markdown file."""
    urls = []

    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read()

    # Pattern 1: Markdown links [Title](URL)
    link_pattern = r'\[([^\]]+)\]\((https?://[^\)]+)\)'
    for match in re.finditer(link_pattern, content):
        title, url = match.groups()
        if not url.endswith('.md') and not url.startswith('#'):
            urls.append({
                "title": title.strip("*").strip(),
                "url": url.strip(),
                "source_file": str(filepath.relative_to(PROJECT_ROOT)),
            })

    # Pattern 2: Table format | Title | URL |
    table_pattern = r'\|\s*([^|]+?)\s*\|\s*<?(\s*https?://[^\s>|]+)\s*>?\s*\|'
    for match in re.finditer(table_pattern, content):
        title, url = match.groups()
        title = title.strip()
        url = url.strip()
        # Skip header/separator rows; '標題' means "title", '連結' means "link"
        if title.lower() not in ['標題', 'title', '連結', 'link', '---', '------']:
            if url and not url.startswith('---'):
                urls.append({
                    "title": title,
                    "url": url,
                    "source_file": str(filepath.relative_to(PROJECT_ROOT)),
                })

    # Pattern 3: List item with angle-bracket URL format: - Title (<URL>)
    # Used by 東方日報 (Oriental Daily News) and similar sources
    list_angle_pattern = r'^-\s+(.+?)\s+\(<(https?://[^>]+)>\)'
    for match in re.finditer(list_angle_pattern, content, re.MULTILINE):
        title, url = match.groups()
        title = title.strip()
        url = url.strip()
        if title and url:
            urls.append({
                "title": title,
                "url": url,
                "source_file": str(filepath.relative_to(PROJECT_ROOT)),
            })

    return urls


def get_source_name(filepath: Path) -> str:
    """Extract the source name from a filepath."""
    parts = filepath.relative_to(NEWS_DIR).parts
    return parts[0] if parts else "unknown"


def discover_news_sources() -> dict[str, Path]:
    """Discover all news source directories that contain a README markdown file."""
    sources = {}
    if not NEWS_DIR.exists():
        return sources

    for item in NEWS_DIR.iterdir():
        if item.is_dir():
            for readme in item.glob("[Rr][Ee][Aa][Dd][Mm][Ee].*[Mm][Dd]"):
                sources[item.name] = readme
                break
    return sources


def get_all_urls(sources: dict[str, Path] | None = None, source_filter: str | None = None) -> list[dict]:
    """Get all URLs from news markdown files."""
    if sources is None:
        sources = discover_news_sources()

    all_urls = []
    for source_name, filepath in sources.items():
        if source_filter and source_name.lower() != source_filter.lower():
            continue
        urls = extract_urls_from_markdown(filepath)
        for url_info in urls:
            url_info["source"] = source_name
        all_urls.extend(urls)
    return all_urls


def filter_new_urls(urls: list[dict], registry: dict) -> list[dict]:
    """Filter out URLs that have already been scraped."""
    scraped = registry.get("scraped_urls", {})
    return [u for u in urls if u["url"] not in scraped]


def get_domain(url: str) -> str:
    """Extract the domain from a URL."""
    parsed = urlparse(url)
    return parsed.netloc.replace("www.", "")


def group_urls_by_domain(urls: list[dict]) -> dict[str, list[dict]]:
    """Group URLs by their domain for parallel processing."""
    grouped = defaultdict(list)
    for url_info in urls:
        domain = get_domain(url_info["url"])
        grouped[domain].append(url_info)
    return dict(grouped)


def get_site_config(url: str, config: dict) -> dict:
    """Get site-specific configuration, falling back to global rate-limit defaults."""
    domain = get_domain(url)
    site_config = config.get("sites", {}).get(domain, {})
    return {
        "delay_seconds": site_config.get("delay_seconds", config["rate_limit"]["delay_seconds"]),
        "max_retries": site_config.get("max_retries", config["rate_limit"]["max_retries"]),
        "timeout_seconds": site_config.get("timeout_seconds", config["rate_limit"]["timeout_seconds"]),
    }

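# Illustrative config.yml shape consumed by load_config/get_site_config above.
# The domain and values below are made-up examples, not the project's real config:
#
#   rate_limit:
#     delay_seconds: 3
#     max_retries: 3
#     timeout_seconds: 60
#   user_agent: "Mozilla/5.0 ..."
#   sites:
#     example.com:          # keyed by bare domain ("www." stripped, see get_domain)
#       delay_seconds: 10   # per-site values fall back to rate_limit defaults
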
def save_archive(url_info: dict, html: str, source_dir: Path) -> Path:
    """Save scraped content to the source's archive directory."""
    archive_dir = source_dir / "archive"
    archive_dir.mkdir(parents=True, exist_ok=True)

    slug = slugify(url_info["title"])
    article_dir = archive_dir / slug

    # Avoid clobbering an earlier article with the same slug
    if article_dir.exists():
        counter = 1
        while (archive_dir / f"{slug}-{counter}").exists():
            counter += 1
        article_dir = archive_dir / f"{slug}-{counter}"

    article_dir.mkdir(exist_ok=True)

    # Save HTML
    with open(article_dir / "index.html", "w", encoding="utf-8") as f:
        f.write(html)

    # Save metadata
    metadata = {
        "url": url_info["url"],
        "title": url_info["title"],
        "source": url_info["source"],
        "source_file": url_info["source_file"],
        "scraped_at": datetime.now().isoformat(),
        "archive_path": str(article_dir.relative_to(PROJECT_ROOT)),
    }
    with open(article_dir / "metadata.json", "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

    return article_dir


async def scrape_url_async(url_info: dict, context, config: dict, retries: int = 0) -> tuple[str, bool]:
    """Scrape a single URL asynchronously, retrying with exponential backoff."""
    url = url_info["url"]
    site_config = get_site_config(url, config)
    timeout = site_config["timeout_seconds"] * 1000  # Playwright timeouts are in ms
    max_retries = site_config["max_retries"]

    page = await context.new_page()
    try:
        await page.goto(url, timeout=timeout, wait_until="domcontentloaded")
        await page.wait_for_timeout(1500)  # Wait for JS
        html = await page.content()
        return html, True
    except PlaywrightTimeout:
        if retries < max_retries:
            log(f"  ⏳ Timeout, retry {retries + 1}/{max_retries}...", "WARN")
            await asyncio.sleep(2 ** retries)
            return await scrape_url_async(url_info, context, config, retries + 1)
        return "", False
    except Exception as e:
        if retries < max_retries:
            log(f"  ⚠️ Error: {str(e)[:50]}, retry {retries + 1}/{max_retries}...", "WARN")
            await asyncio.sleep(2 ** retries)
            return await scrape_url_async(url_info, context, config, retries + 1)
        return "", False
    finally:
        await page.close()

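# Backoff illustration: with the default max_retries=3, a failing URL is
# attempted at most 4 times, sleeping 2**0=1s, 2**1=2s, 2**2=4s between tries.
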
async def scrape_domain_queue(
    domain: str,
    urls: list[dict],
    browser,
    config: dict,
    registry: dict,
    results: dict,
    progress: dict,
):
    """Scrape all URLs for a single domain sequentially."""
    context = await browser.new_context(
        user_agent=config["user_agent"],
        viewport={"width": 1920, "height": 1080},
    )

    site_config = get_site_config(urls[0]["url"], config)
    delay = site_config["delay_seconds"]

    for i, url_info in enumerate(urls):
        url = url_info["url"]
        title = url_info["title"][:40]
        source = url_info["source"]

        progress["current"] += 1
        pct = (progress["current"] / progress["total"]) * 100
        log(f"[{progress['current']}/{progress['total']}] ({pct:.0f}%) {domain}: {title}...")

        html, success = await scrape_url_async(url_info, context, config)

        if success and html:
            # Find the source directory (case-insensitive fallback)
            source_dir = NEWS_DIR / source
            if not source_dir.exists():
                for d in NEWS_DIR.iterdir():
                    if d.is_dir() and d.name.lower() == source.lower():
                        source_dir = d
                        break

            archive_path = save_archive(url_info, html, source_dir)

            # Update registry
            registry["scraped_urls"][url] = {
                "title": url_info["title"],
                "source": source,
                "scraped_at": datetime.now().isoformat(),
                "archive_path": str(archive_path.relative_to(PROJECT_ROOT)),
            }
            save_registry(registry)

            results["success"] += 1
            log(f"  ✓ Saved ({len(html) // 1024}KB)")
        else:
            results["failed"] += 1
            log("  ✗ Failed")

        # Rate-limit delay (with jitter) between requests to the same domain
        if i < len(urls) - 1:
            await asyncio.sleep(delay + random.uniform(0, 1))

    await context.close()


async def run_scraper_async(
    dry_run: bool = False,
    source_filter: str | None = None,
    limit: int | None = None,
    verbose: bool = False,
):
    """Main async scraper function with parallel domain processing."""
    config = load_config()
    registry = load_registry()
    sources = discover_news_sources()

    log(f"Found {len(sources)} news sources")

    # Get all URLs
    all_urls = get_all_urls(sources, source_filter)
    log(f"Found {len(all_urls)} total URLs")

    # Filter to new URLs only
    new_urls = filter_new_urls(all_urls, registry)
    log(f"Found {len(new_urls)} NEW URLs to scrape")

    if limit:
        new_urls = new_urls[:limit]
        log(f"Limited to {limit} URLs")

    if dry_run:
        print("\n=== DRY RUN ===\n")
        domains = group_urls_by_domain(new_urls)
        for domain, urls in sorted(domains.items(), key=lambda x: -len(x[1])):
            print(f"{domain}: {len(urls)} URLs")
            if verbose:
                for u in urls[:3]:
                    print(f"  - {u['title'][:50]}")
                if len(urls) > 3:
                    print(f"  ... and {len(urls) - 3} more")
        print(f"\nWould scrape {len(new_urls)} URLs across {len(domains)} domains")
        return

    if not new_urls:
        log("No new URLs to scrape")
        return

    # Group URLs by domain
    domains = group_urls_by_domain(new_urls)
    log(f"URLs grouped into {len(domains)} domains")

    # Show domain distribution
    for domain, urls in sorted(domains.items(), key=lambda x: -len(x[1]))[:5]:
        log(f"  {domain}: {len(urls)} URLs")
    if len(domains) > 5:
        log(f"  ... and {len(domains) - 5} more domains")

    print()
    log("🚀 Starting parallel scraper...")
    print()

    results = {"success": 0, "failed": 0}
    progress = {"current": 0, "total": len(new_urls)}

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)

        # Limit how many domain queues run at once
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOMAINS)

        async def bounded_scrape(domain, urls):
            async with semaphore:
                await scrape_domain_queue(
                    domain, urls, browser, config, registry, results, progress
                )

        # Run all domain scrapers concurrently (bounded by the semaphore)
        tasks = [
            bounded_scrape(domain, urls)
            for domain, urls in domains.items()
        ]

        await asyncio.gather(*tasks)

        await browser.close()

    print()
    log("=" * 50)
    log(f"✅ Success: {results['success']}")
    log(f"❌ Failed: {results['failed']}")
    log(f"📊 Total: {results['success'] + results['failed']}")
    log("=" * 50)


def run_scraper(
    dry_run: bool = False,
    source_filter: str | None = None,
    limit: int | None = None,
    verbose: bool = False,
):
    """Wrapper to run the async scraper."""
    asyncio.run(run_scraper_async(dry_run, source_filter, limit, verbose))

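# Shape of scraped_urls.json maintained by load_registry/save_registry
# (illustrative values only):
#
#   {
#     "scraped_urls": {
#       "https://example.com/article": {
#         "title": "...",
#         "source": "BBC",
#         "scraped_at": "2025-01-01T12:00:00",
#         "archive_path": "content/news/BBC/archive/some-slug"
#       }
#     },
#     "last_updated": "2025-01-01T12:00:05"
#   }
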
def main():
    parser = argparse.ArgumentParser(
        description="Scrape news articles from URLs in markdown files (PARALLEL)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be scraped without actually scraping",
    )
    parser.add_argument(
        "--source",
        type=str,
        help="Only scrape URLs from a specific source (e.g., BBC, HK01)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        help="Limit the number of URLs to scrape",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show verbose output",
    )
    parser.add_argument(
        "--list-sources",
        action="store_true",
        help="List all available news sources",
    )

    args = parser.parse_args()

    if args.list_sources:
        sources = discover_news_sources()
        registry = load_registry()
        scraped = registry.get("scraped_urls", {})

        print("Available news sources:")
        for name, path in sorted(sources.items()):
            urls = extract_urls_from_markdown(path)
            new_count = len([u for u in urls if u["url"] not in scraped])
            print(f"  {name}: {len(urls)} URLs ({new_count} new)")
        return

    run_scraper(
        dry_run=args.dry_run,
        source_filter=args.source,
        limit=args.limit,
        verbose=args.verbose,
    )


if __name__ == "__main__":
    main()
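
# Example invocations (flags as defined in main() above):
#   python scraper.py --list-sources
#   python scraper.py --dry-run --verbose
#   python scraper.py --source BBC --limit 5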