#!/usr/bin/env python3
"""
Competitive Monitor -- tracks pricing, blog posts, and feature changes across competitors.

Generates weekly competitive intelligence diffs. Configurable competitor list.

Usage:
    python3 competitive-monitor.py
    python3 competitive-monitor.py --company acme
    python3 competitive-monitor.py --output report.md
    python3 competitive-monitor.py --config competitors.json

Competitor config can be provided via:
    1. --config flag pointing to a JSON file
    2. COMPETITORS_CONFIG env var pointing to a JSON file
    3. Built-in example competitors (for demo purposes)
"""

import argparse
import json
import os
import re
import sys
import urllib.request
import urllib.parse
from datetime import datetime, timedelta
from difflib import unified_diff
from typing import Dict, List, Optional
from html.parser import HTMLParser
from urllib.error import URLError, HTTPError


def validate_text(text, max_length=500000):
    """Basic input validation for scraped content.

    Truncates strings longer than ``max_length``; non-string or falsy
    input is returned unchanged (callers treat None as "no content").
    """
    if not text or not isinstance(text, str):
        return text
    # Truncate extremely long content to bound memory / snapshot size.
    if len(text) > max_length:
        text = text[:max_length]
    return text


class BlogExtractor(HTMLParser):
    """Extract blog post titles and dates from HTML.

    Heuristic parser: text inside <h1>-<h4> is treated as a candidate
    title; any element whose class/id mentions 'date', 'time', or
    'published' is scanned for a date-like string. A post is recorded
    once both a title and a date have been seen.
    """

    def __init__(self):
        super().__init__()
        self.posts = []            # accumulated {'title', 'date'} dicts
        self.current_title = None  # last heading text seen, pending a date
        self.current_date = None   # last date string seen, pending a title
        self.in_title = False
        self.in_date = False
        self.title_tags = ['h1', 'h2', 'h3', 'h4']

    def handle_starttag(self, tag, attrs):
        if tag.lower() in self.title_tags:
            self.in_title = True
        for name, value in attrs:
            # Boolean HTML attributes arrive with value=None -- guard
            # against AttributeError on .lower().
            if value and name in ('class', 'id') and any(
                word in value.lower() for word in ('date', 'time', 'published')
            ):
                self.in_date = True

    def handle_endtag(self, tag):
        if tag.lower() in self.title_tags:
            self.in_title = False
            self.in_date = False

    def handle_data(self, data):
        text = data.strip()
        if self.in_title and text:
            self.current_title = text

        if self.in_date and text:
            # Matches numeric dates (1/2/2024, 01-02-24) and prose dates
            # (January 2, 2024 / Jan 2 2024).
            date_match = re.search(
                r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\w+ \d{1,2},? \d{4}\b', data
            )
            if date_match:
                self.current_date = date_match.group()

        # Emit a post as soon as we hold both halves of the pair.
        if self.current_title and self.current_date:
            self.posts.append({
                'title': self.current_title,
                'date': self.current_date,
            })
            self.current_title = None
            self.current_date = None


