/ crons / sync.py
sync.py
  1  #!/usr/bin/env python3
  2  """Knowledge Base Sync — cron-friendly script.
  3  
  4  Checks all projects with sync enabled, syncs sources whose interval has elapsed, then exits.
  5  
  6  Usage:
  7      uv run python crons/sync.py
  8  """
  9  
 10  import json
 11  import logging
 12  import sys
 13  from datetime import datetime, timezone
 14  
 15  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
 16  logger = logging.getLogger("restai.sync")
 17  
 18  # Load config (reads .env)
 19  from restai import config
 20  from restai.settings import ensure_settings_table
 21  from restai.database import get_db_wrapper, engine as db_engine
 22  from restai.models.databasemodels import ProjectDatabase
 23  from restai.brain import Brain
 24  
 25  
 26  def main():
 27      from restai.cron_log import CronLogger
 28      cron = CronLogger("sync")
 29  
 30      ensure_settings_table(db_engine)
 31  
 32      brain = Brain(lightweight=True)
 33      db = get_db_wrapper()
 34      synced_count = 0
 35  
 36      try:
 37          projects = db.db.query(ProjectDatabase).all()
 38          synced_any = False
 39  
 40          for proj in projects:
 41              opts = json.loads(proj.options) if proj.options else {}
 42              if not opts.get("sync_enabled") or not opts.get("sync_sources"):
 43                  continue
 44  
 45              sources = opts["sync_sources"]
 46              now = datetime.now(timezone.utc)
 47  
 48              for i, source in enumerate(sources):
 49                  interval_minutes = source.get("sync_interval") or 60
 50                  last_sync = source.get("last_sync")
 51  
 52                  if last_sync:
 53                      try:
 54                          last = datetime.fromisoformat(last_sync)
 55                          if last.tzinfo is None:
 56                              last = last.replace(tzinfo=timezone.utc)
 57                          elapsed = (now - last).total_seconds() / 60
 58                          if elapsed < interval_minutes:
 59                              continue
 60                      except (ValueError, TypeError):
 61                          pass
 62  
 63                  # Load project with vector store
 64                  project = brain.find_project(proj.id, db)
 65                  if not project or project.props.type != "rag":
 66                      break
 67  
 68                  # Mark as syncing before starting
 69                  _update_last_sync(db, proj.id, i)
 70  
 71                  try:
 72                      from restai.sync import _sync_source
 73                      from restai.models.models import SyncSource
 74                      from restai.utils.crypto import _decrypt_sync_source
 75  
 76                      # Source dict comes straight out of the project's JSON
 77                      # options, where SYNC_SOURCE_SENSITIVE_KEYS values are
 78                      # stored encrypted (`$ENC$...`). Decrypt before building
 79                      # the SyncSource or auth against S3 / Confluence /
 80                      # SharePoint / Google Drive will fail.
 81                      src = SyncSource(**_decrypt_sync_source(source))
 82                      logger.info(f"Syncing source '{src.name}' for project {proj.name} (ID {proj.id})")
 83                      _sync_source(project, src, db, brain)
 84                      _update_last_sync(db, proj.id, i)
 85                      synced_any = True
 86                      synced_count += 1
 87                      cron.info(f"Synced '{src.name}' for {proj.name}")
 88                      try:
 89                          from restai.webhooks import emit_event_for_project_id
 90                          emit_event_for_project_id(proj.id, "sync_completed", {
 91                              "source": src.name, "status": "ok",
 92                          })
 93                      except Exception:
 94                          pass
 95                  except Exception as e:
 96                      logger.error(f"Failed to sync source '{source.get('name')}' for project {proj.id}: {e}")
 97                      cron.error(f"Failed '{source.get('name')}' for {proj.name}: {e}")
 98                      try:
 99                          from restai.webhooks import emit_event_for_project_id
100                          emit_event_for_project_id(proj.id, "sync_completed", {
101                              "source": source.get("name"), "status": "error", "error": str(e),
102                          })
103                      except Exception:
104                          pass
105  
106          if not synced_any:
107              logger.debug("No sources due for sync")
108  
109          cron.finish(items_processed=synced_count)
110      except Exception as e:
111          cron.error(f"Sync crashed: {e}", details=__import__("traceback").format_exc())
112          cron.finish()
113      finally:
114          db.db.close()
115  
116  
def _update_last_sync(db, project_id, source_index):
    """Stamp ``last_sync`` (current UTC time, ISO-8601) on one sync source.

    Looks up the project row, sets
    ``options["sync_sources"][source_index]["last_sync"]`` in its JSON
    options and commits. Nothing is written or committed when the project is
    not found or ``source_index`` is past the end of the stored source list.
    """
    row = (
        db.db.query(ProjectDatabase)
        .filter(ProjectDatabase.id == project_id)
        .first()
    )
    if not row:
        return

    options = json.loads(row.options) if row.options else {}
    sources = options.get("sync_sources", [])
    if source_index >= len(sources):
        return

    sources[source_index]["last_sync"] = datetime.now(timezone.utc).isoformat()
    options["sync_sources"] = sources
    row.options = json.dumps(options)
    db.db.commit()
127  
128  
if __name__ == "__main__":
    # Hand main()'s return value to the OS as the exit status so schedulers can
    # detect failed runs; sys.exit(None) exits with status 0.
    sys.exit(main())