# scripts/post-restore-migrate.py
  1  #!/usr/bin/env python3
  2  """
  3  Post-restore DB migration script.
  4  
  5  Run this ONCE after copying a fresh sites.db from the Mar-7-2026 Syncthing backup.
  6  It applies all data migrations that were done in sessions since the backup date:
  7    - Status renames: scored→prog_scored, rescored→semantic_scored, ignore→ignored
  8    - Country code: GB→UK
  9    - Seeds: countries, cron_jobs, pipeline_control, settings, email_exclusion_list
 10  
 11  Usage:
 12      cd /home/jason/code/333Method
 13      python3 scripts/post-restore-migrate.py [path/to/sites.db]
 14  
 15  Safety checks:
 16    - Aborts if sites count < 100,000 (empty/wrong DB)
 17    - Aborts if any migration already applied (idempotent marker check)
 18    - Creates a checkpoint WAL before starting
 19  """
 20  
import glob
import os
import re
import sqlite3
import sys
 25  
 26  DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "db/sites.db"
 27  
 28  if not os.path.exists(DB_PATH):
 29      print(f"ERROR: DB not found at {DB_PATH}")
 30      sys.exit(1)
 31  
 32  db_size_mb = os.path.getsize(DB_PATH) / (1024 * 1024)
 33  print(f"DB path: {DB_PATH} ({db_size_mb:.0f} MB)")
 34  
 35  conn = sqlite3.connect(DB_PATH, isolation_level=None)  # autocommit mode
 36  conn.execute("PRAGMA journal_mode = WAL")
 37  conn.execute("PRAGMA busy_timeout = 120000")
 38  conn.execute("PRAGMA foreign_keys = OFF")
 39  
 40  # Safety check: DB must have real data
 41  site_count = conn.execute("SELECT COUNT(*) FROM sites").fetchone()[0]
 42  print(f"Sites count: {site_count:,}")
 43  if site_count < 100_000:
 44      print(f"ERROR: Only {site_count} sites — this looks like an empty or wrong DB. Aborting.")
 45      conn.close()
 46      sys.exit(1)
 47  
 48  # Check idempotency: if migrations table already has 097, we already ran this
 49  tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
 50  if 'migrations' in tables:
 51      already = {r[0] for r in conn.execute("SELECT filename FROM migrations").fetchall()}
 52      if '097-rename-site-statuses.sql' in already:
 53          print("Migration 097 already applied — this script has already run. Exiting.")
 54          conn.close()
 55          sys.exit(0)
 56  
 57  # Force WAL checkpoint to ensure clean state
 58  conn.execute("PRAGMA wal_checkpoint(FULL)")
 59  conn.commit()
 60  
 61  print("\n=== Step 1: Drop views that reference 'sites' (re-created at end) ===")
 62  views_to_recreate = []
 63  for (view_name, view_sql) in conn.execute("SELECT name, sql FROM sqlite_master WHERE type='view'").fetchall():
 64      print(f"  Dropping view: {view_name}")
 65      views_to_recreate.append((view_name, view_sql))
 66      conn.execute(f"DROP VIEW IF EXISTS {view_name}")
 67  conn.commit()
 68  print(f"  Dropped {len(views_to_recreate)} views.")
 69  
 70  print("\n=== Step 2: Status renames in sites table ===")
 71  # Check what statuses currently exist
 72  statuses_before = dict(conn.execute("SELECT status, COUNT(*) FROM sites GROUP BY status ORDER BY COUNT(*) DESC").fetchall())
 73  print(f"  Before: {dict(list(statuses_before.items())[:8])}")
 74  
 75  old_statuses_present = any(k in statuses_before for k in ('scored', 'rescored', 'ignore'))
 76  
 77  if old_statuses_present:
 78      print("  Old status values present — need table recreation to update CHECK constraint")
 79  
 80      # Get columns from the CURRENT (old) table — preserve ALL of them
 81      old_cols = [r[1] for r in conn.execute("PRAGMA table_info(sites)").fetchall()]
 82  
 83      # Get the old DDL and patch it to allow new status values
 84      # We use the CURRENT table's DDL (not schema.sql) to preserve all columns
 85      old_ddl_row = conn.execute(
 86          "SELECT sql FROM sqlite_master WHERE type='table' AND name='sites'"
 87      ).fetchone()
 88      if not old_ddl_row:
 89          print("  ERROR: Could not get DDL for sites table")
 90          conn.close()
 91          sys.exit(1)
 92  
 93      old_ddl = old_ddl_row[0]
 94      # Patch the CHECK constraint on status to include the new values
 95      # Old: status IN ('found', 'assets_captured', ..., 'ignore', ...)
 96      # New: include prog_scored, semantic_scored, ignored + keep old ones for safe transition
 97      new_create_sql = re.sub(r'CREATE TABLE "?sites"?(?!_)', 'CREATE TABLE sites_new', old_ddl, count=1)
 98      # Patch the bare 'status' column CHECK (not conversation_status, http_status_code, ssl_status etc.)
 99      # Match: start-of-line whitespace + "status TEXT" (not preceded by other word chars)
