post-restore-migrate.py
#!/usr/bin/env python3
"""
Post-restore DB migration script.

Run this ONCE after copying a fresh sites.db from the Mar-7-2026 Syncthing backup.
It applies all data migrations that were done in sessions since the backup date:
- Status renames: scored→prog_scored, rescored→semantic_scored, ignore→ignored
- Country code: GB→UK
- Seeds: countries, cron_jobs, pipeline_control, settings, email_exclusion_list

Usage:
    cd /home/jason/code/333Method
    python3 scripts/post-restore-migrate.py [path/to/sites.db]

Safety checks:
- Aborts if sites count < 100,000 (empty/wrong DB)
- Aborts if any migration already applied (idempotent marker check)
- Creates a checkpoint WAL before starting
"""

import glob
import os
import re
import sqlite3
import sys

# Optional positional arg overrides the default DB location.
DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "db/sites.db"

if not os.path.exists(DB_PATH):
    print(f"ERROR: DB not found at {DB_PATH}")
    sys.exit(1)

db_size_mb = os.path.getsize(DB_PATH) / (1024 * 1024)
print(f"DB path: {DB_PATH} ({db_size_mb:.0f} MB)")

# Autocommit mode: DDL-heavy table-recreation steps below manage their own
# consistency; explicit conn.commit() calls are harmless no-ops in this mode.
conn = sqlite3.connect(DB_PATH, isolation_level=None)
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA busy_timeout = 120000")
# FKs off during the drop/rename table swaps so child tables don't block us.
conn.execute("PRAGMA foreign_keys = OFF")

# Safety check: DB must have real data
site_count = conn.execute("SELECT COUNT(*) FROM sites").fetchone()[0]
print(f"Sites count: {site_count:,}")
if site_count < 100_000:
    print(f"ERROR: Only {site_count} sites — this looks like an empty or wrong DB. Aborting.")
    conn.close()
    sys.exit(1)

# Check idempotency: if migrations table already has 097, we already ran this
tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
if 'migrations' in tables:
    already = {r[0] for r in conn.execute("SELECT filename FROM migrations").fetchall()}
    if '097-rename-site-statuses.sql' in already:
        print("Migration 097 already applied — this script has already run. Exiting.")
        conn.close()
        sys.exit(0)

# Force WAL checkpoint to ensure clean state
conn.execute("PRAGMA wal_checkpoint(FULL)")
conn.commit()

print("\n=== Step 1: Drop views that reference 'sites' (re-created at end) ===")
# Views referencing 'sites' would break the DROP/RENAME swap in Step 2, so
# save their DDL and drop them all; Step 4 re-creates them.
views_to_recreate = []
for (view_name, view_sql) in conn.execute("SELECT name, sql FROM sqlite_master WHERE type='view'").fetchall():
    print(f" Dropping view: {view_name}")
    views_to_recreate.append((view_name, view_sql))
    conn.execute(f"DROP VIEW IF EXISTS {view_name}")
conn.commit()
print(f" Dropped {len(views_to_recreate)} views.")

print("\n=== Step 2: Status renames in sites table ===")
# Check what statuses currently exist
statuses_before = dict(conn.execute("SELECT status, COUNT(*) FROM sites GROUP BY status ORDER BY COUNT(*) DESC").fetchall())
print(f" Before: {dict(list(statuses_before.items())[:8])}")

old_statuses_present = any(k in statuses_before for k in ('scored', 'rescored', 'ignore'))