class CompetitiveMonitor:
    """Main competitive monitoring class.

    Fetches competitor pricing and blog pages, diffs pricing snapshots
    over time, and produces a markdown intelligence report.
    """

    # Example competitors for demo. Override with --config or COMPETITORS_CONFIG.
    EXAMPLE_COMPETITORS = {
        'competitor_a': {
            'name': 'Competitor A',
            'domain': 'competitor-a.com',
            'pricing_url': 'https://www.competitor-a.com/pricing',
            'blog_url': 'https://www.competitor-a.com/blog',
            'linkedin_query': 'Competitor A site:linkedin.com',
            'jobs_query': 'Competitor A careers OR jobs',
        },
        'competitor_b': {
            'name': 'Competitor B',
            'domain': 'competitor-b.com',
            'pricing_url': 'https://www.competitor-b.com/pricing',
            'blog_url': 'https://www.competitor-b.com/blog',
            'linkedin_query': 'Competitor B site:linkedin.com',
            'jobs_query': 'Competitor B careers OR jobs',
        },
    }

    def __init__(self, data_dir: str = None, competitors: dict = None):
        """Set up data directories and the competitor registry.

        Args:
            data_dir: Root directory for snapshots/history
                (default: ./data/competitive).
            competitors: Mapping of key -> competitor config dict
                (default: EXAMPLE_COMPETITORS).
        """
        self.data_dir = data_dir or os.path.join(os.getcwd(), 'data', 'competitive')
        self.pricing_dir = os.path.join(self.data_dir, 'pricing-snapshots')
        self.history_dir = os.path.join(self.data_dir, 'scan-history')
        self.competitors = competitors or self.EXAMPLE_COMPETITORS

        os.makedirs(self.pricing_dir, exist_ok=True)
        os.makedirs(self.history_dir, exist_ok=True)

    def fetch_url(self, url: str, timeout: int = 10) -> Optional[str]:
        """Fetch URL content with error handling.

        Returns the decoded (and length-capped) body, or None on failure.
        """
        try:
            # Browser-like UA: many marketing sites block default urllib UAs.
            headers = {
                'User-Agent': (
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
                    'AppleWebKit/537.36 (KHTML, like Gecko) '
                    'Chrome/91.0.4472.124 Safari/537.36'
                )
            }
            request = urllib.request.Request(url, headers=headers)

            with urllib.request.urlopen(request, timeout=timeout) as response:
                content = response.read().decode('utf-8', errors='ignore')
                return validate_text(content)

        except (URLError, HTTPError, UnicodeDecodeError) as e:
            print(f"[error] Error fetching {url}: {e}")
            return None

    def extract_blog_posts(self, html: str) -> List[Dict]:
        """Extract blog posts from HTML; returns [] on any parse failure."""
        if not html:
            return []

        extractor = BlogExtractor()
        try:
            extractor.feed(html)
            return extractor.posts
        except Exception as e:
            # Best-effort scraping: a malformed page must not abort the scan.
            print(f"Error extracting blog posts: {e}")
            return []

    def is_recent_post(self, date_str: str, days_back: int = 7) -> bool:
        """Check whether ``date_str`` parses to a date within the last N days.

        Tries several common formats; unparseable strings count as not recent.
        """
        if not date_str:
            return False

        formats = [
            '%m/%d/%Y', '%m-%d-%Y', '%Y-%m-%d',
            '%B %d, %Y', '%b %d, %Y', '%B %d %Y', '%b %d %Y',
        ]

        cutoff_date = datetime.now() - timedelta(days=days_back)
        for fmt in formats:
            try:
                post_date = datetime.strptime(date_str, fmt)
            except ValueError:
                continue
            return post_date >= cutoff_date

        return False

    def get_pricing_diff(self, company_key: str, current_content: str) -> Optional[str]:
        """Save today's pricing snapshot and compare with the previous one.

        Returns a human-readable summary string, or None when the content
        is unchanged. NOTE: generate_report() matches on the substrings
        'lines changed' and 'First pricing snapshot' returned here.
        """
        today = datetime.now().strftime('%Y-%m-%d')
        pricing_file = os.path.join(self.pricing_dir, f'{company_key}-{today}.txt')

        with open(pricing_file, 'w', encoding='utf-8') as f:
            f.write(current_content)

        previous_files = [
            f for f in os.listdir(self.pricing_dir)
            if f.startswith(f'{company_key}-') and f != f'{company_key}-{today}.txt'
        ]

        if not previous_files:
            return "First pricing snapshot saved"

        # ISO-dated filenames sort lexicographically; newest first.
        previous_files.sort(reverse=True)
        previous_file = os.path.join(self.pricing_dir, previous_files[0])

        try:
            with open(previous_file, 'r', encoding='utf-8') as f:
                previous_content = f.read()

            if current_content.strip() == previous_content.strip():
                return None

            diff = list(unified_diff(
                previous_content.splitlines(), current_content.splitlines(),
                fromfile='previous', tofile='current', n=0
            ))

            # Count added/removed lines, excluding the +++/--- file headers.
            changes = sum(
                1 for line in diff
                if line.startswith(('+', '-')) and not line.startswith(('+++', '---'))
            )

            return f"{changes} lines changed since last snapshot"

        except Exception as e:
            return f"Error comparing snapshots: {e}"

    def scan_competitor(self, company_key: str) -> Dict:
        """Scan a single competitor: pricing page, blog, and search queries."""
        company = self.competitors[company_key]
        print(f"\nScanning {company['name']}...")

        results = {
            'company': company['name'],
            'domain': company['domain'],
            'scan_time': datetime.now().isoformat(),
            'pricing': {},
            'blog': {},
            'search_queries': {
                'linkedin': company.get('linkedin_query', ''),
                'jobs': company.get('jobs_query', ''),
            },
        }

        # Fetch pricing page
        pricing_url = company.get('pricing_url')
        if pricing_url:
            print(f"  Fetching pricing: {pricing_url}")
            pricing_content = self.fetch_url(pricing_url)

            if pricing_content:
                # Strip tags and collapse whitespace so diffs track text,
                # not markup churn.
                clean_content = re.sub(r'<[^>]+>', '', pricing_content)
                clean_content = re.sub(r'\s+', ' ', clean_content).strip()

                results['pricing'] = {
                    'url': pricing_url,
                    'fetched': True,
                    'content_length': len(clean_content),
                    'diff': self.get_pricing_diff(company_key, clean_content),
                }
            else:
                results['pricing'] = {
                    'url': pricing_url,
                    'fetched': False,
                    'error': 'Failed to fetch pricing page',
                }

        # Fetch blog page
        blog_url = company.get('blog_url')
        if blog_url:
            print(f"  Fetching blog: {blog_url}")
            blog_content = self.fetch_url(blog_url)

            # Parse once and reuse (was parsed twice before).
            all_posts = self.extract_blog_posts(blog_content) if blog_content else []
            recent_posts = [p for p in all_posts if self.is_recent_post(p['date'])]

            results['blog'] = {
                'url': blog_url,
                'fetched': bool(blog_content),
                'total_posts_found': len(all_posts),
                'recent_posts': recent_posts,
            }

        return results

    def generate_report(self, scan_results: List[Dict], threat_keywords: List[str] = None) -> str:
        """Generate a markdown report from scan results.

        Args:
            scan_results: List of dicts as produced by scan_competitor().
            threat_keywords: Topics that signal competitive overlap;
                defaults to a CRO/funnel-focused list.
        """
        today = datetime.now().strftime('%Y-%m-%d')

        if threat_keywords is None:
            threat_keywords = ['funnel', 'conversion', 'landing page', 'ab test', 'optimize', 'cro']

        report = f"""# Competitive Intelligence Report - {today}

## Executive Summary

Monitored {len(scan_results)} competitors for pricing changes, recent blog activity, and market signals.

"""

        threats = []
        interesting = []
        opportunities = []
        search_queries = []

        for result in scan_results:
            company = result['company']

            pricing = result.get('pricing', {})
            diff_text = str(pricing.get('diff') or '')
            # BUGFIX: the two branches previously tested the same marker
            # character, making the elif unreachable. Match the distinct
            # substrings returned by get_pricing_diff() instead.
            if 'lines changed' in diff_text:
                interesting.append(
                    f"**{company}**: {pricing['diff']} -- *Monitor for pricing strategy shifts*"
                )
            elif 'First pricing snapshot' in diff_text:
                interesting.append(
                    f"**{company}**: {pricing['diff']} -- *Baseline established for future tracking*"
                )

            blog = result.get('blog', {})
            recent_posts = blog.get('recent_posts', [])

            if recent_posts:
                post_titles = [
                    post['title'][:80] + '...' if len(post['title']) > 80 else post['title']
                    for post in recent_posts[:3]
                ]
                content_lower = ' '.join(post_titles).lower()

                if any(keyword in content_lower for keyword in threat_keywords):
                    threats.append(
                        f"**{company}**: {len(recent_posts)} recent posts, potential feature overlap -- *Review competitive positioning*"
                    )
                else:
                    interesting.append(
                        f"**{company}**: {len(recent_posts)} recent posts -- *{', '.join(post_titles[:2])}*"
                    )
            else:
                opportunities.append(
                    f"**{company}**: No recent blog content -- *Content marketing gap you can exploit*"
                )

            sq = result.get('search_queries', {})
            if sq.get('linkedin'):
                search_queries.append(f"LinkedIn search: {sq['linkedin']}")
            if sq.get('jobs'):
                search_queries.append(f"Jobs search: {sq['jobs']}")

        if threats:
            report += "## THREATS\n\n"
            for threat in threats:
                report += f"- {threat}\n"
            report += "\n"

        if interesting:
            report += "## INTERESTING\n\n"
            for item in interesting:
                report += f"- {item}\n"
            report += "\n"

        if opportunities:
            report += "## OPPORTUNITIES\n\n"
            for opp in opportunities:
                report += f"- {opp}\n"
            report += "\n"

        if search_queries:
            report += "## LinkedIn/Jobs Search Queries\n\n"
            report += "Run these queries for social/hiring signals:\n\n"
            for query in search_queries:
                report += f"- `{query}`\n"
            report += "\n"

        report += "## Technical Summary\n\n"
        for result in scan_results:
            company = result['company']
            pricing = result.get('pricing', {})
            blog = result.get('blog', {})

            report += f"**{company}:**\n"
            report += f"- Pricing: {'OK' if pricing.get('fetched') else 'FAIL'} {pricing.get('diff', 'No changes')}\n"
            report += f"- Blog: {'OK' if blog.get('fetched') else 'FAIL'} {len(blog.get('recent_posts', []))} recent posts\n\n"

        return report

    def save_results(self, scan_results: List[Dict]) -> str:
        """Save scan results to latest-scan.json and the dated history file.

        Returns the path of the latest-scan file.
        """
        today = datetime.now().strftime('%Y-%m-%d')

        latest_file = os.path.join(self.data_dir, 'latest-scan.json')
        with open(latest_file, 'w', encoding='utf-8') as f:
            json.dump(scan_results, f, indent=2)

        history_file = os.path.join(self.history_dir, f'{today}.json')
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(scan_results, f, indent=2)

        return latest_file

    def run(self, company_filter: Optional[str] = None) -> str:
        """Run the monitoring scan and return the markdown report.

        Returns "" when an unknown company key is requested.
        """
        print("Starting competitive monitoring scan...")

        # Validate the filter before doing any work.
        if company_filter and company_filter not in self.competitors:
            print(f"Unknown company: {company_filter}")
            print(f"Available companies: {', '.join(self.competitors.keys())}")
            return ""

        companies_to_scan = (
            [company_filter] if company_filter else list(self.competitors.keys())
        )

        scan_results = []
        for company_key in companies_to_scan:
            try:
                scan_results.append(self.scan_competitor(company_key))
            except Exception as e:
                # One broken competitor must not kill the whole scan.
                print(f"Error scanning {company_key}: {e}")

        self.save_results(scan_results)
        report = self.generate_report(scan_results)

        print(f"\nScan complete! Results for {len(scan_results)} companies.")
        return report


