memory_bank.py
#!/usr/bin/env python3
"""Project Memory Bank Runner — cron-friendly script.

For every agent project with `memory_bank_enabled=True`, summarizes any
conversations that have new activity since the last summarization and are
now idle, then runs the compression ladder so the bank stays within each
project's `memory_bank_max_tokens` budget.

Usage:
    uv run python crons/memory_bank.py
"""

import json
import logging
import time
import traceback

# Logging is configured before the project imports below (presumably so any
# import-time log output already uses this format).
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("restai.memory_bank_cron")

from restai import config  # noqa: F401 — side effect: env loaded
from restai.settings import ensure_settings_table
from restai.database import get_db_wrapper, engine as db_engine
from restai.brain import Brain
from restai.cron_log import CronLogger
from restai import memory_bank


# Hard cap on how many conversations one cron tick will summarize. Backlog
# beyond this rolls over to subsequent ticks.
MAX_CHATS_PER_TICK = 25

# Wall-clock limit (seconds since the tick started) after which no NEW
# summaries are started. The System LLM is remote (Ollama / OpenAI / etc.):
# each summary is ~1-5s best case and up to ~120s worst case (Ollama's request
# timeout), so the chat cap alone does not bound runtime — 25 chats x 120s is
# far beyond the runner's 600s job timeout. Starting the last summary no later
# than 420s leaves ~60s for the compression pass (LLM-free unless a project is
# over budget) and reporting, so the cron reports back instead of getting
# silently killed and looking "stuck" to the admin. Chats skipped because of
# this limit are counted as deferred and roll over to the next tick.
MAX_SUMMARIZE_SECONDS_PER_TICK = 420

# Bank budget used when a project's `memory_bank_max_tokens` is unset, zero,
# or unparseable.
DEFAULT_MAX_TOKENS = 2000


def _project_max_tokens(proj):
    """Return the project's `memory_bank_max_tokens`, or DEFAULT_MAX_TOKENS.

    `proj.options` is parsed as JSON. Malformed JSON, a non-object payload, or
    a non-numeric value falls back to the default (with a warning for the
    latter) instead of aborting the tick for every remaining project.
    """
    try:
        opts = json.loads(proj.options) if proj.options else {}
    except (TypeError, ValueError):
        opts = {}
    if not isinstance(opts, dict):
        return DEFAULT_MAX_TOKENS
    try:
        return int(opts.get("memory_bank_max_tokens") or DEFAULT_MAX_TOKENS)
    except (TypeError, ValueError):
        logger.warning(
            "memory_bank: invalid memory_bank_max_tokens for project=%s; using %d",
            proj.id, DEFAULT_MAX_TOKENS,
        )
        return DEFAULT_MAX_TOKENS


def _chats_needing_refresh(db, proj_id):
    """Return the project's chat ids awaiting summarization, or [] on failure.

    Failures are logged and the session rolled back so one broken project
    does not abort the tick for the projects after it.
    """
    try:
        return memory_bank.chat_ids_needing_refresh(db, proj_id)
    except Exception as e:
        logger.warning(
            "memory_bank: chat_ids_needing_refresh failed (project=%s): %s", proj_id, e,
        )
        db.db.rollback()
        return []


def _summarize_chats(brain, db, proj_id, chat_ids, deadline):
    """Summarize `chat_ids` in order, starting no new summary after `deadline`.

    Args:
        brain: Brain used for the System LLM calls.
        db: DB wrapper; `db.db` is rolled back after a failed summary.
        proj_id: Project the chats belong to.
        chat_ids: Chats to process (already capped to the tick's chat budget).
        deadline: `time.monotonic()` value after which processing stops.

    Returns:
        (attempted, summarized): chats attempted (success or failure — both
        consume tick budget) and chats for which a summary was written.
    """
    attempted = 0
    summarized = 0
    total = len(chat_ids)
    for idx, chat_id in enumerate(chat_ids, 1):
        if time.monotonic() >= deadline:
            logger.info(
                "memory_bank: project=%s — tick time budget exhausted after %d/%d chats; rest deferred to next run",
                proj_id, attempted, total,
            )
            break
        attempted += 1
        t0 = time.monotonic()
        try:
            written = memory_bank.summarize_conversation(brain, db, proj_id, chat_id)
        except Exception as e:
            logger.warning(
                "memory_bank: summarize_conversation failed (project=%s chat=%s): %s",
                proj_id, chat_id, e,
            )
            db.db.rollback()
            continue
        # Logging stays outside the try so a formatting problem can never be
        # mistaken for a failed summary (and trigger a rollback).
        elapsed = time.monotonic() - t0
        short_id = str(chat_id)[:12]
        if written is not None:
            summarized += 1
            logger.info(
                "memory_bank: project=%s chat=%s summarized in %.1fs (%d/%d)",
                proj_id, short_id, elapsed, idx, total,
            )
        else:
            logger.info(
                "memory_bank: project=%s chat=%s produced no summary in %.1fs (%d/%d)",
                proj_id, short_id, elapsed, idx, total,
            )
    return attempted, summarized


