/ outbound-engine / scripts / competitive-monitor.py
competitive-monitor.py
  1  #!/usr/bin/env python3
  2  """
  3  Competitive Monitor — tracks pricing, blog posts, and feature changes across competitors.
  4  
  5  Generates weekly competitive intelligence diffs. Configurable competitor list.
  6  
  7  Usage:
  8      python3 competitive-monitor.py
  9      python3 competitive-monitor.py --company acme
 10      python3 competitive-monitor.py --output report.md
 11      python3 competitive-monitor.py --config competitors.json
 12  
 13  Competitor config can be provided via:
 14      1. --config flag pointing to a JSON file
 15      2. COMPETITORS_CONFIG env var pointing to a JSON file
 16      3. Built-in example competitors (for demo purposes)
 17  """
 18  
 19  import argparse
 20  import json
 21  import os
 22  import re
 23  import sys
 24  import urllib.request
 25  import urllib.parse
 26  from datetime import datetime, timedelta
 27  from difflib import unified_diff
 28  from typing import Dict, List, Optional
 29  from html.parser import HTMLParser
 30  from urllib.error import URLError, HTTPError
 31  
 32  
 33  def validate_text(text, max_length=500000):
 34      """Basic input validation for scraped content."""
 35      if not text or not isinstance(text, str):
 36          return text
 37      # Truncate extremely long content
 38      if len(text) > max_length:
 39          text = text[:max_length]
 40      return text
 41  
 42  
 43  class BlogExtractor(HTMLParser):
 44      """Extract blog post titles and dates from HTML."""
 45  
 46      def __init__(self):
 47          super().__init__()
 48          self.posts = []
 49          self.current_title = None
 50          self.current_date = None
 51          self.in_title = False
 52          self.in_date = False
 53          self.title_tags = ['h1', 'h2', 'h3', 'h4']
 54  
 55      def handle_starttag(self, tag, attrs):
 56          if tag.lower() in self.title_tags:
 57              self.in_title = True
 58          for name, value in attrs:
 59              if name in ['class', 'id'] and any(
 60                  date_word in value.lower() for date_word in ['date', 'time', 'published']
 61              ):
 62                  self.in_date = True
 63  
 64      def handle_endtag(self, tag):
 65          if tag.lower() in self.title_tags:
 66              self.in_title = False
 67          self.in_date = False
 68  
 69      def handle_data(self, data):
 70          if self.in_title and data.strip():
 71              self.current_title = data.strip()
 72  
 73          if self.in_date and data.strip():
 74              date_match = re.search(
 75                  r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\w+ \d{1,2},? \d{4}\b', data
 76              )
 77              if date_match:
 78                  self.current_date = date_match.group()
 79  
 80          if self.current_title and self.current_date:
 81              self.posts.append({
 82                  'title': self.current_title,
 83                  'date': self.current_date,
 84              })
 85              self.current_title = None
 86              self.current_date = None
 87  
 88  
 89  class CompetitiveMonitor:
 90      """Main competitive monitoring class."""
 91  
 92      # Example competitors for demo. Override with --config or COMPETITORS_CONFIG.
 93      EXAMPLE_COMPETITORS = {
 94          'competitor_a': {
 95              'name': 'Competitor A',
 96              'domain': 'competitor-a.com',
 97              'pricing_url': 'https://www.competitor-a.com/pricing',
 98              'blog_url': 'https://www.competitor-a.com/blog',
 99              'linkedin_query': 'Competitor A site:linkedin.com',
100              'jobs_query': 'Competitor A careers OR jobs',
101          },
102          'competitor_b': {
103              'name': 'Competitor B',
104              'domain': 'competitor-b.com',
105              'pricing_url': 'https://www.competitor-b.com/pricing',
106              'blog_url': 'https://www.competitor-b.com/blog',
107              'linkedin_query': 'Competitor B site:linkedin.com',
108              'jobs_query': 'Competitor B careers OR jobs',
109          },
110      }
111  
112      def __init__(self, data_dir: str = None, competitors: dict = None):
113          self.data_dir = data_dir or os.path.join(os.getcwd(), 'data', 'competitive')
114          self.pricing_dir = os.path.join(self.data_dir, 'pricing-snapshots')
115          self.history_dir = os.path.join(self.data_dir, 'scan-history')
116          self.competitors = competitors or self.EXAMPLE_COMPETITORS
117  
118          os.makedirs(self.pricing_dir, exist_ok=True)
119          os.makedirs(self.history_dir, exist_ok=True)
120  
121      def fetch_url(self, url: str, timeout: int = 10) -> Optional[str]:
122          """Fetch URL content with error handling."""
123          try:
124              headers = {
125                  'User-Agent': (
126                      'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
127                      'AppleWebKit/537.36 (KHTML, like Gecko) '
128                      'Chrome/91.0.4472.124 Safari/537.36'
129                  )
130              }
131              request = urllib.request.Request(url, headers=headers)
132  
133              with urllib.request.urlopen(request, timeout=timeout) as response:
134                  content = response.read().decode('utf-8', errors='ignore')
135                  content = validate_text(content)
136                  return content
137  
138          except (URLError, HTTPError, UnicodeDecodeError) as e:
139              print(f"āŒ Error fetching {url}: {e}")
140              return None
141  
142      def extract_blog_posts(self, html: str) -> List[Dict]:
143          """Extract blog posts from HTML."""
144          if not html:
145              return []
146  
147          extractor = BlogExtractor()
148          try:
149              extractor.feed(html)
150              return extractor.posts
151          except Exception as e:
152              print(f"Error extracting blog posts: {e}")
153              return []
154  
155      def is_recent_post(self, date_str: str, days_back: int = 7) -> bool:
156          """Check if post is from last N days."""
157          if not date_str:
158              return False
159  
160          formats = [
161              '%m/%d/%Y', '%m-%d-%Y', '%Y-%m-%d',
162              '%B %d, %Y', '%b %d, %Y', '%B %d %Y', '%b %d %Y',
163          ]
164  
165          for fmt in formats:
166              try:
167                  post_date = datetime.strptime(date_str, fmt)
168                  cutoff_date = datetime.now() - timedelta(days=days_back)
169                  return post_date >= cutoff_date
170              except ValueError:
171                  continue
172  
173          return False
174  
175      def get_pricing_diff(self, company_key: str, current_content: str) -> Optional[str]:
176          """Compare current pricing with previous snapshot."""
177          today = datetime.now().strftime('%Y-%m-%d')
178          pricing_file = os.path.join(self.pricing_dir, f'{company_key}-{today}.txt')
179  
180          with open(pricing_file, 'w', encoding='utf-8') as f:
181              f.write(current_content)
182  
183          previous_files = [
184              f for f in os.listdir(self.pricing_dir)
185              if f.startswith(f'{company_key}-') and f != f'{company_key}-{today}.txt'
186          ]
187  
188          if not previous_files:
189              return "šŸ†• First pricing snapshot saved"
190  
191          previous_files.sort(reverse=True)
192          previous_file = os.path.join(self.pricing_dir, previous_files[0])
193  
194          try:
195              with open(previous_file, 'r', encoding='utf-8') as f:
196                  previous_content = f.read()
197  
198              if current_content.strip() == previous_content.strip():
199                  return None
200  
201              current_lines = current_content.splitlines()
202              previous_lines = previous_content.splitlines()
203  
204              diff = list(unified_diff(
205                  previous_lines, current_lines,
206                  fromfile='previous', tofile='current', n=0
207              ))
208  
209              changes = len([
210                  line for line in diff
211                  if line.startswith(('+', '-')) and not line.startswith(('+++', '---'))
212              ])
213  
214              return f"šŸ” {changes} lines changed since last snapshot"
215  
216          except Exception as e:
217              return f"āŒ Error comparing snapshots: {e}"
218  
219      def scan_competitor(self, company_key: str) -> Dict:
220          """Scan single competitor."""
221          company = self.competitors[company_key]
222          print(f"\nšŸ” Scanning {company['name']}...")
223  
224          results = {
225              'company': company['name'],
226              'domain': company['domain'],
227              'scan_time': datetime.now().isoformat(),
228              'pricing': {},
229              'blog': {},
230              'search_queries': {
231                  'linkedin': company.get('linkedin_query', ''),
232                  'jobs': company.get('jobs_query', ''),
233              },
234          }
235  
236          # Fetch pricing page
237          pricing_url = company.get('pricing_url')
238          if pricing_url:
239              print(f"  šŸ“„ Fetching pricing: {pricing_url}")
240              pricing_content = self.fetch_url(pricing_url)
241  
242              if pricing_content:
243                  clean_content = re.sub(r'<[^>]+>', '', pricing_content)
244                  clean_content = re.sub(r'\s+', ' ', clean_content).strip()
245  
246                  pricing_diff = self.get_pricing_diff(company_key, clean_content)
247  
248                  results['pricing'] = {
249                      'url': pricing_url,
250                      'fetched': True,
251                      'content_length': len(clean_content),
252                      'diff': pricing_diff,
253                  }
254              else:
255                  results['pricing'] = {
256                      'url': pricing_url,
257                      'fetched': False,
258                      'error': 'Failed to fetch pricing page',
259                  }
260  
261          # Fetch blog page
262          blog_url = company.get('blog_url')
263          if blog_url:
264              print(f"  šŸ“ Fetching blog: {blog_url}")
265              blog_content = self.fetch_url(blog_url)
266  
267              recent_posts = []
268              if blog_content:
269                  all_posts = self.extract_blog_posts(blog_content)
270                  recent_posts = [post for post in all_posts if self.is_recent_post(post['date'])]
271  
272              results['blog'] = {
273                  'url': blog_url,
274                  'fetched': bool(blog_content),
275                  'total_posts_found': len(self.extract_blog_posts(blog_content)) if blog_content else 0,
276                  'recent_posts': recent_posts,
277              }
278  
279          return results
280  
281      def generate_report(self, scan_results: List[Dict], threat_keywords: List[str] = None) -> str:
282          """Generate markdown report."""
283          today = datetime.now().strftime('%Y-%m-%d')
284  
285          # Configurable threat keywords (topics that signal competitive overlap)
286          if threat_keywords is None:
287              threat_keywords = ['funnel', 'conversion', 'landing page', 'ab test', 'optimize', 'cro']
288  
289          report = f"""# šŸ” Competitive Intelligence Report - {today}
290  
291  ## Executive Summary
292  
293  Monitored {len(scan_results)} competitors for pricing changes, recent blog activity, and market signals.
294  
295  """
296  
297          threats = []
298          interesting = []
299          opportunities = []
300          search_queries = []
301  
302          for result in scan_results:
303              company = result['company']
304  
305              pricing = result.get('pricing', {})
306              if pricing.get('diff') and 'šŸ”' in str(pricing['diff']):
307                  interesting.append(
308                      f"**{company}**: {pricing['diff']} → *Monitor for pricing strategy shifts*"
309                  )
310              elif pricing.get('diff') and 'šŸ†•' in str(pricing['diff']):
311                  interesting.append(
312                      f"**{company}**: {pricing['diff']} → *Baseline established for future tracking*"
313                  )
314  
315              blog = result.get('blog', {})
316              recent_posts = blog.get('recent_posts', [])
317  
318              if recent_posts:
319                  post_titles = [
320                      post['title'][:80] + '...' if len(post['title']) > 80 else post['title']
321                      for post in recent_posts[:3]
322                  ]
323                  content_lower = ' '.join(post_titles).lower()
324  
325                  if any(keyword in content_lower for keyword in threat_keywords):
326                      threats.append(
327                          f"**{company}**: {len(recent_posts)} recent posts, potential feature overlap → *Review competitive positioning*"
328                      )
329                  else:
330                      interesting.append(
331                          f"**{company}**: {len(recent_posts)} recent posts → *{', '.join(post_titles[:2])}*"
332                      )
333              else:
334                  opportunities.append(
335                      f"**{company}**: No recent blog content → *Content marketing gap you can exploit*"
336                  )
337  
338              sq = result.get('search_queries', {})
339              if sq.get('linkedin'):
340                  search_queries.append(f"LinkedIn search: {sq['linkedin']}")
341              if sq.get('jobs'):
342                  search_queries.append(f"Jobs search: {sq['jobs']}")
343  
344          if threats:
345              report += "## šŸ”“ THREATS\n\n"
346              for threat in threats:
347                  report += f"- {threat}\n"
348              report += "\n"
349  
350          if interesting:
351              report += "## 🟔 INTERESTING\n\n"
352              for item in interesting:
353                  report += f"- {item}\n"
354              report += "\n"
355  
356          if opportunities:
357              report += "## 🟢 OPPORTUNITIES\n\n"
358              for opp in opportunities:
359                  report += f"- {opp}\n"
360              report += "\n"
361  
362          if search_queries:
363              report += "## šŸ”Ž LinkedIn/Jobs Search Queries\n\n"
364              report += "Run these queries for social/hiring signals:\n\n"
365              for query in search_queries:
366                  report += f"- `{query}`\n"
367              report += "\n"
368  
369          report += "## šŸ“Š Technical Summary\n\n"
370          for result in scan_results:
371              company = result['company']
372              pricing = result.get('pricing', {})
373              blog = result.get('blog', {})
374  
375              report += f"**{company}:**\n"
376              report += f"- Pricing: {'āœ…' if pricing.get('fetched') else 'āŒ'} {pricing.get('diff', 'No changes')}\n"
377              report += f"- Blog: {'āœ…' if blog.get('fetched') else 'āŒ'} {len(blog.get('recent_posts', []))} recent posts\n\n"
378  
379          return report
380  
381      def save_results(self, scan_results: List[Dict]) -> str:
382          """Save scan results to files."""
383          today = datetime.now().strftime('%Y-%m-%d')
384  
385          latest_file = os.path.join(self.data_dir, 'latest-scan.json')
386          with open(latest_file, 'w') as f:
387              json.dump(scan_results, f, indent=2)
388  
389          history_file = os.path.join(self.history_dir, f'{today}.json')
390          with open(history_file, 'w') as f:
391              json.dump(scan_results, f, indent=2)
392  
393          return latest_file
394  
395      def run(self, company_filter: Optional[str] = None) -> str:
396          """Run competitive monitoring scan."""
397          print("šŸš€ Starting competitive monitoring scan...")
398  
399          companies_to_scan = (
400              [company_filter] if company_filter else list(self.competitors.keys())
401          )
402  
403          if company_filter and company_filter not in self.competitors:
404              print(f"āŒ Unknown company: {company_filter}")
405              print(f"Available companies: {', '.join(self.competitors.keys())}")
406              return ""
407  
408          scan_results = []
409          for company_key in companies_to_scan:
410              try:
411                  result = self.scan_competitor(company_key)
412                  scan_results.append(result)
413              except Exception as e:
414                  print(f"āŒ Error scanning {company_key}: {e}")
415  
416          self.save_results(scan_results)
417          report = self.generate_report(scan_results)
418  
419          print(f"\nāœ… Scan complete! Results for {len(scan_results)} companies.")
420          return report
421  
422  
def load_competitors_config(config_path: str) -> dict:
    """Load competitors from a JSON config file.

    Expected format:
    {
        "competitor_key": {
            "name": "Competitor Name",
            "domain": "competitor.com",
            "pricing_url": "https://competitor.com/pricing",
            "blog_url": "https://competitor.com/blog",
            "linkedin_query": "Competitor Name site:linkedin.com",
            "jobs_query": "Competitor Name careers OR jobs"
        }
    }

    Args:
        config_path: Path to the JSON file.

    Returns:
        The parsed mapping of competitor key to settings.

    Raises:
        OSError: If the file cannot be read.
        ValueError: If the file is not valid JSON, or its top level is
            not an object whose values are all objects.
    """
    # utf-8-sig also accepts files saved with a byte-order mark, which
    # json.load() would otherwise reject.
    with open(config_path, 'r', encoding='utf-8-sig') as f:
        config = json.load(f)

    if not isinstance(config, dict):
        raise ValueError(
            f"competitor config must be a JSON object, got {type(config).__name__}"
        )
    for key, entry in config.items():
        if not isinstance(entry, dict):
            raise ValueError(
                f"competitor '{key}' must be a JSON object, got {type(entry).__name__}"
            )
    return config