if old_statuses_present:
    # SQLite cannot ALTER a CHECK constraint in place, so the rename requires
    # the classic create-new / copy / drop-old / rename dance.
    print(" Old status values present — need table recreation to update CHECK constraint")

    # Get columns from the CURRENT (old) table — preserve ALL of them
    old_cols = [r[1] for r in conn.execute("PRAGMA table_info(sites)").fetchall()]

    # Get the old DDL and patch it to allow new status values.
    # We use the CURRENT table's DDL (not schema.sql) to preserve all columns.
    old_ddl_row = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name='sites'"
    ).fetchone()
    if not old_ddl_row:
        print(" ERROR: Could not get DDL for sites table")
        conn.close()
        sys.exit(1)

    old_ddl = old_ddl_row[0]
    # Patch the CHECK constraint on status to include the new values.
    # Old: status IN ('found', 'assets_captured', ..., 'ignore', ...)
    # New: include prog_scored, semantic_scored, ignored + keep old ones for safe transition.
    # (?!_) guards against accidentally matching a table named e.g. sites_new.
    new_create_sql = re.sub(r'CREATE TABLE "?sites"?(?!_)', 'CREATE TABLE sites_new', old_ddl, count=1)
    # Patch the bare 'status' column CHECK (not conversation_status, http_status_code, ssl_status etc.)
    # Match: start-of-line whitespace + "status TEXT" (not preceded by other word chars)
    new_create_sql = re.sub(
        r"(\n\s+)status TEXT[^\n]*(?:\n[^\n]*)*?CHECK\s*\(status IN \([^;]+?\)\)",
        r"\1status TEXT DEFAULT 'found' CHECK(status IN ("
        "'found', 'assets_captured', 'prog_scored', 'semantic_scored', 'vision_scored', "
        "'enriched_regex', 'enriched_llm', 'enriched', "
        "'proposals_drafted', 'outreach_partial', 'outreach_sent', "
        "'ignored', 'failing', 'high_score', 'dead_letter', "
        "'scored', 'rescored', 'ignore'))",
        new_create_sql,
        count=1
    )

    common_cols = old_cols  # use ALL old columns
    print(f" Preserving all {len(common_cols)} columns from current table")

    # Recreate sites_new with patched schema (all columns preserved)
    conn.execute("DROP TABLE IF EXISTS sites_new")
    conn.commit()
    conn.execute(new_create_sql)
    conn.commit()

    # Build INSERT with renames for common columns
    select_parts = []
    for c in common_cols:
        if c == 'status':
            select_parts.append("""CASE status
                WHEN 'scored' THEN 'prog_scored'
                WHEN 'rescored' THEN 'semantic_scored'
                WHEN 'ignore' THEN 'ignored'
                ELSE status
            END""")
        elif c == 'country_code':
            select_parts.append("CASE country_code WHEN 'GB' THEN 'UK' ELSE country_code END")
        else:
            select_parts.append(f'"{c}"')

    col_list = ', '.join(f'"{c}"' for c in common_cols)
    sel_list = ', '.join(select_parts)

    print(f" Copying {site_count:,} rows to sites_new...")
    conn.execute(f"INSERT INTO sites_new ({col_list}) SELECT {sel_list} FROM sites")
    after = conn.execute("SELECT COUNT(*) FROM sites_new").fetchone()[0]
    print(f" Copied: {after:,}")

    if after != site_count:
        # Row count mismatch means something was lost in the copy — bail out
        # without touching the original table.
        print(f" ABORT: count mismatch {site_count} vs {after}")
        conn.execute("DROP TABLE IF EXISTS sites_new")
        conn.commit()
        conn.close()
        sys.exit(1)

    conn.execute("DROP TABLE sites")
    conn.execute("ALTER TABLE sites_new RENAME TO sites")
    conn.commit()
    print(" Sites table recreation complete.")
else:
    print(" No old status values found — checking GB→UK only")
    gb_count = conn.execute("SELECT COUNT(*) FROM sites WHERE country_code = 'GB'").fetchone()[0]
    if gb_count > 0:
        print(f" Renaming {gb_count} GB→UK records...")
        conn.execute("UPDATE sites SET country_code = 'UK' WHERE country_code = 'GB'")
        conn.commit()
    else:
        print(" No GB records found either — data already clean.")