def _tick(brain, db, cron, deadline):
    """Summarize idle chats and compress every enabled project's bank.

    Records the outcome on `cron` and finishes it. Unexpected errors propagate
    to the caller's crash handler.
    """
    if brain.get_system_llm(db) is None:
        # No System LLM configured — nothing this cron can do. Log a single
        # warning so the admin sees why the bank stays empty, then exit.
        cron.warning("No System LLM configured; memory bank cron is a no-op until one is set.")
        cron.finish()
        return

    summarized = 0
    compressed_projects = 0
    # Per-tick global chat budget. Once it is exhausted (or the time limit is
    # hit) we still iterate the remaining projects to run their compression
    # step and record their backlog — only summarization is gated.
    budget_left = MAX_CHATS_PER_TICK
    deferred_total = 0

    for proj in memory_bank.list_enabled_projects(db):
        max_tokens = _project_max_tokens(proj)

        chat_ids = _chats_needing_refresh(db, proj.id)
        if chat_ids:
            # Cap to whatever budget remains for this tick. Anything over gets
            # logged so the admin can see the backlog draining over
            # subsequent ticks instead of wondering why nothing changes.
            to_process = chat_ids[:budget_left]
            deferred = len(chat_ids) - len(to_process)
            if deferred > 0:
                logger.info(
                    "memory_bank: project=%s — processing %d/%d chats this tick, %d deferred to next run",
                    proj.id, len(to_process), len(chat_ids), deferred,
                )
            else:
                logger.info(
                    "memory_bank: project=%s — processing %d chat(s) this tick",
                    proj.id, len(to_process),
                )
            attempted, done = _summarize_chats(brain, db, proj.id, to_process, deadline)
            summarized += done
            budget_left -= attempted
            # Counted once per project: everything not attempted this tick,
            # whether cut by the chat cap or by the time limit.
            deferred_total += len(chat_ids) - attempted

        try:
            memory_bank.compress_entries(brain, db, proj.id, max_tokens)
        except Exception as e:
            logger.warning(
                "memory_bank: compress_entries failed (project=%s): %s", proj.id, e,
            )
            db.db.rollback()
        else:
            compressed_projects += 1

    msg = f"Summarized {summarized} conversation(s); compressed {compressed_projects} project(s)."
    if deferred_total > 0:
        msg += f" {deferred_total} chat(s) deferred to next tick."
    cron.info(msg)
    cron.finish(items_processed=summarized)


def _run():
    """Run one memory-bank tick.

    The DB session is always closed. Once the cron logger exists, any failure
    (including the System LLM lookup) is recorded on it and the entry is
    finished rather than left open.
    """
    # Time limit is measured from the very start of the tick so setup time
    # counts against the runner's job timeout too.
    deadline = time.monotonic() + MAX_SUMMARIZE_SECONDS_PER_TICK

    ensure_settings_table(db_engine)

    brain = Brain(lightweight=True)
    db = get_db_wrapper()
    try:
        cron = CronLogger("memory_bank")
        try:
            _tick(brain, db, cron, deadline)
        except Exception as e:
            cron.error(f"Memory bank runner crashed: {e}", details=traceback.format_exc())
            cron.finish()
    finally:
        db.db.close()


def main():
    _run()


if __name__ == "__main__":
    main()