def load_competitors_config(config_path: str) -> dict:
    """Load competitors from a JSON config file.

    Expected format:
        {
            "competitor_key": {
                "name": "Competitor Name",
                "domain": "competitor.com",
                "pricing_url": "https://competitor.com/pricing",
                "blog_url": "https://competitor.com/blog",
                "linkedin_query": "Competitor Name site:linkedin.com",
                "jobs_query": "Competitor Name careers OR jobs"
            }
        }
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def main():
    """CLI entry point: parse args, load config, run the scan."""
    parser = argparse.ArgumentParser(description='Competitive Monitoring Scraper')
    parser.add_argument('--company', help='Scan specific company only (by key)')
    parser.add_argument('--output', '-o', help='Save report to file')
    parser.add_argument('--config', help='Path to competitors JSON config file')
    parser.add_argument('--data-dir', help='Directory for storing scan data')
    parser.add_argument('--threat-keywords', nargs='*',
                        help='Keywords that signal competitive overlap (space-separated)')

    args = parser.parse_args()

    # Load competitor config: flag takes precedence over env var.
    config_path = args.config or os.environ.get('COMPETITORS_CONFIG')
    competitors = None
    if config_path:
        try:
            competitors = load_competitors_config(config_path)
            print(f"Loaded {len(competitors)} competitors from {config_path}")
        except Exception as e:
            print(f"Error loading config: {e}")
            sys.exit(1)

    monitor = CompetitiveMonitor(
        data_dir=args.data_dir,
        competitors=competitors,
    )

    report = monitor.run(args.company)

    if report:
        print("\n" + "=" * 60)
        print(report)
        print("=" * 60)

        if args.output:
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"\nReport saved to: {args.output}")


if __name__ == '__main__':
    main()