cron_log.py
1 """DB-backed logging helper for cron scripts. 2 3 Usage: 4 from restai.cron_log import CronLogger 5 6 def main(): 7 log = CronLogger("sync") 8 try: 9 # ... do work ... 10 log.info("Synced 3 sources") 11 log.finish(items_processed=3) 12 except Exception as e: 13 log.error(f"Failed: {e}", details=traceback.format_exc()) 14 log.finish() 15 16 The constructor also installs a `logging.Handler` on the root logger so any 17 `logger.info()` / `logger.warning()` / `logger.error()` call from anywhere in 18 the codebase during the cron run is mirrored into the DB log entry. That 19 gives the admin the same view in `/admin/cron-logs` that they would have 20 gotten by tailing the console. 21 """ 22 from __future__ import annotations 23 24 import logging 25 import time 26 import traceback 27 28 29 class _CronLogHandler(logging.Handler): 30 """Forwards every log record into a CronLogger's message buffer.""" 31 32 def __init__(self, owner: "CronLogger"): 33 super().__init__(level=logging.INFO) 34 self._owner = owner 35 # Match the CronLogger's own prefix scheme so explicit cron.info() 36 # calls and captured log.info() calls render the same way. 37 self.setFormatter(logging.Formatter("%(name)s: %(message)s")) 38 39 def emit(self, record: logging.LogRecord) -> None: 40 try: 41 msg = self.format(record) 42 except Exception: 43 msg = record.getMessage() 44 if record.levelno >= logging.ERROR: 45 self._owner._capture("ERROR", msg, exc_info=record.exc_info) 46 elif record.levelno >= logging.WARNING: 47 self._owner._capture("WARNING", msg) 48 else: 49 self._owner._capture(None, msg) 50 51 52 class CronLogger: 53 """Collects log messages during a cron run and writes a single DB entry on finish.""" 54 55 def __init__(self, job: str): 56 self.job = job 57 self._start = time.time() 58 self._messages: list[str] = [] 59 self._details: str | None = None 60 self._has_error = False 61 self._has_warning = False 62 self._finished = False 63 64 # Mirror everything `logging` produces into our buffer so the cron 65 # log entry contains the same console output. We attach to the root 66 # logger so ALL named loggers propagate up here. 67 self._log_handler = _CronLogHandler(self) 68 # Filter out our own handler's output so a cron.info() that triggers 69 # nothing else doesn't get double-counted. 70 logging.getLogger().addHandler(self._log_handler) 71 72 def _capture(self, level: str | None, message: str, exc_info=None) -> None: 73 """Internal: record a message originating from the logging handler. 74 Distinct from the public info()/warning()/error() so we don't add 75 prefixes twice.""" 76 if level == "ERROR": 77 self._messages.append(f"ERROR: {message}") 78 self._has_error = True 79 if exc_info and not self._details: 80 try: 81 self._details = "".join(traceback.format_exception(*exc_info)) 82 except Exception: 83 pass 84 elif level == "WARNING": 85 self._messages.append(f"WARNING: {message}") 86 self._has_warning = True 87 else: 88 self._messages.append(message) 89 90 def info(self, message: str): 91 self._messages.append(message) 92 93 def warning(self, message: str): 94 self._messages.append(f"WARNING: {message}") 95 self._has_warning = True 96 97 def error(self, message: str, details: str = None): 98 self._messages.append(f"ERROR: {message}") 99 self._has_error = True 100 if details: 101 self._details = details 102 103 def finish(self, items_processed: int = 0): 104 """Write the log entry to the database.""" 105 if self._finished: 106 return 107 self._finished = True 108 109 # Detach the logging handler — otherwise re-running a cron in the 110 # same process (e.g. unit tests, the admin "Run Now" button if it 111 # ever runs in-process) leaks handlers. 112 try: 113 logging.getLogger().removeHandler(self._log_handler) 114 except Exception: 115 pass 116 117 duration_ms = int((time.time() - self._start) * 1000) 118 119 if self._has_error: 120 status = "error" 121 elif self._has_warning: 122 status = "warning" 123 else: 124 status = "success" 125 126 message = "\n".join(self._messages) if self._messages else "No output" 127 128 try: 129 from restai.database import DBWrapper 130 db = DBWrapper() 131 try: 132 db.create_cron_log( 133 job=self.job, 134 status=status, 135 message=message, 136 details=self._details, 137 items_processed=items_processed, 138 duration_ms=duration_ms, 139 ) 140 finally: 141 db.db.close() 142 except Exception: 143 # If DB write fails, at least print to stdout 144 import logging 145 logging.getLogger(__name__).warning( 146 "Failed to write cron log to DB for job '%s'", self.job 147 ) 148 149 def __del__(self): 150 """Safety net: if finish() was never called (script crashed), write an error entry.""" 151 if not self._finished: 152 self.error("Script exited without calling finish()") 153 self.finish()