statuses_after = dict(conn.execute("SELECT status, COUNT(*) FROM sites GROUP BY status ORDER BY COUNT(*) DESC").fetchall())
print(f" After: {dict(list(statuses_after.items())[:8])}")
gb_remaining = conn.execute("SELECT COUNT(*) FROM sites WHERE country_code = 'GB'").fetchone()[0]
print(f" GB remaining: {gb_remaining}")

print("\n=== Step 3: Status renames in site_status table ===")
ss_count = conn.execute("SELECT COUNT(*) FROM site_status").fetchone()[0]
print(f" site_status rows: {ss_count:,}")
if ss_count > 0:
    old_in_ss = conn.execute("SELECT COUNT(*) FROM site_status WHERE status IN ('scored','rescored','ignore')").fetchone()[0]
    if old_in_ss > 0:
        print(f" {old_in_ss} old values — recreating site_status with patched CHECK constraint...")
        ss_cols = [r[1] for r in conn.execute("PRAGMA table_info(site_status)").fetchall()]

        # Use current table's DDL (not schema.sql) to preserve all columns
        old_ss_ddl_row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='site_status'"
        ).fetchone()
        if not old_ss_ddl_row:
            print(" ERROR: Could not get DDL for site_status table")
            conn.close()
            sys.exit(1)
        old_ss_ddl = old_ss_ddl_row[0]
        new_ss_sql = re.sub(r'CREATE TABLE "?site_status"?(?!_)', 'CREATE TABLE site_status_new', old_ss_ddl, count=1)
        new_ss_sql = re.sub(
            r"(\n\s+)status TEXT[^\n]*(?:\n[^\n]*)*?CHECK\s*\(status IN \([^;]+?\)\)",
            r"\1status TEXT NOT NULL CHECK(status IN ("
            "'found', 'assets_captured', 'prog_scored', 'semantic_scored', 'vision_scored', "
            "'enriched_regex', 'enriched_llm', 'enriched', "
            "'proposals_drafted', 'outreach_partial', 'outreach_sent', "
            "'ignored', 'failing', 'high_score', 'dead_letter', "
            "'scored', 'rescored', 'ignore'))",
            new_ss_sql,
            count=1
        )

        common_ss_cols = ss_cols  # preserve ALL old columns
        print(f" Preserving all {len(common_ss_cols)} columns from current site_status table")

        conn.execute("DROP TABLE IF EXISTS site_status_new")
        conn.execute(new_ss_sql)
        ss_sel_parts = []
        for c in common_ss_cols:
            if c == 'status':
                ss_sel_parts.append("""CASE status
                    WHEN 'scored' THEN 'prog_scored'
                    WHEN 'rescored' THEN 'semantic_scored'
                    WHEN 'ignore' THEN 'ignored'
                    ELSE status
                END""")
            else:
                ss_sel_parts.append(f'"{c}"')
        col_list_ss = ', '.join(f'"{c}"' for c in common_ss_cols)
        sel_list_ss = ', '.join(ss_sel_parts)
        conn.execute(f"INSERT INTO site_status_new ({col_list_ss}) SELECT {sel_list_ss} FROM site_status")
        ss_after = conn.execute("SELECT COUNT(*) FROM site_status_new").fetchone()[0]
        print(f" Copied: {ss_after:,}")
        conn.execute("DROP TABLE site_status")
        conn.execute("ALTER TABLE site_status_new RENAME TO site_status")
        conn.commit()
        print(" site_status recreation complete.")
    else:
        print(" site_status already clean.")

print("\n=== Step 4: Recreate views ===")
for (vname, vsql) in views_to_recreate:
    try:
        conn.execute(vsql)
        print(f" Recreated: {vname}")
    except Exception as e:
        # A view may legitimately fail if it referenced a column that no
        # longer exists; warn and continue rather than abort the whole run.
        print(f" WARNING: Could not recreate view {vname}: {e}")
conn.commit()

