#!/usr/bin/env python3
"""
News Scraper for Hong Kong Fire Documentary
Extracts URLs from markdown files, deduplicates, and archives HTML content.
Now with PARALLEL scraping across different domains!
"""

import argparse
import asyncio
import json
import re
import random
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from collections import defaultdict
import unicodedata

import yaml
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeout


# Project paths
SCRIPT_DIR = Path(__file__).parent.resolve()
PROJECT_ROOT = SCRIPT_DIR.parent.parent
NEWS_DIR = PROJECT_ROOT / "content" / "news"
CONFIG_FILE = SCRIPT_DIR / "config.yml"
REGISTRY_FILE = SCRIPT_DIR / "scraped_urls.json"

# Concurrency settings
MAX_CONCURRENT_DOMAINS = 5  # Scrape up to 5 different domains at once
MAX_CONCURRENT_PER_DOMAIN = 1  # Only 1 request per domain at a time (be nice)


def log(msg: str, level: str = "INFO"):
    """Print timestamped log message"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] {level}: {msg}", flush=True)


def load_config() -> dict:
    """Load configuration from config.yml"""
    if CONFIG_FILE.exists():
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)
    return {
        "rate_limit": {
            "delay_seconds": 3,
            "max_retries": 3,
            "timeout_seconds": 60,
        },
        "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "sites": {},
    }

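# Illustrative config.yml shape (an assumption based on the defaults above and the
# per-site overrides read by get_site_config() below; "hk01.com" is a hypothetical key):
#
#   rate_limit:
#     delay_seconds: 3
#     max_retries: 3
#     timeout_seconds: 60
#   user_agent: "Mozilla/5.0 ..."
#   sites:
#     hk01.com:
#       delay_seconds: 5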

def load_registry() -> dict:
    """Load the registry of previously scraped URLs"""
    if REGISTRY_FILE.exists():
        with open(REGISTRY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {"scraped_urls": {}, "last_updated": None}


def save_registry(registry: dict):
    """Save the registry of scraped URLs"""
    registry["last_updated"] = datetime.now().isoformat()
    with open(REGISTRY_FILE, "w", encoding="utf-8") as f:
        json.dump(registry, f, indent=2, ensure_ascii=False)

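# Illustrative scraped_urls.json shape (keys match what scrape_domain_queue() writes;
# the URL, timestamps, and paths are placeholder values):
#
#   {
#     "scraped_urls": {
#       "https://example.com/article": {
#         "title": "...",
#         "source": "BBC",
#         "scraped_at": "2024-01-01T12:00:00",
#         "archive_path": "content/news/BBC/archive/some-slug"
#       }
#     },
#     "last_updated": "2024-01-01T12:00:00"
#   }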

def slugify(text: str, max_length: int = 80) -> str:
    """Convert text to a filesystem-safe slug"""
    text = unicodedata.normalize("NFKD", text)
    text = text.lower()
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"[-\s]+", "-", text)
    text = text.strip("-")
    if len(text) > max_length:
        text = text[:max_length].rsplit("-", 1)[0]
    return text or "untitled"

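# Example (illustrative): slugify("Hong Kong Fire: Live Updates!") -> "hong-kong-fire-live-updates".
# CJK characters count as \w word characters, so Chinese titles keep their characters.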

def extract_urls_from_markdown(filepath: Path) -> list[dict]:
    """Extract URLs and titles from a markdown file."""
    urls = []

    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read()

    # Pattern 1: Markdown links [Title](URL)
    link_pattern = r'\[([^\]]+)\]\((https?://[^\)]+)\)'
    for match in re.finditer(link_pattern, content):
        title, url = match.groups()
        if not url.endswith('.md') and not url.startswith('#'):
            urls.append({
                "title": title.strip("*").strip(),
                "url": url.strip(),
                "source_file": str(filepath.relative_to(PROJECT_ROOT)),
            })

    # Pattern 2: Table format | Title | URL |
    table_pattern = r'\|\s*([^|]+?)\s*\|\s*<?(\s*https?://[^\s>|]+)\s*>?\s*\|'
    for match in re.finditer(table_pattern, content):
        title, url = match.groups()
        title = title.strip()
        url = url.strip()
        if title.lower() not in ['標題', 'title', '連結', 'link', '---', '------']:
            if url and not url.startswith('---'):
                urls.append({
                    "title": title,
                    "url": url,
                    "source_file": str(filepath.relative_to(PROJECT_ROOT)),
                })

    # Pattern 3: List item with angle-bracket URL format: - Title (<URL>)
    # Used by Oriental Daily News (東方日報) and similar sources
    list_angle_pattern = r'^-\s+(.+?)\s+\(<(https?://[^>]+)>\)'
    for match in re.finditer(list_angle_pattern, content, re.MULTILINE):
        title, url = match.groups()
        title = title.strip()
        url = url.strip()
        if title and url:
            urls.append({
                "title": title,
                "url": url,
                "source_file": str(filepath.relative_to(PROJECT_ROOT)),
            })

    return urls

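# Illustrative lines each pattern above matches (the example.com URLs are placeholders):
#   Pattern 1:  [Article title](https://example.com/news/123)
#   Pattern 2:  | Article title | <https://example.com/news/123> |
#   Pattern 3:  - Article title (<https://example.com/news/123>)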

def get_source_name(filepath: Path) -> str:
    """Extract source name from filepath"""
    parts = filepath.relative_to(NEWS_DIR).parts
    return parts[0] if parts else "unknown"


def discover_news_sources() -> dict[str, Path]:
    """Discover all news source directories with markdown files"""
    sources = {}
    if not NEWS_DIR.exists():
        return sources

    for item in NEWS_DIR.iterdir():
        if item.is_dir():
            for readme in item.glob("[Rr][Ee][Aa][Dd][Mm][Ee].*[Mm][Dd]"):
                sources[item.name] = readme
                break
    return sources

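# Expected layout (illustrative; the "BBC" folder name is a placeholder):
#   content/news/BBC/README.md     -> list of URLs to scrape
#   content/news/BBC/archive/...   -> scraped HTML + metadata (see save_archive below)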

def get_all_urls(sources: dict[str, Path] = None, source_filter: str = None) -> list[dict]:
    """Get all URLs from news markdown files."""
    if sources is None:
        sources = discover_news_sources()

    all_urls = []
    for source_name, filepath in sources.items():
        if source_filter and source_name.lower() != source_filter.lower():
            continue
        urls = extract_urls_from_markdown(filepath)
        for url_info in urls:
            url_info["source"] = source_name
        all_urls.extend(urls)
    return all_urls


def filter_new_urls(urls: list[dict], registry: dict) -> list[dict]:
    """Filter out URLs that have already been scraped"""
    scraped = registry.get("scraped_urls", {})
    return [u for u in urls if u["url"] not in scraped]


def get_domain(url: str) -> str:
    """Extract domain from URL"""
    parsed = urlparse(url)
    return parsed.netloc.replace("www.", "")


def group_urls_by_domain(urls: list[dict]) -> dict[str, list[dict]]:
    """Group URLs by their domain for parallel processing"""
    grouped = defaultdict(list)
    for url_info in urls:
        domain = get_domain(url_info["url"])
        grouped[domain].append(url_info)
    return dict(grouped)

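# Example (illustrative): get_domain("https://www.example.com/news/123") returns
# "example.com", so every URL on that host lands in one sequential per-domain queue.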

def get_site_config(url: str, config: dict) -> dict:
    """Get site-specific configuration"""
    domain = get_domain(url)
    site_config = config.get("sites", {}).get(domain, {})
    return {
        "delay_seconds": site_config.get("delay_seconds", config["rate_limit"]["delay_seconds"]),
        "max_retries": site_config.get("max_retries", config["rate_limit"]["max_retries"]),
        "timeout_seconds": site_config.get("timeout_seconds", config["rate_limit"]["timeout_seconds"]),
    }


def save_archive(url_info: dict, html: str, source_dir: Path) -> Path:
    """Save scraped content to archive directory"""
    archive_dir = source_dir / "archive"
    archive_dir.mkdir(parents=True, exist_ok=True)

    slug = slugify(url_info["title"])
    article_dir = archive_dir / slug

    if article_dir.exists():
        counter = 1
        while (archive_dir / f"{slug}-{counter}").exists():
            counter += 1
        article_dir = archive_dir / f"{slug}-{counter}"

    article_dir.mkdir(exist_ok=True)

    # Save HTML
    with open(article_dir / "index.html", "w", encoding="utf-8") as f:
        f.write(html)

    # Save metadata
    metadata = {
        "url": url_info["url"],
        "title": url_info["title"],
        "source": url_info["source"],
        "source_file": url_info["source_file"],
        "scraped_at": datetime.now().isoformat(),
        "archive_path": str(article_dir.relative_to(PROJECT_ROOT)),
    }
    with open(article_dir / "metadata.json", "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False)

    return article_dir

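# Resulting on-disk layout (illustrative; the source folder and slug are placeholders):
#   content/news/BBC/archive/<slug>/index.html     -> raw page HTML
#   content/news/BBC/archive/<slug>/metadata.json  -> url, title, source, scraped_at, archive_path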

async def scrape_url_async(url_info: dict, context, config: dict, retries: int = 0) -> tuple[str, bool]:
    """Scrape a single URL asynchronously"""
    url = url_info["url"]
    site_config = get_site_config(url, config)
    timeout = site_config["timeout_seconds"] * 1000
    max_retries = site_config["max_retries"]

    page = await context.new_page()
    try:
        await page.goto(url, timeout=timeout, wait_until="domcontentloaded")
        await page.wait_for_timeout(1500)  # Wait for JS
        html = await page.content()
        return html, True
    except PlaywrightTimeout:
        if retries < max_retries:
            log(f"  ⏳ Timeout, retry {retries + 1}/{max_retries}...", "WARN")
            await asyncio.sleep(2 ** retries)
            return await scrape_url_async(url_info, context, config, retries + 1)
        return "", False
    except Exception as e:
        if retries < max_retries:
            log(f"  ⚠️ Error: {str(e)[:50]}, retry {retries + 1}/{max_retries}...", "WARN")
            await asyncio.sleep(2 ** retries)
            return await scrape_url_async(url_info, context, config, retries + 1)
        return "", False
    finally:
        await page.close()

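# Note on retries (descriptive of the function above): each attempt backs off for
# 2**retries seconds (1s, 2s, 4s, ...), and each recursive call opens its own page,
# so the finally block only closes the page created by the current call.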

async def scrape_domain_queue(
    domain: str,
    urls: list[dict],
    browser,
    config: dict,
    registry: dict,
    results: dict,
    progress: dict
):
    """Scrape all URLs for a single domain sequentially"""
    context = await browser.new_context(
        user_agent=config["user_agent"],
        viewport={"width": 1920, "height": 1080},
    )

    site_config = get_site_config(urls[0]["url"], config)
    delay = site_config["delay_seconds"]

    for i, url_info in enumerate(urls):
        url = url_info["url"]
        title = url_info["title"][:40]
        source = url_info["source"]

        progress["current"] += 1
        pct = (progress["current"] / progress["total"]) * 100
        log(f"[{progress['current']}/{progress['total']}] ({pct:.0f}%) {domain}: {title}...")

        html, success = await scrape_url_async(url_info, context, config)

        if success and html:
            # Find source directory
            source_dir = NEWS_DIR / source
            if not source_dir.exists():
                for d in NEWS_DIR.iterdir():
                    if d.is_dir() and d.name.lower() == source.lower():
                        source_dir = d
                        break

            archive_path = save_archive(url_info, html, source_dir)

            # Update registry
            registry["scraped_urls"][url] = {
                "title": url_info["title"],
                "source": source,
                "scraped_at": datetime.now().isoformat(),
                "archive_path": str(archive_path.relative_to(PROJECT_ROOT)),
            }
            save_registry(registry)

            results["success"] += 1
            log(f"  ✓ Saved ({len(html)//1024}KB)")
        else:
            results["failed"] += 1
            log("  ✗ Failed")

        # Rate limit delay between requests to the same domain
        if i < len(urls) - 1:
            await asyncio.sleep(delay + random.uniform(0, 1))

    await context.close()


async def run_scraper_async(
    dry_run: bool = False,
    source_filter: str = None,
    limit: int = None,
    verbose: bool = False,
):
    """Main async scraper function with parallel domain processing"""
    config = load_config()
    registry = load_registry()
    sources = discover_news_sources()

    log(f"Found {len(sources)} news sources")

    # Get all URLs
    all_urls = get_all_urls(sources, source_filter)
    log(f"Found {len(all_urls)} total URLs")

    # Filter to new URLs only
    new_urls = filter_new_urls(all_urls, registry)
    log(f"Found {len(new_urls)} NEW URLs to scrape")

    if limit:
        new_urls = new_urls[:limit]
        log(f"Limited to {limit} URLs")

    if dry_run:
        print("\n=== DRY RUN ===\n")
        domains = group_urls_by_domain(new_urls)
        for domain, urls in sorted(domains.items(), key=lambda x: -len(x[1])):
            print(f"{domain}: {len(urls)} URLs")
            if verbose:
                for u in urls[:3]:
                    print(f"  - {u['title'][:50]}")
                if len(urls) > 3:
                    print(f"  ... and {len(urls) - 3} more")
        print(f"\nWould scrape {len(new_urls)} URLs across {len(domains)} domains")
        return

    if not new_urls:
        log("No new URLs to scrape")
        return

    # Group URLs by domain
    domains = group_urls_by_domain(new_urls)
    log(f"URLs grouped into {len(domains)} domains")

    # Show domain distribution
    for domain, urls in sorted(domains.items(), key=lambda x: -len(x[1]))[:5]:
        log(f"  {domain}: {len(urls)} URLs")
    if len(domains) > 5:
        log(f"  ... and {len(domains) - 5} more domains")

    print()
    log("🚀 Starting parallel scraper...")
    print()

    results = {"success": 0, "failed": 0}
    progress = {"current": 0, "total": len(new_urls)}

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)

        # Create tasks for each domain (limited concurrency)
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOMAINS)

        async def bounded_scrape(domain, urls):
            async with semaphore:
                await scrape_domain_queue(
                    domain, urls, browser, config, registry, results, progress
                )

        # Run all domain scrapers concurrently (bounded by semaphore)
        tasks = [
            bounded_scrape(domain, urls)
            for domain, urls in domains.items()
        ]

        await asyncio.gather(*tasks)

        await browser.close()

    print()
    log("=" * 50)
    log(f"✅ Success: {results['success']}")
    log(f"❌ Failed:  {results['failed']}")
    log(f"📊 Total:   {results['success'] + results['failed']}")
    log("=" * 50)


def run_scraper(
    dry_run: bool = False,
    source_filter: str = None,
    limit: int = None,
    verbose: bool = False,
):
    """Wrapper to run async scraper"""
    asyncio.run(run_scraper_async(dry_run, source_filter, limit, verbose))


def main():
    parser = argparse.ArgumentParser(
        description="Scrape news articles from URLs in markdown files (PARALLEL)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be scraped without actually scraping",
    )
    parser.add_argument(
        "--source",
        type=str,
        help="Only scrape URLs from a specific source (e.g., BBC, HK01)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        help="Limit the number of URLs to scrape",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show verbose output",
    )
    parser.add_argument(
        "--list-sources",
        action="store_true",
        help="List all available news sources",
    )

    args = parser.parse_args()

    if args.list_sources:
        sources = discover_news_sources()
        registry = load_registry()
        scraped = registry.get("scraped_urls", {})

        print("Available news sources:")
        for name, path in sorted(sources.items()):
            urls = extract_urls_from_markdown(path)
            new_count = len([u for u in urls if u["url"] not in scraped])
            print(f"  {name}: {len(urls)} URLs ({new_count} new)")
        return

    run_scraper(
        dry_run=args.dry_run,
        source_filter=args.source,
        limit=args.limit,
        verbose=args.verbose,
    )


if __name__ == "__main__":
    main()