440  
441  
def main():
    """Command-line entry point: scan competitors and print/save the report.

    Competitors come from ``--config``, else the ``COMPETITORS_CONFIG``
    environment variable, else the monitor's built-in examples. Exits with
    status 1 when the config file cannot be loaded.
    """
    parser = argparse.ArgumentParser(description='Competitive Monitoring Scraper')
    parser.add_argument('--company', help='Scan specific company only (by key)')
    parser.add_argument('--output', '-o', help='Save report to file')
    parser.add_argument('--config', help='Path to competitors JSON config file')
    parser.add_argument('--data-dir', help='Directory for storing scan data')
    parser.add_argument('--threat-keywords', nargs='*',
                        help='Keywords that signal competitive overlap (space-separated)')

    args = parser.parse_args()

    # Load competitor config
    config_path = args.config or os.environ.get('COMPETITORS_CONFIG')
    competitors = None
    if config_path:
        try:
            competitors = load_competitors_config(config_path)
            print(f"šŸ“‹ Loaded {len(competitors)} competitors from {config_path}")
        except Exception as e:
            print(f"āŒ Error loading config: {e}")
            sys.exit(1)

    monitor = CompetitiveMonitor(
        data_dir=args.data_dir,
        competitors=competitors,
    )

    if args.threat_keywords is not None:
        # run() builds the report itself, so the CLI keywords are applied
        # by wrapping generate_report on this instance: they are used
        # whenever the caller does not supply keywords of its own.
        build_report = monitor.generate_report

        def generate_with_cli_keywords(scan_results, threat_keywords=None):
            if threat_keywords is None:
                threat_keywords = args.threat_keywords
            return build_report(scan_results, threat_keywords)

        monitor.generate_report = generate_with_cli_keywords

    report = monitor.run(args.company)

    if report:
        print("\n" + "=" * 60)
        print(report)
        print("=" * 60)

        if args.output:
            # The report contains non-ASCII symbols; the platform default
            # encoding may not be able to represent them.
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"\nšŸ“ Report saved to: {args.output}")


if __name__ == '__main__':
    main()