sync.py
#!/usr/bin/env python3
"""Knowledge Base Sync — cron-friendly script.

Checks all projects with sync enabled, syncs sources whose interval has elapsed, then exits.

Usage:
    uv run python crons/sync.py
"""

import json
import logging
import sys
import traceback
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("restai.sync")

# Load config (reads .env). `config` is not referenced below; it is imported
# for that side effect, after logging has been configured.
from restai import config
from restai.settings import ensure_settings_table
from restai.database import get_db_wrapper, engine as db_engine
from restai.models.databasemodels import ProjectDatabase
from restai.brain import Brain

# Interval (minutes) used when a source has no, or a falsy, ``sync_interval``.
DEFAULT_SYNC_INTERVAL_MINUTES = 60


def main():
    """Run a single sync pass over every project, then return.

    Projects are processed independently: a project whose options cannot be
    parsed, or whose vector store cannot be loaded, is rolled back, reported
    to the cron log and skipped instead of aborting the whole run. The DB
    session is always closed on exit.
    """
    from restai.cron_log import CronLogger
    cron = CronLogger("sync")

    ensure_settings_table(db_engine)

    brain = Brain(lightweight=True)
    db = get_db_wrapper()
    synced_count = 0

    try:
        for proj in db.db.query(ProjectDatabase).all():
            proj_id, proj_name = proj.id, proj.name
            try:
                synced_count += _sync_project(proj, brain, db, cron)
            except Exception as e:
                # Keep one broken project from starving every project after it;
                # roll back so the shared session is usable again.
                db.db.rollback()
                logger.exception(f"Skipping project {proj_name} (ID {proj_id}): {e}")
                cron.error(f"Skipped project {proj_name}: {e}", details=traceback.format_exc())

        if not synced_count:
            logger.debug("No sources due for sync")

        cron.finish(items_processed=synced_count)
    except Exception as e:
        logger.exception("Sync crashed")
        cron.error(f"Sync crashed: {e}", details=traceback.format_exc())
        cron.finish()
    finally:
        db.db.close()


def _sync_project(proj, brain, db, cron):
    """Sync every due source of one project; return how many synced successfully.

    Projects whose JSON options lack ``sync_enabled`` or ``sync_sources`` are
    skipped (returns 0). Errors from parsing the options, loading the project
    or stamping ``last_sync`` propagate to the caller; errors from syncing an
    individual source are handled by ``_sync_one_source``.
    """
    opts = json.loads(proj.options) if proj.options else {}
    if not opts.get("sync_enabled") or not opts.get("sync_sources"):
        return 0

    synced = 0
    for index, source in enumerate(opts["sync_sources"]):
        # Read the clock per source: an earlier, long-running sync in this
        # loop may already have pushed later sources past their interval.
        if not _is_due(source, datetime.now(timezone.utc)):
            continue

        # Load project with vector store (only once some source is actually due).
        project = brain.find_project(proj.id, db)
        if not project or project.props.type != "rag":
            # Every remaining source of this project would fail the same check.
            logger.warning(
                f"Project {proj.name} (ID {proj.id}) has sync enabled but is missing "
                f"or not a RAG project; skipping its sources"
            )
            break

        # Stamp last_sync before starting, so a source that fails (or kills the
        # process mid-sync) is not retried until its interval elapses again.
        _update_last_sync(db, proj.id, index)

        if _sync_one_source(project, proj, source, index, brain, db, cron):
            synced += 1
    return synced


def _is_due(source, now):
    """Return True if *source* should be synced at *now* (a tz-aware datetime).

    A source is due when it has never synced, when its ``last_sync`` cannot be
    parsed or its ``sync_interval`` cannot be compared, or when at least
    ``sync_interval`` minutes have passed. A falsy interval falls back to
    DEFAULT_SYNC_INTERVAL_MINUTES; a naive ``last_sync`` is treated as UTC.
    """
    last_sync = source.get("last_sync")
    if not last_sync:
        return True

    interval_minutes = source.get("sync_interval") or DEFAULT_SYNC_INTERVAL_MINUTES
    try:
        last = datetime.fromisoformat(last_sync)
        if last.tzinfo is None:
            last = last.replace(tzinfo=timezone.utc)
        elapsed_minutes = (now - last).total_seconds() / 60
        if elapsed_minutes < interval_minutes:
            return False
    except (ValueError, TypeError):
        # Unreadable bookkeeping: sync rather than stall this source forever.
        pass
    return True


def _sync_one_source(project, proj, source, index, brain, db, cron):
    """Sync one source and re-stamp its ``last_sync``; return True on success.

    On failure the DB session is rolled back, and the error is logged with a
    traceback. It is also recorded in the cron log and reported through the
    ``sync_completed`` webhook with ``status="error"``; False is returned.
    """
    try:
        from restai.sync import _sync_source
        from restai.models.models import SyncSource
        from restai.utils.crypto import _decrypt_sync_source

        # Source dict comes straight out of the project's JSON
        # options, where SYNC_SOURCE_SENSITIVE_KEYS values are
        # stored encrypted (`$ENC$...`). Decrypt before building
        # the SyncSource or auth against S3 / Confluence /
        # SharePoint / Google Drive will fail.
        src = SyncSource(**_decrypt_sync_source(source))
        logger.info(f"Syncing source '{src.name}' for project {proj.name} (ID {proj.id})")
        _sync_source(project, src, db, brain)
        _update_last_sync(db, proj.id, index)
    except Exception as e:
        # Discard any half-finished transaction left by the failed sync: it
        # would otherwise either be committed by the next _update_last_sync or,
        # after a failed flush, make the session reject every later query.
        db.db.rollback()
        logger.exception(f"Failed to sync source '{source.get('name')}' for project {proj.id}: {e}")
        cron.error(f"Failed '{source.get('name')}' for {proj.name}: {e}")
        _emit_sync_completed(proj.id, {
            "source": source.get("name"), "status": "error", "error": str(e),
        })
        return False

    cron.info(f"Synced '{src.name}' for {proj.name}")
    _emit_sync_completed(proj.id, {
        "source": src.name, "status": "ok",
    })
    return True


def _emit_sync_completed(project_id, payload):
    """Fire the ``sync_completed`` webhook for *project_id*, best effort.

    Webhook delivery must never fail a sync, so any error is logged at
    WARNING (with traceback) and swallowed.
    """
    try:
        from restai.webhooks import emit_event_for_project_id
        emit_event_for_project_id(project_id, "sync_completed", payload)
    except Exception:
        logger.warning(f"Could not emit sync_completed webhook for project {project_id}", exc_info=True)


def _update_last_sync(db, project_id, source_index):
    """Set ``sync_sources[source_index]["last_sync"]`` to now (UTC, ISO 8601) and commit.

    The options are re-read from the database rather than taken from the
    caller, so the write starts from the latest stored copy. Does nothing if
    the project no longer exists or *source_index* is out of range.
    """
    proj_db = db.db.query(ProjectDatabase).filter(ProjectDatabase.id == project_id).first()
    if not proj_db:
        return

    current_opts = json.loads(proj_db.options) if proj_db.options else {}
    src_list = current_opts.get("sync_sources", [])
    # NOTE(review): sources are matched by position only; if the list is edited
    # between the caller's read and this write, a different source could be stamped.
    if source_index >= len(src_list):
        return

    src_list[source_index]["last_sync"] = datetime.now(timezone.utc).isoformat()
    current_opts["sync_sources"] = src_list
    proj_db.options = json.dumps(current_opts)
    db.db.commit()


if __name__ == "__main__":
    main()