print("\n=== Step 5: Recreate indexes ===")
# Indexes on 'sites' were dropped implicitly with the table in Step 2.
indexes = [
    "CREATE INDEX IF NOT EXISTS idx_sites_keyword ON sites(keyword)",
    "CREATE INDEX IF NOT EXISTS idx_sites_status ON sites(status)",
    "CREATE INDEX IF NOT EXISTS idx_sites_score ON sites(score)",
    "CREATE INDEX IF NOT EXISTS idx_sites_last_outreach ON sites(last_outreach_at)",
    "CREATE INDEX IF NOT EXISTS idx_sites_country ON sites(country_code)",
    "CREATE INDEX IF NOT EXISTS idx_sites_domain ON sites(domain)",
    "CREATE INDEX IF NOT EXISTS idx_sites_created_at ON sites(created_at)",
]
for idx in indexes:
    conn.execute(idx)
conn.commit()
print(f" Created {len(indexes)} indexes.")

print("\n=== Step 6: Seed countries table ===")
country_count = conn.execute("SELECT COUNT(*) FROM countries").fetchone()[0]
if country_count >= 25:
    print(f" Countries already seeded ({country_count} rows). Skipping.")
else:
    print(f" Only {country_count} countries — seeding...")
    # Read and execute migration 022 (script lives in scripts/, so the repo
    # root is one directory up from this file).
    migration_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
                                  'db', 'migrations', '022-create-countries-table.sql')
    with open(migration_path) as f:
        country_sql = f.read()
    # Strip the DROP TABLE IF EXISTS to avoid losing data
    country_sql_safe = country_sql.replace('DROP TABLE IF EXISTS countries;', '-- DROP TABLE IF EXISTS countries (skipped);')
    # Execute each statement (naive split on ';' is fine for this seed file)
    for stmt in country_sql_safe.split(';'):
        stmt = stmt.strip()
        if stmt and not stmt.startswith('--') and not stmt.startswith('SELECT'):
            try:
                conn.execute(stmt)
            except Exception as e:
                if 'already exists' not in str(e) and 'duplicate' not in str(e).lower():
                    print(f" Warning: {e} for stmt: {stmt[:80]}")
    conn.commit()
    country_count = conn.execute("SELECT COUNT(*) FROM countries").fetchone()[0]
    print(f" Countries seeded: {country_count}")

print("\n=== Step 7: Seed pipeline_control ===")
pc_count = conn.execute("SELECT COUNT(*) FROM pipeline_control").fetchone()[0]
if pc_count == 0:
    conn.execute("INSERT OR IGNORE INTO pipeline_control (id, paused) VALUES (1, 0)")
    conn.commit()
    print(" pipeline_control seeded.")
else:
    print(f" pipeline_control already has {pc_count} rows.")

print("\n=== Step 8: Seed settings ===")
cbc = conn.execute("SELECT value FROM settings WHERE key='cron_circuit_breaker_enabled'").fetchone()
if not cbc:
    conn.execute("INSERT OR IGNORE INTO settings (key, value) VALUES ('cron_circuit_breaker_enabled', 'true')")
    conn.commit()
    print(" settings: cron_circuit_breaker_enabled seeded.")
else:
    print(f" settings: cron_circuit_breaker_enabled = {cbc[0]}")

print("\n=== Step 9: Seed email_exclusion_list ===")
if 'email_exclusion_list' not in tables:
    print(" Table does not exist — will be created by db-migrate (cron:migrate). Skipping.")
    excl_count = -1  # sentinel: table absent, skip the seeding branches below
else:
    excl_count = conn.execute("SELECT COUNT(*) FROM email_exclusion_list").fetchone()[0]
if excl_count == -1:
    pass  # table not present, handled above