100      new_create_sql = re.sub(
101          r"(\n\s+)status TEXT[^\n]*(?:\n[^\n]*)*?CHECK\s*\(status IN \([^;]+?\)\)",
102          r"\1status TEXT DEFAULT 'found' CHECK(status IN ("
103          "'found', 'assets_captured', 'prog_scored', 'semantic_scored', 'vision_scored', "
104          "'enriched_regex', 'enriched_llm', 'enriched', "
105          "'proposals_drafted', 'outreach_partial', 'outreach_sent', "
106          "'ignored', 'failing', 'high_score', 'dead_letter', "
107          "'scored', 'rescored', 'ignore'))",
108          new_create_sql,
109          count=1
110      )
111  
112      common_cols = old_cols  # use ALL old columns
113      print(f"  Preserving all {len(common_cols)} columns from current table")
114  
115      # Recreate sites_new with patched schema (all columns preserved)
116      conn.execute("DROP TABLE IF EXISTS sites_new")
117      conn.commit()
118      conn.execute(new_create_sql)
119      conn.commit()
120  
121      # Build INSERT with renames for common columns
122      select_parts = []
123      for c in common_cols:
124          if c == 'status':
125              select_parts.append("""CASE status
126                  WHEN 'scored'   THEN 'prog_scored'
127                  WHEN 'rescored' THEN 'semantic_scored'
128                  WHEN 'ignore'   THEN 'ignored'
129                  ELSE status
130              END""")
131          elif c == 'country_code':
132              select_parts.append("CASE country_code WHEN 'GB' THEN 'UK' ELSE country_code END")
133          else:
134              select_parts.append(f'"{c}"')
135  
136      col_list = ', '.join(f'"{c}"' for c in common_cols)
137      sel_list = ', '.join(select_parts)
138  
139      print(f"  Copying {site_count:,} rows to sites_new...")
140      conn.execute(f"INSERT INTO sites_new ({col_list}) SELECT {sel_list} FROM sites")
141      after = conn.execute("SELECT COUNT(*) FROM sites_new").fetchone()[0]
142      print(f"  Copied: {after:,}")
143  
144      if after != site_count:
145          print(f"  ABORT: count mismatch {site_count} vs {after}")
146          conn.execute("DROP TABLE IF EXISTS sites_new")
147          conn.commit()
148          conn.close()
149          sys.exit(1)
150  
151      conn.execute("DROP TABLE sites")
152      conn.execute("ALTER TABLE sites_new RENAME TO sites")
153      conn.commit()
154      print("  Sites table recreation complete.")
155  else:
156      print("  No old status values found — checking GB→UK only")
157      gb_count = conn.execute("SELECT COUNT(*) FROM sites WHERE country_code = 'GB'").fetchone()[0]
158      if gb_count > 0:
159          print(f"  Renaming {gb_count} GB→UK records...")
160          conn.execute("UPDATE sites SET country_code = 'UK' WHERE country_code = 'GB'")
161          conn.commit()
162      else:
163          print("  No GB records found either — data already clean.")
164  
165  statuses_after = dict(conn.execute("SELECT status, COUNT(*) FROM sites GROUP BY status ORDER BY COUNT(*) DESC").fetchall())
166  print(f"  After: {dict(list(statuses_after.items())[:8])}")
167  gb_remaining = conn.execute("SELECT COUNT(*) FROM sites WHERE country_code = 'GB'").fetchone()[0]
168  print(f"  GB remaining: {gb_remaining}")
169  
170  print("\n=== Step 3: Status renames in site_status table ===")
171  ss_count = conn.execute("SELECT COUNT(*) FROM site_status").fetchone()[0]
172  print(f"  site_status rows: {ss_count:,}")
173  if ss_count > 0:
174      old_in_ss = conn.execute("SELECT COUNT(*) FROM site_status WHERE status IN ('scored','rescored','ignore')").fetchone()[0]
175      if old_in_ss > 0:
176          print(f"  {old_in_ss} old values — recreating site_status with patched CHECK constraint...")
177          ss_cols = [r[1] for r in conn.execute("PRAGMA table_info(site_status)").fetchall()]
178  
179          # Use current table's DDL (not schema.sql) to preserve all columns
180          old_ss_ddl_row = conn.execute(
181              "SELECT sql FROM sqlite_master WHERE type='table' AND name='site_status'"
182          ).fetchone()
183          if not old_ss_ddl_row:
184              print("  ERROR: Could not get DDL for site_status table")
185              conn.close()
186              sys.exit(1)
187          old_ss_ddl = old_ss_ddl_row[0]
188          new_ss_sql = re.sub(r'CREATE TABLE "?site_status"?(?!_)', 'CREATE TABLE site_status_new', old_ss_ddl, count=1)
189          new_ss_sql = re.sub(
190              r"(\n\s+)status TEXT[^\n]*(?:\n[^\n]*)*?CHECK\s*\(status IN \([^;]+?\)\)",
191              r"\1status TEXT NOT NULL CHECK(status IN ("
192              "'found', 'assets_captured', 'prog_scored', 'semantic_scored', 'vision_scored', "
193              "'enriched_regex', 'enriched_llm', 'enriched', "
194              "'proposals_drafted', 'outreach_partial', 'outreach_sent', "
195              "'ignored', 'failing', 'high_score', 'dead_letter', "
196              "'scored', 'rescored', 'ignore'))",
197              new_ss_sql,
198              count=1
199          )
200  
201          common_ss_cols = ss_cols  # preserve ALL old columns
202          print(f"  Preserving all {len(common_ss_cols)} columns from current site_status table")
203  
204          conn.execute("DROP TABLE IF EXISTS site_status_new")
205          conn.execute(new_ss_sql)
206          ss_sel_parts = []
207          for c in common_ss_cols:
208              if c == 'status':
209                  ss_sel_parts.append("""CASE status
210                      WHEN 'scored' THEN 'prog_scored'
211                      WHEN 'rescored' THEN 'semantic_scored'
212                      WHEN 'ignore' THEN 'ignored'
213                      ELSE status
214                  END""")
215              else:
216                  ss_sel_parts.append(f'"{c}"')
217          col_list_ss = ', '.join(f'"{c}"' for c in common_ss_cols)
218          sel_list_ss = ', '.join(ss_sel_parts)
219          conn.execute(f"INSERT INTO site_status_new ({col_list_ss}) SELECT {sel_list_ss} FROM site_status")
220          ss_after = conn.execute("SELECT COUNT(*) FROM site_status_new").fetchone()[0]
221          print(f"  Copied: {ss_after:,}")
222          conn.execute("DROP TABLE site_status")
223          conn.execute("ALTER TABLE site_status_new RENAME TO site_status")
224          conn.commit()
225          print("  site_status recreation complete.")
226      else:
227          print("  site_status already clean.")
228  
229  print("\n=== Step 4: Recreate views ===")
230  for (vname, vsql) in views_to_recreate:
231      try:
232          conn.execute(vsql)
233          print(f"  Recreated: {vname}")
234      except Exception as e:
235          print(f"  WARNING: Could not recreate view {vname}: {e}")
236  conn.commit()
237  
238  print("\n=== Step 5: Recreate indexes ===")
239  indexes = [
240      "CREATE INDEX IF NOT EXISTS idx_sites_keyword ON sites(keyword)",
241      "CREATE INDEX IF NOT EXISTS idx_sites_status ON sites(status)",
242      "CREATE INDEX IF NOT EXISTS idx_sites_score ON sites(score)",
243      "CREATE INDEX IF NOT EXISTS idx_sites_last_outreach ON sites(last_outreach_at)",
244      "CREATE INDEX IF NOT EXISTS idx_sites_country ON sites(country_code)",
245      "CREATE INDEX IF NOT EXISTS idx_sites_domain ON sites(domain)",
246      "CREATE INDEX IF NOT EXISTS idx_sites_created_at ON sites(created_at)",
247  ]
248  for idx in indexes:
249      conn.execute(idx)
250  conn.commit()
251  print(f"  Created {len(indexes)} indexes.")
252  
253  print("\n=== Step 6: Seed countries table ===")
254  country_count = conn.execute("SELECT COUNT(*) FROM countries").fetchone()[0]
255  if country_count >= 25:
256      print(f"  Countries already seeded ({country_count} rows). Skipping.")
257  else:
258      print(f"  Only {country_count} countries — seeding...")
259      # Read and execute migration 022
260      migration_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
261                                     'db', 'migrations', '022-create-countries-table.sql')
262      with open(migration_path) as f:
263          country_sql = f.read()
264      # Strip the DROP TABLE IF EXISTS to avoid losing data
265      country_sql_safe = country_sql.replace('DROP TABLE IF EXISTS countries;', '-- DROP TABLE IF EXISTS countries (skipped);')
266      # Execute each statement
267      for stmt in country_sql_safe.split(';'):
268          stmt = stmt.strip()
269          if stmt and not stmt.startswith('--') and not stmt.startswith('SELECT'):
270              try:
271                  conn.execute(stmt)
272              except Exception as e:
273                  if 'already exists' not in str(e) and 'duplicate' not in str(e).lower():
274                      print(f"  Warning: {e} for stmt: {stmt[:80]}")
275      conn.commit()
276      country_count = conn.execute("SELECT COUNT(*) FROM countries").fetchone()[0]
277      print(f"  Countries seeded: {country_count}")
278  
279  print("\n=== Step 7: Seed pipeline_control ===")
280  pc_count = conn.execute("SELECT COUNT(*) FROM pipeline_control").fetchone()[0]
281  if pc_count == 0:
282      conn.execute("INSERT OR IGNORE INTO pipeline_control (id, paused) VALUES (1, 0)")
283      conn.commit()
284      print("  pipeline_control seeded.")
285  else:
286      print(f"  pipeline_control already has {pc_count} rows.")
287  
288  print("\n=== Step 8: Seed settings ===")
289  cbc = conn.execute("SELECT value FROM settings WHERE key='cron_circuit_breaker_enabled'").fetchone()
290  if not cbc:
291      conn.execute("INSERT OR IGNORE INTO settings (key, value) VALUES ('cron_circuit_breaker_enabled', 'true')")
292      conn.commit()
293      print("  settings: cron_circuit_breaker_enabled seeded.")
294  else:
295      print(f"  settings: cron_circuit_breaker_enabled = {cbc[0]}")
296  
297  print("\n=== Step 9: Seed email_exclusion_list ===")
298  if 'email_exclusion_list' not in tables:
299      print("  Table does not exist — will be created by db-migrate (cron:migrate). Skipping.")
300      excl_count = -1
301  else:
302      excl_count = conn.execute("SELECT COUNT(*) FROM email_exclusion_list").fetchone()[0]
303  if excl_count == -1:
304      pass  # table not present, handled above
305  elif excl_count == 0:
306      noise_emails = [
307          'contact@example.com', 'info@example.com', 'noreply@example.com',
308          'no-reply@example.com', 'test@example.com', 'admin@example.com',
309          'support@example.com', 'hello@example.com', 'webmaster@example.com',
310          'postmaster@example.com', 'abuse@example.com', 'hostmaster@example.com',
311          'enquiries@example.com', 'sales@example.com', 'marketing@example.com',
312          'privacy@example.com', 'legal@example.com',
313      ]
314      # Read migration 105 if available
315      m105_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
316                                'db', 'migrations', '105-email-exclusion-list.sql')
317      if os.path.exists(m105_path):
318          with open(m105_path) as f:
319              m105_sql = f.read()
320          for stmt in m105_sql.split(';'):
321              stmt = stmt.strip()
322              if stmt and not stmt.startswith('--') and 'INSERT' in stmt.upper():
323                  try:
324                      conn.execute(stmt)
325                  except Exception as e:
326                      if 'UNIQUE' not in str(e):
327                          print(f"  Warning: {e}")
328          conn.commit()
329      excl_count = conn.execute("SELECT COUNT(*) FROM email_exclusion_list").fetchone()[0]
330      print(f"  email_exclusion_list seeded: {excl_count} rows.")
331  else:
332      print(f"  email_exclusion_list already has {excl_count} rows.")
333  
334  print("\n=== Step 10: Mark all migrations as applied ===")
335  # IMPORTANT: This marks all migrations as applied so db-migrate.js won't re-run them.
336  # The assumption is that the Mar-7 backup schema + the status/column fixes above put
337  # the DB in the correct state. Any schema gaps will be caught by the test suite.
338  # NEVER run individual migrations via `sqlite3 < file.sql` after this — always use
339  # `node scripts/db-migrate.js --force` which wraps each migration in a transaction.
340  if 'migrations' not in tables:
341      conn.execute("""CREATE TABLE IF NOT EXISTS migrations (
342          id INTEGER PRIMARY KEY AUTOINCREMENT,
343          filename TEXT UNIQUE NOT NULL,
344          applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
345      )""")
346      conn.commit()
347  
348  import glob
349  migrations_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'db', 'migrations')
350  all_sql_migrations = sorted([os.path.basename(f) for f in glob.glob(os.path.join(migrations_dir, '*.sql'))])
351  inserted = 0
352  for mig in all_sql_migrations:
353      try:
354          conn.execute("INSERT OR IGNORE INTO migrations (filename) VALUES (?)", (mig,))
355          if conn.total_changes > 0:
356              inserted += 1
357      except Exception as e:
358          print(f"  Warning: Could not mark {mig}: {e}")
359  conn.commit()
360  total_marked = conn.execute("SELECT COUNT(*) FROM migrations").fetchone()[0]
361  print(f"  Marked {total_marked} migrations as applied ({inserted} newly inserted).")
362  
363  print("\n=== Step 11: Final verification ===")
364  final_count = conn.execute("SELECT COUNT(*) FROM sites").fetchone()[0]
365  statuses_final = dict(conn.execute("SELECT status, COUNT(*) FROM sites GROUP BY status ORDER BY COUNT(*) DESC").fetchall())
366  gb_final = conn.execute("SELECT COUNT(*) FROM sites WHERE country_code = 'GB'").fetchone()[0]
367  print(f"  Final site count: {final_count:,}")
368  print(f"  Statuses: {dict(list(statuses_final.items())[:8])}")
369  print(f"  GB remaining: {gb_final}")
370  
371  # Run ANALYZE for better query plans
372  conn.execute("ANALYZE")
373  conn.commit()
374  
375  conn.execute("PRAGMA foreign_keys = ON")
376  conn.close()
377  
378  if final_count < 100_000:
379      print(f"\nFAIL: Final count {final_count} is too low!")
380      sys.exit(1)
381  else:
382      print(f"\n✅ Done! DB is ready with {final_count:,} sites.")
383      print("Next steps:")
384      print("  npm run cron:migrate  # seed cron_jobs table")
385      print("  # Optionally check for schema gaps: node scripts/db-migrate.js --force")
386      print("  # (uses transactions — safe to run even if some migrations were pre-marked)")
387      print("  systemctl --user start 333method-pipeline.service")
388      print("  systemctl --user start mmo-cron.timer")
389      print("  systemctl --user start 333method-orchestrator.timer")