/ crons / memory_bank.py
memory_bank.py
  1  #!/usr/bin/env python3
  2  """Project Memory Bank Runner — cron-friendly script.
  3  
  4  For every agent project with `memory_bank_enabled=True`, summarizes any
  5  conversations that have new activity since the last summarization and are
  6  now idle, then runs the compression ladder so the bank stays within each
  7  project's `memory_bank_max_tokens` budget.
  8  
  9  Usage:
 10      uv run python crons/memory_bank.py
 11  """
 12  
 13  import logging
 14  import time
 15  import traceback
 16  
 17  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
 18  logger = logging.getLogger("restai.memory_bank_cron")
 19  
 20  from restai import config  # noqa: F401  — side effect: env loaded
 21  from restai.settings import ensure_settings_table
 22  from restai.database import get_db_wrapper, engine as db_engine
 23  from restai.brain import Brain
 24  from restai.cron_log import CronLogger
 25  from restai import memory_bank
 26  
 27  
# Hard cap on how many conversations one cron tick will summarize. With
# the System LLM being remote (Ollama / OpenAI / etc.), each summary is
# ~1-5s best case and up to ~120s worst case (Ollama's request timeout).
# At 25 chats × ~5s that is roughly 125s, comfortably under the runner's
# 600s job timeout, so the cron normally reports back instead of getting
# silently killed and looking "stuck" to the admin. Backlog beyond this
# rolls over to subsequent ticks.
# NOTE(review): 25 chats × the 120s worst case is ~3000s, which exceeds the
# 600s timeout — this cap bounds the number of summaries per tick, not the
# wall-clock time spent on them.
MAX_CHATS_PER_TICK = 25
 36  
 37  
 38  def _run():
 39      ensure_settings_table(db_engine)
 40  
 41      brain = Brain(lightweight=True)
 42      db = get_db_wrapper()
 43      cron = CronLogger("memory_bank")
 44  
 45      if brain.get_system_llm(db) is None:
 46          # No System LLM configured — nothing this cron can do. Log a single
 47          # warning so the admin sees why the bank stays empty, then exit.
 48          cron.warning("No System LLM configured; memory bank cron is a no-op until one is set.")
 49          cron.finish()
 50          db.db.close()
 51          return
 52  
 53      try:
 54          summarized = 0
 55          compressed_projects = 0
 56          # Per-tick global budget. Once exhausted we still keep iterating
 57          # projects to run their compression step (cheap, no LLM calls
 58          # unless a project is over budget) — only the summarizer loop is
 59          # gated. Remaining chats roll into the next minute's run.
 60          budget_left = MAX_CHATS_PER_TICK
 61          deferred_total = 0
 62  
 63          import json
 64          for proj in memory_bank.list_enabled_projects(db):
 65              try:
 66                  opts = json.loads(proj.options) if proj.options else {}
 67              except Exception:
 68                  opts = {}
 69              max_tokens = int(opts.get("memory_bank_max_tokens") or 2000)
 70  
 71              chat_ids = memory_bank.chat_ids_needing_refresh(db, proj.id)
 72              if chat_ids:
 73                  # Cap to whatever budget remains for this tick. Anything over
 74                  # gets logged so the admin can see the backlog draining over
 75                  # subsequent ticks instead of wondering why nothing changes.
 76                  to_process = chat_ids[:budget_left]
 77                  deferred = len(chat_ids) - len(to_process)
 78                  if deferred > 0:
 79                      deferred_total += deferred
 80                      logger.info(
 81                          "memory_bank: project=%s — processing %d/%d chats this tick, %d deferred to next run",
 82                          proj.id, len(to_process), len(chat_ids), deferred,
 83                      )
 84                  else:
 85                      logger.info(
 86                          "memory_bank: project=%s — processing %d chat(s) this tick",
 87                          proj.id, len(to_process),
 88                      )
 89                  for idx, chat_id in enumerate(to_process, 1):
 90                      t0 = time.monotonic()
 91                      try:
 92                          written = memory_bank.summarize_conversation(brain, db, proj.id, chat_id)
 93                          if written is not None:
 94                              summarized += 1
 95                              elapsed = time.monotonic() - t0
 96                              logger.info(
 97                                  "memory_bank: project=%s chat=%s summarized in %.1fs (%d/%d)",
 98                                  proj.id, chat_id[:12], elapsed, idx, len(to_process),
 99                              )
100                      except Exception as e:
101                          logger.warning(
102                              "memory_bank: summarize_conversation failed (project=%s chat=%s): %s",
103                              proj.id, chat_id, e,
104                          )
105                          db.db.rollback()
106                      budget_left -= 1
107                      if budget_left <= 0:
108                          deferred_total += len(chat_ids) - idx
109                          break
110  
111              try:
112                  memory_bank.compress_entries(brain, db, proj.id, max_tokens)
113                  compressed_projects += 1
114              except Exception as e:
115                  logger.warning(
116                      "memory_bank: compress_entries failed (project=%s): %s", proj.id, e,
117                  )
118                  db.db.rollback()
119  
120              if budget_left <= 0:
121                  # Don't summarize more chats this tick, but we already
122                  # ran compression for this project. Keep iterating
123                  # remaining projects to record their deferred totals
124                  # and run their compression (which is cheap).
125                  continue
126  
127          msg = f"Summarized {summarized} conversation(s); compressed {compressed_projects} project(s)."
128          if deferred_total > 0:
129              msg += f" {deferred_total} chat(s) deferred to next tick."
130          cron.info(msg)
131          cron.finish(items_processed=summarized)
132      except Exception as e:
133          cron.error(f"Memory bank runner crashed: {e}", details=traceback.format_exc())
134          cron.finish()
135      finally:
136          db.db.close()
137  
138  
def main():
    """Script entry point: perform a single memory-bank cron run."""
    _run()


if __name__ == "__main__":
    # Execute only when invoked as a script, not when imported as a module.
    main()