elif excl_count == 0:
    noise_emails = [
        'contact@example.com', 'info@example.com', 'noreply@example.com',
        'no-reply@example.com', 'test@example.com', 'admin@example.com',
        'support@example.com', 'hello@example.com', 'webmaster@example.com',
        'postmaster@example.com', 'abuse@example.com', 'hostmaster@example.com',
        'enquiries@example.com', 'sales@example.com', 'marketing@example.com',
        'privacy@example.com', 'legal@example.com',
    ]
    # Read migration 105 if available
    m105_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
                             'db', 'migrations', '105-email-exclusion-list.sql')
    if os.path.exists(m105_path):
        with open(m105_path) as f:
            m105_sql = f.read()
        for stmt in m105_sql.split(';'):
            stmt = stmt.strip()
            if stmt and not stmt.startswith('--') and 'INSERT' in stmt.upper():
                try:
                    conn.execute(stmt)
                except Exception as e:
                    if 'UNIQUE' not in str(e):
                        print(f" Warning: {e}")
        conn.commit()
    excl_count = conn.execute("SELECT COUNT(*) FROM email_exclusion_list").fetchone()[0]
    print(f" email_exclusion_list seeded: {excl_count} rows.")
else:
    print(f" email_exclusion_list already has {excl_count} rows.")

print("\n=== Step 10: Mark all migrations as applied ===")
# IMPORTANT: This marks all migrations as applied so db-migrate.js won't re-run them.
# The assumption is that the Mar-7 backup schema + the status/column fixes above put
# the DB in the correct state. Any schema gaps will be caught by the test suite.
# NEVER run individual migrations via `sqlite3 < file.sql` after this — always use
# `node scripts/db-migrate.js --force` which wraps each migration in a transaction.
if 'migrations' not in tables:
    conn.execute("""CREATE TABLE IF NOT EXISTS migrations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        filename TEXT UNIQUE NOT NULL,
        applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )""")
    conn.commit()

migrations_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'db', 'migrations')
all_sql_migrations = sorted([os.path.basename(f) for f in glob.glob(os.path.join(migrations_dir, '*.sql'))])
inserted = 0
for mig in all_sql_migrations:
    try:
        # BUGFIX: the per-statement cursor.rowcount tells us whether this
        # INSERT OR IGNORE actually inserted a row. The previous check used
        # conn.total_changes, which is cumulative over the whole connection
        # and is always > 0 by this point (the row-copy steps above), so
        # every migration was counted as "newly inserted".
        cur = conn.execute("INSERT OR IGNORE INTO migrations (filename) VALUES (?)", (mig,))
        if cur.rowcount > 0:
            inserted += 1
    except Exception as e:
        print(f" Warning: Could not mark {mig}: {e}")
conn.commit()
total_marked = conn.execute("SELECT COUNT(*) FROM migrations").fetchone()[0]
print(f" Marked {total_marked} migrations as applied ({inserted} newly inserted).")

print("\n=== Step 11: Final verification ===")
final_count = conn.execute("SELECT COUNT(*) FROM sites").fetchone()[0]
statuses_final = dict(conn.execute("SELECT status, COUNT(*) FROM sites GROUP BY status ORDER BY COUNT(*) DESC").fetchall())
gb_final = conn.execute("SELECT COUNT(*) FROM sites WHERE country_code = 'GB'").fetchone()[0]
print(f" Final site count: {final_count:,}")
print(f" Statuses: {dict(list(statuses_final.items())[:8])}")
print(f" GB remaining: {gb_final}")

# Run ANALYZE for better query plans
conn.execute("ANALYZE")
conn.commit()

conn.execute("PRAGMA foreign_keys = ON")
conn.close()

if final_count < 100_000:
    print(f"\nFAIL: Final count {final_count} is too low!")
    sys.exit(1)
else:
    print(f"\n✅ Done! DB is ready with {final_count:,} sites.")
    print("Next steps:")
    print(" npm run cron:migrate # seed cron_jobs table")
    print(" # Optionally check for schema gaps: node scripts/db-migrate.js --force")
    print(" # (uses transactions — safe to run even if some migrations were pre-marked)")
    print(" systemctl --user start 333method-pipeline.service")
    print(" systemctl --user start mmo-cron.timer")
    print(" systemctl --user start 333method-orchestrator.timer")