scheduler.py
"""
Cron job scheduler - executes due jobs.

Provides tick() which checks for due jobs and runs them. The gateway
calls this every 60 seconds from a background thread.

Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
runs at a time if multiple processes overlap.
"""

import asyncio
import concurrent.futures
import contextvars
import json
import logging
import os
import subprocess
import sys

# fcntl is Unix-only; on Windows use msvcrt for file locking
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import msvcrt
except ImportError:
    msvcrt = None
from pathlib import Path
from typing import List, Optional

# Add parent directory to path for imports BEFORE repo-level imports.
# Without this, standalone invocations (e.g. after `hermes update` reloads
# the module) fail with ModuleNotFoundError for hermes_time et al.
sys.path.insert(0, str(Path(__file__).parent.parent))

from hermes_constants import get_hermes_home
from hermes_cli.config import load_config
from hermes_time import now as _hermes_now

logger = logging.getLogger(__name__)


def _resolve_cron_enabled_toolsets(job: dict, cfg: dict) -> list[str] | None:
    """Resolve the toolset list for a cron job.

    Precedence:
    1. Per-job ``enabled_toolsets`` (set via ``cronjob`` tool on create/update).
       Keeps the agent's job-scoped toolset override intact — #6130.
    2. Per-platform ``hermes tools`` config for the ``cron`` platform.
       Mirrors gateway behavior (``_get_platform_tools(cfg, platform_key)``)
       so users can gate cron toolsets globally without recreating every job.
    3. ``None`` on any lookup failure — AIAgent loads the full default set
       (legacy behavior before this change, preserved as the safety net).

    _DEFAULT_OFF_TOOLSETS ({moa, homeassistant, rl}) are removed by
    ``_get_platform_tools`` for unconfigured platforms, so fresh installs
    get cron WITHOUT ``moa`` by default (issue reported by Norbert —
    surprise $4.63 run).
    """
    per_job = job.get("enabled_toolsets")
    if per_job:
        return per_job
    try:
        from hermes_cli.tools_config import _get_platform_tools  # lazy: avoid heavy import at cron module load
        return sorted(_get_platform_tools(cfg or {}, "cron"))
    except Exception as exc:
        # Any failure (missing module, malformed config) degrades to the
        # legacy full-default toolset rather than breaking the job.
        logger.warning(
            "Cron toolset resolution failed, falling back to full default toolset: %s",
            exc,
        )
        return None

# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
    "telegram", "discord", "slack", "whatsapp", "signal",
    "matrix", "mattermost", "homeassistant", "dingtalk", "feishu",
    "wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles",
    "qqbot", "yuanbao",
})

# Platforms that support a configured cron/notification home target, mapped to
# the environment variable used by gateway setup/runtime config.
# NOTE: platforms in _KNOWN_DELIVERY_PLATFORMS but absent here (e.g. whatsapp,
# webhook, yuanbao) have no home-channel fallback — see _get_home_target_chat_id.
_HOME_TARGET_ENV_VARS = {
    "matrix": "MATRIX_HOME_ROOM",
    "telegram": "TELEGRAM_HOME_CHANNEL",
    "discord": "DISCORD_HOME_CHANNEL",
    "slack": "SLACK_HOME_CHANNEL",
    "signal": "SIGNAL_HOME_CHANNEL",
    "mattermost": "MATTERMOST_HOME_CHANNEL",
    "sms": "SMS_HOME_CHANNEL",
    "email": "EMAIL_HOME_ADDRESS",
    "dingtalk": "DINGTALK_HOME_CHANNEL",
    "feishu": "FEISHU_HOME_CHANNEL",
    "wecom": "WECOM_HOME_CHANNEL",
    "weixin": "WEIXIN_HOME_CHANNEL",
    "bluebubbles": "BLUEBUBBLES_HOME_CHANNEL",
    "qqbot": "QQBOT_HOME_CHANNEL",
}

# Legacy env var names kept for back-compat. Each entry is the current
# primary env var → the previous name. _get_home_target_chat_id falls
# back to the legacy name if the primary is unset, so users who set the
# old name before the rename keep working until they migrate.
_LEGACY_HOME_TARGET_ENV_VARS = {
    "QQBOT_HOME_CHANNEL": "QQ_HOME_CHANNEL",
}

# NOTE: deliberately imported AFTER the sys.path.insert above so the cron
# package resolves from the repo parent dir in standalone invocations.
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run

# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"

# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()

# File-based lock prevents concurrent ticks from gateway + daemon + systemd timer
_LOCK_DIR = _hermes_home / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"


def _resolve_origin(job: dict) -> Optional[dict]:
    """Extract origin info from a job, preserving any extra routing metadata.

    Treats non-dict origins (free-form provenance strings, ints, lists from
    migration scripts or hand-edited jobs.json) as missing instead of
    crashing with ``AttributeError`` on ``origin.get(...)``. Without this
    guard, a job tagged with e.g. ``"combined-digest-replaces-x-and-y"``
    crashed every fire attempt with
    ``'str' object has no attribute 'get'`` — ``mark_job_run`` recorded the
    failure, but the next tick re-loaded the same poisoned origin and
    crashed identically until the field was patched manually (#18722).

    Returns the origin dict only when BOTH ``platform`` and ``chat_id`` are
    present; otherwise ``None``.
    """
    origin = job.get("origin")
    if not isinstance(origin, dict):
        return None
    platform = origin.get("platform")
    chat_id = origin.get("chat_id")
    if platform and chat_id:
        return origin
    return None


def _get_home_target_chat_id(platform_name: str) -> str:
    """Return the configured home target chat/room ID for a delivery platform.

    Returns "" when the platform has no home-target env var or when neither
    the primary env var nor its legacy alias is set.
    """
    env_var = _HOME_TARGET_ENV_VARS.get(platform_name.lower())
    if not env_var:
        return ""
    value = os.getenv(env_var, "")
    if not value:
        # Back-compat: fall back to the pre-rename env var name, if any.
        legacy = _LEGACY_HOME_TARGET_ENV_VARS.get(env_var)
        if legacy:
            value = os.getenv(legacy, "")
    return value


def _get_home_target_thread_id(platform_name: str) -> Optional[str]:
    """Return the optional thread/topic ID for a platform home target.

    Reads ``<HOME_ENV_VAR>_THREAD_ID`` (legacy alias honored, mirroring
    _get_home_target_chat_id). Returns None when unset/blank.
    """
    env_var = _HOME_TARGET_ENV_VARS.get(platform_name.lower())
    if not env_var:
        return None
    value = os.getenv(f"{env_var}_THREAD_ID", "").strip()
    if not value:
        legacy = _LEGACY_HOME_TARGET_ENV_VARS.get(env_var)
        if legacy:
            value = os.getenv(f"{legacy}_THREAD_ID", "").strip()
    return value or None


def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[dict]:
    """Resolve one concrete auto-delivery target for a cron job.

    ``deliver_value`` forms handled, in order:
    - ``"local"``               → None (no delivery)
    - ``"origin"``              → the job's origin chat, else first configured
                                  home channel, else None
    - ``"platform:ref[...]"``   → explicit target reference (name-resolved)
    - ``"platform"``            → origin chat if it matches, else that
                                  platform's home channel

    Returns a dict with ``platform``/``chat_id``/``thread_id`` keys, or None.
    """

    origin = _resolve_origin(job)

    if deliver_value == "local":
        return None

    if deliver_value == "origin":
        if origin:
            return {
                "platform": origin["platform"],
                "chat_id": str(origin["chat_id"]),
                "thread_id": origin.get("thread_id"),
            }
        # Origin missing (e.g. job created via API/script) — try each
        # platform's home channel as a fallback instead of silently dropping.
        # Iteration order is _HOME_TARGET_ENV_VARS insertion order, so the
        # first configured platform in that dict wins.
        for platform_name in _HOME_TARGET_ENV_VARS:
            chat_id = _get_home_target_chat_id(platform_name)
            if chat_id:
                logger.info(
                    "Job '%s' has deliver=origin but no origin; falling back to %s home channel",
                    job.get("name", job.get("id", "?")),
                    platform_name,
                )
                return {
                    "platform": platform_name,
                    "chat_id": chat_id,
                    "thread_id": _get_home_target_thread_id(platform_name),
                }
        return None

    if ":" in deliver_value:
        platform_name, rest = deliver_value.split(":", 1)
        platform_key = platform_name.lower()

        from tools.send_message_tool import _parse_target_ref

        parsed_chat_id, parsed_thread_id, is_explicit = _parse_target_ref(platform_key, rest)
        if is_explicit:
            chat_id, thread_id = parsed_chat_id, parsed_thread_id
        else:
            chat_id, thread_id = rest, None

        # Resolve human-friendly labels like "Alice (dm)" to real IDs.
        # Best-effort: any directory failure leaves the raw value in place.
        try:
            from gateway.channel_directory import resolve_channel_name
            resolved = resolve_channel_name(platform_key, chat_id)
            if resolved:
                parsed_chat_id, parsed_thread_id, resolved_is_explicit = _parse_target_ref(platform_key, resolved)
                if resolved_is_explicit:
                    chat_id = parsed_chat_id
                    # Only override the thread if the resolved ref carries one.
                    if parsed_thread_id is not None:
                        thread_id = parsed_thread_id
                else:
                    chat_id = resolved
        except Exception:
            pass

        return {
            "platform": platform_name,
            "chat_id": chat_id,
            "thread_id": thread_id,
        }

    platform_name = deliver_value
    # Bare platform name matching the job's origin platform → deliver back
    # to the origin chat rather than the home channel.
    if origin and origin.get("platform") == platform_name:
        return {
            "platform": platform_name,
            "chat_id": str(origin["chat_id"]),
            "thread_id": origin.get("thread_id"),
        }

    # Unknown names are rejected before any env lookup — prevents env var
    # enumeration via crafted platform names (see _KNOWN_DELIVERY_PLATFORMS).
    if platform_name.lower() not in _KNOWN_DELIVERY_PLATFORMS:
        return None
    chat_id = _get_home_target_chat_id(platform_name)
    if not chat_id:
        return None

    return {
        "platform": platform_name,
        "chat_id": chat_id,
        "thread_id": _get_home_target_thread_id(platform_name),
    }
257 258 259 def _normalize_deliver_value(deliver) -> str: 260 """Normalize a stored/submitted ``deliver`` value to its canonical string form. 261 262 The contract is that ``deliver`` is a string (``"local"``, ``"origin"``, 263 ``"telegram"``, ``"telegram:-1001:17"``, or comma-separated combinations). 264 Historically some callers — MCP clients passing an array, direct edits of 265 ``jobs.json``, or stale code paths — have stored a list/tuple like 266 ``["telegram"]``. ``str(["telegram"])`` would serialize to the literal 267 string ``"['telegram']"``, which is not a known platform and fails 268 resolution silently. Flatten lists/tuples into a comma-separated string 269 so both forms work. Returns ``"local"`` for anything falsy. 270 """ 271 if deliver is None or deliver == "": 272 return "local" 273 if isinstance(deliver, (list, tuple)): 274 parts = [str(p).strip() for p in deliver if str(p).strip()] 275 return ",".join(parts) if parts else "local" 276 return str(deliver) 277 278 279 def _resolve_delivery_targets(job: dict) -> List[dict]: 280 """Resolve all concrete auto-delivery targets for a cron job (supports comma-separated deliver).""" 281 deliver = _normalize_deliver_value(job.get("deliver", "local")) 282 if deliver == "local": 283 return [] 284 parts = [p.strip() for p in deliver.split(",") if p.strip()] 285 seen = set() 286 targets = [] 287 for part in parts: 288 target = _resolve_single_delivery_target(job, part) 289 if target: 290 key = (target["platform"].lower(), str(target["chat_id"]), target.get("thread_id")) 291 if key not in seen: 292 seen.add(key) 293 targets.append(target) 294 return targets 295 296 297 def _resolve_delivery_target(job: dict) -> Optional[dict]: 298 """Resolve the concrete auto-delivery target for a cron job, if any.""" 299 targets = _resolve_delivery_targets(job) 300 return targets[0] if targets else None 301 302 303 # Media extension sets — audio routing is centralized in gateway.platforms.base 304 # via 
should_send_media_as_audio() so Telegram-specific rules stay in one place. 305 _VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'}) 306 _IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'}) 307 308 309 def _send_media_via_adapter( 310 adapter, 311 chat_id: str, 312 media_files: list, 313 metadata: dict | None, 314 loop, 315 job: dict, 316 platform=None, 317 ) -> None: 318 """Send extracted MEDIA files as native platform attachments via a live adapter. 319 320 Routes each file to the appropriate adapter method (send_voice, send_image_file, 321 send_video, send_document) based on file extension — mirroring the routing logic 322 in ``BasePlatformAdapter._process_message_background``. 323 """ 324 from pathlib import Path 325 326 from gateway.platforms.base import should_send_media_as_audio 327 328 for media_path, _is_voice in media_files: 329 try: 330 ext = Path(media_path).suffix.lower() 331 route_platform = platform if platform is not None else getattr(adapter, "platform", None) 332 if should_send_media_as_audio(route_platform, ext, is_voice=_is_voice): 333 coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata) 334 elif ext in _VIDEO_EXTS: 335 coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata) 336 elif ext in _IMAGE_EXTS: 337 coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata) 338 else: 339 coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata) 340 341 future = asyncio.run_coroutine_threadsafe(coro, loop) 342 try: 343 result = future.result(timeout=30) 344 except TimeoutError: 345 future.cancel() 346 raise 347 if result and not getattr(result, "success", True): 348 logger.warning( 349 "Job '%s': media send failed for %s: %s", 350 job.get("id", "?"), media_path, getattr(result, "error", "unknown"), 351 ) 352 except Exception as e: 353 logger.warning("Job '%s': failed to send media %s: %s", 
job.get("id", "?"), media_path, e) 354 355 356 def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]: 357 """ 358 Deliver job output to the configured target(s) (origin chat, specific platform, etc.). 359 360 When ``adapters`` and ``loop`` are provided (gateway is running), tries to 361 use the live adapter first — this supports E2EE rooms (e.g. Matrix) where 362 the standalone HTTP path cannot encrypt. Falls back to standalone send if 363 the adapter path fails or is unavailable. 364 365 Returns None on success, or an error string on failure. 366 """ 367 targets = _resolve_delivery_targets(job) 368 if not targets: 369 if job.get("deliver", "local") != "local": 370 msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}" 371 logger.warning("Job '%s': %s", job["id"], msg) 372 return msg 373 return None # local-only jobs don't deliver — not a failure 374 375 from tools.send_message_tool import _send_to_platform 376 from gateway.config import load_gateway_config, Platform 377 378 # Optionally wrap the content with a header/footer so the user knows this 379 # is a cron delivery. Wrapping is on by default; set cron.wrap_response: false 380 # in config.yaml for clean output. 381 wrap_response = True 382 try: 383 user_cfg = load_config() 384 wrap_response = user_cfg.get("cron", {}).get("wrap_response", True) 385 except Exception: 386 pass 387 388 if wrap_response: 389 task_name = job.get("name", job["id"]) 390 job_id = job.get("id", "") 391 delivery_content = ( 392 f"Cronjob Response: {task_name}\n" 393 f"(job_id: {job_id})\n" 394 f"-------------\n\n" 395 f"{content}\n\n" 396 f"To stop or manage this job, send me a new message (e.g. \"stop reminder {task_name}\")." 
397 ) 398 else: 399 delivery_content = content 400 401 # Extract MEDIA: tags so attachments are forwarded as files, not raw text 402 from gateway.platforms.base import BasePlatformAdapter 403 media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content) 404 405 try: 406 config = load_gateway_config() 407 except Exception as e: 408 msg = f"failed to load gateway config: {e}" 409 logger.error("Job '%s': %s", job["id"], msg) 410 return msg 411 412 delivery_errors = [] 413 414 for target in targets: 415 platform_name = target["platform"] 416 chat_id = target["chat_id"] 417 thread_id = target.get("thread_id") 418 419 # Diagnostic: log thread_id for topic-aware delivery debugging 420 origin = _resolve_origin(job) or {} 421 origin_thread = origin.get("thread_id") 422 if origin_thread and not thread_id: 423 logger.warning( 424 "Job '%s': origin has thread_id=%s but delivery target lost it " 425 "(deliver=%s, target=%s)", 426 job["id"], origin_thread, job.get("deliver", "local"), target, 427 ) 428 elif thread_id: 429 logger.debug( 430 "Job '%s': delivering to %s:%s thread_id=%s", 431 job["id"], platform_name, chat_id, thread_id, 432 ) 433 434 # Built-in names resolve to their enum member; plugin platform names 435 # create dynamic members via Platform._missing_(). 436 try: 437 platform = Platform(platform_name.lower()) 438 except (ValueError, KeyError): 439 msg = f"unknown platform '{platform_name}'" 440 logger.warning("Job '%s': %s", job["id"], msg) 441 delivery_errors.append(msg) 442 continue 443 444 pconfig = config.platforms.get(platform) 445 if not pconfig or not pconfig.enabled: 446 msg = f"platform '{platform_name}' not configured/enabled" 447 logger.warning("Job '%s': %s", job["id"], msg) 448 delivery_errors.append(msg) 449 continue 450 451 # Prefer the live adapter when the gateway is running — this supports E2EE 452 # rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt. 
453 runtime_adapter = (adapters or {}).get(platform) 454 delivered = False 455 if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): 456 send_metadata = {"thread_id": thread_id} if thread_id else None 457 try: 458 # Send cleaned text (MEDIA tags stripped) — not the raw content 459 text_to_send = cleaned_delivery_content.strip() 460 adapter_ok = True 461 if text_to_send: 462 future = asyncio.run_coroutine_threadsafe( 463 runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), 464 loop, 465 ) 466 try: 467 send_result = future.result(timeout=60) 468 except TimeoutError: 469 future.cancel() 470 raise 471 if send_result and not getattr(send_result, "success", True): 472 err = getattr(send_result, "error", "unknown") 473 logger.warning( 474 "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", 475 job["id"], platform_name, chat_id, err, 476 ) 477 adapter_ok = False # fall through to standalone path 478 479 # Send extracted media files as native attachments via the live adapter 480 if adapter_ok and media_files: 481 _send_media_via_adapter( 482 runtime_adapter, 483 chat_id, 484 media_files, 485 send_metadata, 486 loop, 487 job, 488 platform=platform, 489 ) 490 491 if adapter_ok: 492 logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) 493 delivered = True 494 except Exception as e: 495 logger.warning( 496 "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone", 497 job["id"], platform_name, chat_id, e, 498 ) 499 500 if not delivered: 501 # Standalone path: run the async send in a fresh event loop (safe from any thread) 502 coro = _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files) 503 try: 504 result = asyncio.run(coro) 505 except RuntimeError: 506 # asyncio.run() checks for a running loop before awaiting the coroutine; 507 # when it raises, the 
original coro was never started — close it to 508 # prevent "coroutine was never awaited" RuntimeWarning, then retry in a 509 # fresh thread that has no running loop. 510 coro.close() 511 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: 512 future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)) 513 result = future.result(timeout=30) 514 except Exception as e: 515 msg = f"delivery to {platform_name}:{chat_id} failed: {e}" 516 logger.error("Job '%s': %s", job["id"], msg) 517 delivery_errors.append(msg) 518 continue 519 520 if result and result.get("error"): 521 msg = f"delivery error: {result['error']}" 522 logger.error("Job '%s': %s", job["id"], msg) 523 delivery_errors.append(msg) 524 continue 525 526 logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) 527 528 if delivery_errors: 529 return "; ".join(delivery_errors) 530 return None 531 532 533 _DEFAULT_SCRIPT_TIMEOUT = 120 # seconds 534 # Backward-compatible module override used by tests and emergency monkeypatches. 
_SCRIPT_TIMEOUT = _DEFAULT_SCRIPT_TIMEOUT


def _get_script_timeout() -> int:
    """Resolve cron pre-run script timeout from module/env/config with a safe default.

    Precedence: patched module-level _SCRIPT_TIMEOUT (when it differs from the
    default) > HERMES_CRON_SCRIPT_TIMEOUT env var > cron.script_timeout_seconds
    in config > _DEFAULT_SCRIPT_TIMEOUT. Invalid or non-positive values at any
    level fall through to the next one.
    """
    if _SCRIPT_TIMEOUT != _DEFAULT_SCRIPT_TIMEOUT:
        try:
            timeout = int(float(_SCRIPT_TIMEOUT))
            if timeout > 0:
                return timeout
        except Exception:
            logger.warning("Invalid patched _SCRIPT_TIMEOUT=%r; using env/config/default", _SCRIPT_TIMEOUT)

    env_value = os.getenv("HERMES_CRON_SCRIPT_TIMEOUT", "").strip()
    if env_value:
        try:
            timeout = int(float(env_value))
            if timeout > 0:
                return timeout
        except Exception:
            logger.warning("Invalid HERMES_CRON_SCRIPT_TIMEOUT=%r; using config/default", env_value)

    try:
        cfg = load_config() or {}
        cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
        configured = cron_cfg.get("script_timeout_seconds")
        if configured is not None:
            timeout = int(float(configured))
            if timeout > 0:
                return timeout
    except Exception as exc:
        logger.debug("Failed to load cron script timeout from config: %s", exc)

    return _DEFAULT_SCRIPT_TIMEOUT


def _run_job_script(script_path: str) -> tuple[bool, str]:
    """Execute a cron job's data-collection script and capture its output.

    Scripts must reside within HERMES_HOME/scripts/. Both relative and
    absolute paths are resolved and validated against this directory to
    prevent arbitrary script execution via path traversal or absolute
    path injection.

    Args:
        script_path: Path to a Python script. Relative paths are resolved
            against HERMES_HOME/scripts/. Absolute and ~-prefixed paths
            are also validated to ensure they stay within the scripts dir.

    Returns:
        (success, output) — on failure *output* contains the error message so the
        LLM can report the problem to the user.
    """
    from hermes_constants import get_hermes_home

    scripts_dir = get_hermes_home() / "scripts"
    scripts_dir.mkdir(parents=True, exist_ok=True)
    scripts_dir_resolved = scripts_dir.resolve()

    raw = Path(script_path).expanduser()
    if raw.is_absolute():
        path = raw.resolve()
    else:
        path = (scripts_dir / raw).resolve()

    # Guard against path traversal, absolute path injection, and symlink
    # escape — scripts MUST reside within HERMES_HOME/scripts/.
    # (resolve() above follows symlinks, so a link pointing outside fails here.)
    try:
        path.relative_to(scripts_dir_resolved)
    except ValueError:
        return False, (
            f"Blocked: script path resolves outside the scripts directory "
            f"({scripts_dir_resolved}): {script_path!r}"
        )

    if not path.exists():
        return False, f"Script not found: {path}"
    if not path.is_file():
        return False, f"Script path is not a file: {path}"

    script_timeout = _get_script_timeout()

    try:
        # Run with the script's own directory as cwd so relative data files
        # next to the script keep working.
        result = subprocess.run(
            [sys.executable, str(path)],
            capture_output=True,
            text=True,
            timeout=script_timeout,
            cwd=str(path.parent),
        )
        stdout = (result.stdout or "").strip()
        stderr = (result.stderr or "").strip()

        # Redact secrets from both stdout and stderr before any return path.
        # Best-effort: redaction failure must not block the job.
        try:
            from agent.redact import redact_sensitive_text
            stdout = redact_sensitive_text(stdout)
            stderr = redact_sensitive_text(stderr)
        except Exception:
            pass

        if result.returncode != 0:
            parts = [f"Script exited with code {result.returncode}"]
            if stderr:
                parts.append(f"stderr:\n{stderr}")
            if stdout:
                parts.append(f"stdout:\n{stdout}")
            return False, "\n".join(parts)

        return True, stdout

    except subprocess.TimeoutExpired:
        return False, f"Script timed out after {script_timeout}s: {path}"
    except Exception as exc:
        return False, f"Script execution failed: {exc}"


def _parse_wake_gate(script_output: str) -> bool:
    """Parse the last non-empty stdout line of a cron job's pre-check script
    as a wake gate.

    The convention (ported from nanoclaw #1232): if the last stdout line is
    JSON like ``{"wakeAgent": false}``, the agent is skipped entirely — no
    LLM run, no delivery. Any other output (non-JSON, missing flag, gate
    absent, or ``wakeAgent: true``) means wake the agent normally.

    Returns True if the agent should wake, False to skip.
    """
    if not script_output:
        return True
    stripped_lines = [line for line in script_output.splitlines() if line.strip()]
    if not stripped_lines:
        return True
    last_line = stripped_lines[-1].strip()
    try:
        gate = json.loads(last_line)
    except (json.JSONDecodeError, ValueError):
        return True
    if not isinstance(gate, dict):
        return True
    # Only an explicit JSON `false` suppresses the wake.
    return gate.get("wakeAgent", True) is not False


def _build_job_prompt(job: dict, prerun_script: Optional[tuple] = None) -> Optional[str]:
    """Build the effective prompt for a cron job, optionally loading one or more skills first.

    Args:
        job: The cron job dict.
        prerun_script: Optional ``(success, stdout)`` from a script that has
            already been executed by the caller (e.g. for a wake-gate check).
            When provided, the script is not re-executed and the cached
            result is used for prompt injection. When omitted, the script
            (if any) runs inline as before.

    Returns:
        The assembled prompt string, or ``None`` when the pre-run script
        succeeded but produced no output — the caller skips the AI call.
        (Return annotation widened to Optional[str] to match this path.)
    """
    prompt = job.get("prompt", "")
    skills = job.get("skills")

    # Run data-collection script if configured, inject output as context.
    script_path = job.get("script")
    if script_path:
        if prerun_script is not None:
            success, script_output = prerun_script
        else:
            success, script_output = _run_job_script(script_path)
        if success:
            if script_output:
                prompt = (
                    "## Script Output\n"
                    "The following data was collected by a pre-run script. "
                    "Use it as context for your analysis.\n\n"
                    f"```\n{script_output}\n```\n\n"
                    f"{prompt}"
                )
            else:
                # Script produced no output — nothing to report, skip AI call.
                return None
        else:
            # Script failure is surfaced to the agent so it can tell the user.
            prompt = (
                "## Script Error\n"
                "The data-collection script failed. Report this to the user.\n\n"
                f"```\n{script_output}\n```\n\n"
                f"{prompt}"
            )

    # Inject output from referenced cron jobs as context.
    context_from = job.get("context_from")
    if context_from:
        from cron.jobs import OUTPUT_DIR
        if isinstance(context_from, str):
            context_from = [context_from]
        for source_job_id in context_from:
            # Guard against path traversal — valid job IDs are 12-char hex strings
            # NOTE(review): only the hex alphabet is enforced here, not the
            # 12-char length; hex-only already rules out "..", "/" etc.
            if not source_job_id or not all(c in "0123456789abcdef" for c in source_job_id):
                logger.warning("context_from: skipping invalid job_id %r", source_job_id)
                continue
            try:
                job_output_dir = OUTPUT_DIR / source_job_id
                if not job_output_dir.exists():
                    continue  # silent skip — no output yet
                # Newest output file first (by mtime).
                output_files = sorted(
                    job_output_dir.glob("*.md"),
                    key=lambda f: f.stat().st_mtime,
                    reverse=True,
                )
                if not output_files:
                    continue  # silent skip — no output yet
                latest_output = output_files[0].read_text(encoding="utf-8").strip()
                # Truncate to 8K characters to avoid prompt bloat
                _MAX_CONTEXT_CHARS = 8000
                if len(latest_output) > _MAX_CONTEXT_CHARS:
                    latest_output = latest_output[:_MAX_CONTEXT_CHARS] + "\n\n[... output truncated ...]"
                if latest_output:
                    prompt = (
                        f"## Output from job '{source_job_id}'\n"
                        "The following is the most recent output from a preceding "
                        "cron job. Use it as context for your analysis.\n\n"
                        f"```\n{latest_output}\n```\n\n"
                        f"{prompt}"
                    )
                else:
                    continue  # silent skip — empty output
            except (OSError, PermissionError) as e:
                logger.warning("context_from: failed to read output for job %r: %s", source_job_id, e)
                # silent skip — do not pollute the prompt with error messages

    # Always prepend cron execution guidance so the agent knows how
    # delivery works and can suppress delivery when appropriate.
    cron_hint = (
        "[IMPORTANT: You are running as a scheduled cron job. "
        "DELIVERY: Your final response will be automatically delivered "
        "to the user — do NOT use send_message or try to deliver "
        "the output yourself. Just produce your report/output as your "
        "final response and the system handles the rest. "
        "SILENT: If there is genuinely nothing new to report, respond "
        "with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
        "Never combine [SILENT] with content — either report your "
        "findings normally, or say [SILENT] and nothing more.]\n\n"
    )
    prompt = cron_hint + prompt
    # Back-compat: old jobs stored a single "skill" string instead of "skills".
    if skills is None:
        legacy = job.get("skill")
        skills = [legacy] if legacy else []

    skill_names = [str(name).strip() for name in skills if str(name).strip()]
    if not skill_names:
        return prompt

    from tools.skills_tool import skill_view
    from tools.skill_usage import bump_use

    parts = []
    skipped: list[str] = []
    for skill_name in skill_names:
        loaded = json.loads(skill_view(skill_name))
        if not loaded.get("success"):
            error = loaded.get("error") or f"Failed to load skill '{skill_name}'"
            logger.warning("Cron job '%s': skill not found, skipping — %s", job.get("name", job.get("id")), error)
            skipped.append(skill_name)
            continue

        # Bump usage so the curator sees this skill as actively used.
        try:
            bump_use(skill_name)
        except Exception:
            logger.debug("Cron job: failed to bump skill usage for '%s'", skill_name, exc_info=True)

        content = str(loaded.get("content") or "").strip()
        if parts:
            parts.append("")
        parts.extend(
            [
                f'[IMPORTANT: The user has invoked the "{skill_name}" skill, indicating they want you to follow its instructions. The full skill content is loaded below.]',
                "",
                content,
            ]
        )

    if skipped:
        notice = (
            f"[IMPORTANT: The following skill(s) were listed for this job but could not be found "
            f"and were skipped: {', '.join(skipped)}. "
            f"Start your response with a brief notice so the user is aware, e.g.: "
            f"'⚠️ Skill(s) not found and skipped: {', '.join(skipped)}']"
        )
        parts.insert(0, notice)

    if prompt:
        parts.extend(["", f"The user has provided the following instruction alongside the skill invocation: {prompt}"])
    return "\n".join(parts)


def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """
    Execute a single cron job.

    Returns:
        Tuple of (success, full_output_doc, final_response, error_message)
    """
    from run_agent import AIAgent

    # Initialize SQLite session store so cron job messages are persisted
    # and discoverable via session_search (same pattern as gateway/run.py).
    _session_db = None
    try:
        from hermes_state import SessionDB
        _session_db = SessionDB()
    except Exception as e:
        logger.debug("Job '%s': SQLite session store not available: %s", job.get("id", "?"), e)

    job_id = job["id"]
    job_name = job["name"]

    # Wake-gate: if this job has a pre-check script, run it BEFORE building
    # the prompt so a ``{"wakeAgent": false}`` response can short-circuit
    # the whole agent run. We pass the result into _build_job_prompt so
    # the script is only executed once.
851 prerun_script = None 852 script_path = job.get("script") 853 if script_path: 854 prerun_script = _run_job_script(script_path) 855 _ran_ok, _script_output = prerun_script 856 if _ran_ok and not _parse_wake_gate(_script_output): 857 logger.info( 858 "Job '%s' (ID: %s): wakeAgent=false, skipping agent run", 859 job_name, job_id, 860 ) 861 silent_doc = ( 862 f"# Cron Job: {job_name}\n\n" 863 f"**Job ID:** {job_id}\n" 864 f"**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" 865 "Script gate returned `wakeAgent=false` — agent skipped.\n" 866 ) 867 return True, silent_doc, SILENT_MARKER, None 868 869 prompt = _build_job_prompt(job, prerun_script=prerun_script) 870 if prompt is None: 871 logger.info("Job '%s': script produced no output, skipping AI call.", job_name) 872 return True, "", SILENT_MARKER, None 873 origin = _resolve_origin(job) 874 _cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}" 875 876 logger.info("Running job '%s' (ID: %s)", job_name, job_id) 877 logger.info("Prompt: %s", prompt[:100]) 878 879 agent = None 880 881 # Mark this as a cron session so the approval system can apply cron_mode. 882 # This env var is process-wide and persists for the lifetime of the 883 # scheduler process — every job this process runs is a cron job. 884 os.environ["HERMES_CRON_SESSION"] = "1" 885 886 # Use ContextVars for per-job session/delivery state so parallel jobs 887 # don't clobber each other's targets (os.environ is process-global). 
888 from gateway.session_context import set_session_vars, clear_session_vars, _VAR_MAP 889 890 _ctx_tokens = set_session_vars( 891 platform=origin["platform"] if origin else "", 892 chat_id=str(origin["chat_id"]) if origin else "", 893 chat_name=origin.get("chat_name", "") if origin else "", 894 ) 895 _cron_delivery_vars = ( 896 "HERMES_CRON_AUTO_DELIVER_PLATFORM", 897 "HERMES_CRON_AUTO_DELIVER_CHAT_ID", 898 "HERMES_CRON_AUTO_DELIVER_THREAD_ID", 899 ) 900 for _var_name in _cron_delivery_vars: 901 _VAR_MAP[_var_name].set("") 902 903 # Per-job working directory. When set (and validated at create/update 904 # time), we point TERMINAL_CWD at it so: 905 # - build_context_files_prompt() picks up AGENTS.md / CLAUDE.md / 906 # .cursorrules from the job's project dir, AND 907 # - the terminal, file, and code-exec tools run commands from there. 908 # 909 # tick() serializes workdir-jobs outside the parallel pool, so mutating 910 # os.environ["TERMINAL_CWD"] here is safe for those jobs. For workdir-less 911 # jobs we leave TERMINAL_CWD untouched — preserves the original behaviour 912 # (skip_context_files=True, tools use whatever cwd the scheduler has). 913 _job_workdir = (job.get("workdir") or "").strip() or None 914 if _job_workdir and not Path(_job_workdir).is_dir(): 915 # Directory was removed between create-time validation and now. Log 916 # and drop back to old behaviour rather than crashing the job. 917 logger.warning( 918 "Job '%s': configured workdir %r no longer exists — running without it", 919 job_id, _job_workdir, 920 ) 921 _job_workdir = None 922 _prior_terminal_cwd = os.environ.get("TERMINAL_CWD", "_UNSET_") 923 if _job_workdir: 924 os.environ["TERMINAL_CWD"] = _job_workdir 925 logger.info("Job '%s': using workdir %s", job_id, _job_workdir) 926 927 try: 928 # Re-read .env and config.yaml fresh every run so provider/key 929 # changes take effect without a gateway restart. 
930 from dotenv import load_dotenv 931 try: 932 load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8") 933 except UnicodeDecodeError: 934 load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1") 935 936 delivery_target = _resolve_delivery_target(job) 937 if delivery_target: 938 _VAR_MAP["HERMES_CRON_AUTO_DELIVER_PLATFORM"].set(delivery_target["platform"]) 939 _VAR_MAP["HERMES_CRON_AUTO_DELIVER_CHAT_ID"].set(str(delivery_target["chat_id"])) 940 _VAR_MAP["HERMES_CRON_AUTO_DELIVER_THREAD_ID"].set( 941 "" 942 if delivery_target.get("thread_id") is None 943 else str(delivery_target["thread_id"]) 944 ) 945 946 model = job.get("model") or os.getenv("HERMES_MODEL") or "" 947 948 # Load config.yaml for model, reasoning, prefill, toolsets, provider routing 949 _cfg = {} 950 try: 951 import yaml 952 _cfg_path = str(_hermes_home / "config.yaml") 953 if os.path.exists(_cfg_path): 954 with open(_cfg_path) as _f: 955 _cfg = yaml.safe_load(_f) or {} 956 _model_cfg = _cfg.get("model", {}) 957 if not job.get("model"): 958 if isinstance(_model_cfg, str): 959 model = _model_cfg 960 elif isinstance(_model_cfg, dict): 961 model = _model_cfg.get("default", model) 962 except Exception as e: 963 logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e) 964 965 # Apply IPv4 preference if configured. 
966 try: 967 from hermes_constants import apply_ipv4_preference 968 _net_cfg = _cfg.get("network", {}) 969 if isinstance(_net_cfg, dict) and _net_cfg.get("force_ipv4"): 970 apply_ipv4_preference(force=True) 971 except Exception: 972 pass 973 974 # Reasoning config from config.yaml 975 from hermes_constants import parse_reasoning_effort 976 effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip() 977 reasoning_config = parse_reasoning_effort(effort) 978 979 # Prefill messages from env or config.yaml 980 prefill_messages = None 981 prefill_file = os.getenv("HERMES_PREFILL_MESSAGES_FILE", "") or _cfg.get("prefill_messages_file", "") 982 if prefill_file: 983 pfpath = Path(prefill_file).expanduser() 984 if not pfpath.is_absolute(): 985 pfpath = _hermes_home / pfpath 986 if pfpath.exists(): 987 try: 988 with open(pfpath, "r", encoding="utf-8") as _pf: 989 prefill_messages = json.load(_pf) 990 if not isinstance(prefill_messages, list): 991 prefill_messages = None 992 except Exception as e: 993 logger.warning("Job '%s': failed to parse prefill messages file '%s': %s", job_id, pfpath, e) 994 prefill_messages = None 995 996 # Max iterations 997 max_iterations = _cfg.get("agent", {}).get("max_turns") or _cfg.get("max_turns") or 90 998 999 # Provider routing 1000 pr = _cfg.get("provider_routing", {}) 1001 1002 from hermes_cli.runtime_provider import ( 1003 resolve_runtime_provider, 1004 format_runtime_provider_error, 1005 ) 1006 from hermes_cli.auth import AuthError 1007 try: 1008 # Do not inject HERMES_INFERENCE_PROVIDER here. resolve_runtime_provider() 1009 # already prefers persisted config over stale shell/env overrides when 1010 # no explicit provider is requested. Passing the env var here short- 1011 # circuits that precedence and can resurrect old providers (for 1012 # example DeepSeek) for cron jobs that do not pin provider/model. 
1013 runtime_kwargs = { 1014 "requested": job.get("provider"), 1015 } 1016 if job.get("base_url"): 1017 runtime_kwargs["explicit_base_url"] = job.get("base_url") 1018 runtime = resolve_runtime_provider(**runtime_kwargs) 1019 except AuthError as auth_exc: 1020 # Primary provider auth failed — try fallback chain before giving up. 1021 logger.warning("Job '%s': primary auth failed (%s), trying fallback", job_id, auth_exc) 1022 fb = _cfg.get("fallback_providers") or _cfg.get("fallback_model") 1023 fb_list = (fb if isinstance(fb, list) else [fb]) if fb else [] 1024 runtime = None 1025 for entry in fb_list: 1026 if not isinstance(entry, dict): 1027 continue 1028 try: 1029 fb_kwargs = {"requested": entry.get("provider")} 1030 if entry.get("base_url"): 1031 fb_kwargs["explicit_base_url"] = entry["base_url"] 1032 if entry.get("api_key"): 1033 fb_kwargs["explicit_api_key"] = entry["api_key"] 1034 runtime = resolve_runtime_provider(**fb_kwargs) 1035 logger.info("Job '%s': fallback resolved to %s", job_id, runtime.get("provider")) 1036 break 1037 except Exception as fb_exc: 1038 logger.debug("Job '%s': fallback %s failed: %s", job_id, entry.get("provider"), fb_exc) 1039 if runtime is None: 1040 raise RuntimeError(format_runtime_provider_error(auth_exc)) from auth_exc 1041 except Exception as exc: 1042 message = format_runtime_provider_error(exc) 1043 raise RuntimeError(message) from exc 1044 1045 fallback_model = _cfg.get("fallback_providers") or _cfg.get("fallback_model") or None 1046 credential_pool = None 1047 runtime_provider = str(runtime.get("provider") or "").strip().lower() 1048 if runtime_provider: 1049 try: 1050 from agent.credential_pool import load_pool 1051 pool = load_pool(runtime_provider) 1052 if pool.has_credentials(): 1053 credential_pool = pool 1054 logger.info( 1055 "Job '%s': loaded credential pool for provider %s with %d entries", 1056 job_id, 1057 runtime_provider, 1058 len(pool.entries()), 1059 ) 1060 except Exception as e: 1061 logger.debug("Job '%s': 
failed to load credential pool for %s: %s", job_id, runtime_provider, e) 1062 1063 agent = AIAgent( 1064 model=model, 1065 api_key=runtime.get("api_key"), 1066 base_url=runtime.get("base_url"), 1067 provider=runtime.get("provider"), 1068 api_mode=runtime.get("api_mode"), 1069 acp_command=runtime.get("command"), 1070 acp_args=runtime.get("args"), 1071 max_iterations=max_iterations, 1072 reasoning_config=reasoning_config, 1073 prefill_messages=prefill_messages, 1074 fallback_model=fallback_model, 1075 credential_pool=credential_pool, 1076 providers_allowed=pr.get("only"), 1077 providers_ignored=pr.get("ignore"), 1078 providers_order=pr.get("order"), 1079 provider_sort=pr.get("sort"), 1080 enabled_toolsets=_resolve_cron_enabled_toolsets(job, _cfg), 1081 disabled_toolsets=["cronjob", "messaging", "clarify"], 1082 quiet_mode=True, 1083 # Cron jobs should always inherit the user's SOUL.md identity from 1084 # HERMES_HOME. When a workdir is configured, also inject project 1085 # context files (AGENTS.md / CLAUDE.md / .cursorrules) from there. 1086 # Without a workdir, keep cwd context discovery disabled. 1087 skip_context_files=not bool(_job_workdir), 1088 load_soul_identity=True, 1089 skip_memory=True, # Cron system prompts would corrupt user representations 1090 platform="cron", 1091 session_id=_cron_session_id, 1092 session_db=_session_db, 1093 ) 1094 1095 # Run the agent with an *inactivity*-based timeout: the job can run 1096 # for hours if it's actively calling tools / receiving stream tokens, 1097 # but a hung API call or stuck tool with no activity for the configured 1098 # duration is caught and killed. Default 600s (10 min inactivity); 1099 # override via HERMES_CRON_TIMEOUT env var. 0 = unlimited. 1100 # 1101 # Uses the agent's built-in activity tracker (updated by 1102 # _touch_activity() on every tool call, API call, and stream delta). 
1103 _raw_cron_timeout = os.getenv("HERMES_CRON_TIMEOUT", "").strip() 1104 if _raw_cron_timeout: 1105 try: 1106 _cron_timeout = float(_raw_cron_timeout) 1107 except (ValueError, TypeError): 1108 logger.warning( 1109 "Invalid HERMES_CRON_TIMEOUT=%r; using default 600s", 1110 _raw_cron_timeout, 1111 ) 1112 _cron_timeout = 600.0 1113 else: 1114 _cron_timeout = 600.0 1115 _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None 1116 _POLL_INTERVAL = 5.0 1117 _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) 1118 # Preserve scheduler-scoped ContextVar state (for example skill-declared 1119 # env passthrough registrations) when the cron run hops into the worker 1120 # thread used for inactivity timeout monitoring. 1121 _cron_context = contextvars.copy_context() 1122 _cron_future = _cron_pool.submit(_cron_context.run, agent.run_conversation, prompt) 1123 _inactivity_timeout = False 1124 try: 1125 if _cron_inactivity_limit is None: 1126 # Unlimited — just wait for the result. 1127 result = _cron_future.result() 1128 else: 1129 result = None 1130 while True: 1131 done, _ = concurrent.futures.wait( 1132 {_cron_future}, timeout=_POLL_INTERVAL, 1133 ) 1134 if done: 1135 result = _cron_future.result() 1136 break 1137 # Agent still running — check inactivity. 1138 _idle_secs = 0.0 1139 if hasattr(agent, "get_activity_summary"): 1140 try: 1141 _act = agent.get_activity_summary() 1142 _idle_secs = _act.get("seconds_since_activity", 0.0) 1143 except Exception: 1144 pass 1145 if _idle_secs >= _cron_inactivity_limit: 1146 _inactivity_timeout = True 1147 break 1148 except Exception: 1149 _cron_pool.shutdown(wait=False, cancel_futures=True) 1150 raise 1151 finally: 1152 _cron_pool.shutdown(wait=False, cancel_futures=True) 1153 1154 if _inactivity_timeout: 1155 # Build diagnostic summary from the agent's activity tracker. 
1156 _activity = {} 1157 if hasattr(agent, "get_activity_summary"): 1158 try: 1159 _activity = agent.get_activity_summary() 1160 except Exception: 1161 pass 1162 _last_desc = _activity.get("last_activity_desc", "unknown") 1163 _secs_ago = _activity.get("seconds_since_activity", 0) 1164 _cur_tool = _activity.get("current_tool") 1165 _iter_n = _activity.get("api_call_count", 0) 1166 _iter_max = _activity.get("max_iterations", 0) 1167 1168 logger.error( 1169 "Job '%s' idle for %.0fs (inactivity limit %.0fs) " 1170 "| last_activity=%s | iteration=%s/%s | tool=%s", 1171 job_name, _secs_ago, _cron_inactivity_limit, 1172 _last_desc, _iter_n, _iter_max, 1173 _cur_tool or "none", 1174 ) 1175 if hasattr(agent, "interrupt"): 1176 agent.interrupt("Cron job timed out (inactivity)") 1177 raise TimeoutError( 1178 f"Cron job '{job_name}' idle for " 1179 f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) " 1180 f"— last activity: {_last_desc}" 1181 ) 1182 1183 # Guard against non-dict returns from run_conversation under error conditions 1184 if not isinstance(result, dict): 1185 raise RuntimeError( 1186 f"agent.run_conversation returned {type(result).__name__} instead of dict: {result!r}" 1187 ) 1188 1189 # If the agent itself reported failure (e.g. all retries exhausted on 1190 # API errors, model abort, mid-run interrupt), do not silently mark the 1191 # job as successful. run_agent populates `failed=True`/`completed=False` 1192 # on these paths and may put the error into `final_response`, which 1193 # would otherwise be delivered as if it were the agent's reply and the 1194 # job's `last_status` set to "ok". Raise so the except handler below 1195 # builds the proper failure tuple. 
(issue #17855) 1196 if result.get("failed") is True or result.get("completed") is False: 1197 _err_text = ( 1198 result.get("error") 1199 or (result.get("final_response") or "").strip() 1200 or "agent reported failure" 1201 ) 1202 raise RuntimeError(_err_text) 1203 1204 final_response = result.get("final_response", "") or "" 1205 # Strip leaked placeholder text that upstream may inject on empty completions. 1206 if final_response.strip() == "(No response generated)": 1207 final_response = "" 1208 # Use a separate variable for log display; keep final_response clean 1209 # for delivery logic (empty response = no delivery). 1210 logged_response = final_response if final_response else "(No response generated)" 1211 1212 output = f"""# Cron Job: {job_name} 1213 1214 **Job ID:** {job_id} 1215 **Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')} 1216 **Schedule:** {job.get('schedule_display', 'N/A')} 1217 1218 ## Prompt 1219 1220 {prompt} 1221 1222 ## Response 1223 1224 {logged_response} 1225 """ 1226 1227 logger.info("Job '%s' completed successfully", job_name) 1228 return True, output, final_response, None 1229 1230 except Exception as e: 1231 error_msg = f"{type(e).__name__}: {str(e)}" 1232 logger.exception("Job '%s' failed: %s", job_name, error_msg) 1233 1234 output = f"""# Cron Job: {job_name} (FAILED) 1235 1236 **Job ID:** {job_id} 1237 **Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')} 1238 **Schedule:** {job.get('schedule_display', 'N/A')} 1239 1240 ## Prompt 1241 1242 {prompt} 1243 1244 ## Error 1245 1246 ``` 1247 {error_msg} 1248 ``` 1249 """ 1250 return False, output, "", error_msg 1251 1252 finally: 1253 # Restore TERMINAL_CWD to whatever it was before this job ran. We 1254 # only ever mutate it when the job has a workdir; see the setup block 1255 # at the top of run_job for the serialization guarantee. 
1256 if _job_workdir: 1257 if _prior_terminal_cwd == "_UNSET_": 1258 os.environ.pop("TERMINAL_CWD", None) 1259 else: 1260 os.environ["TERMINAL_CWD"] = _prior_terminal_cwd 1261 # Clean up ContextVar session/delivery state for this job. 1262 clear_session_vars(_ctx_tokens) 1263 for _var_name in _cron_delivery_vars: 1264 _VAR_MAP[_var_name].set("") 1265 if _session_db: 1266 try: 1267 _session_db.end_session(_cron_session_id, "cron_complete") 1268 except (Exception, KeyboardInterrupt) as e: 1269 logger.debug("Job '%s': failed to end session: %s", job_id, e) 1270 try: 1271 _session_db.close() 1272 except (Exception, KeyboardInterrupt) as e: 1273 logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e) 1274 # Release subprocesses, terminal sandboxes, browser daemons, and the 1275 # main OpenAI/httpx client held by this ephemeral cron agent. Without 1276 # this, a gateway that ticks cron every N minutes leaks fds per job 1277 # until it hits EMFILE (#10200 / "too many open files"). 1278 try: 1279 if agent is not None: 1280 agent.close() 1281 except (Exception, KeyboardInterrupt) as e: 1282 logger.debug("Job '%s': failed to close agent resources: %s", job_id, e) 1283 # Each cron run spins up a short-lived worker thread whose event loop 1284 # dies as soon as the ``ThreadPoolExecutor`` shuts down. Any async 1285 # httpx clients cached under that loop are now unusable — reap them 1286 # so their transports don't accumulate in the process-global cache. 1287 try: 1288 from agent.auxiliary_client import cleanup_stale_async_clients 1289 cleanup_stale_async_clients() 1290 except Exception as e: 1291 logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e) 1292 1293 1294 def tick(verbose: bool = True, adapters=None, loop=None) -> int: 1295 """ 1296 Check and run all due jobs. 1297 1298 Uses a file lock so only one tick runs at a time, even if the gateway's 1299 in-process ticker and a standalone daemon or manual tick overlap. 

    Args:
        verbose: Whether to print status messages
        adapters: Optional dict mapping Platform → live adapter (from gateway)
        loop: Optional asyncio event loop (from gateway) for live adapter sends

    Returns:
        Number of jobs executed (0 if another tick is already running)
    """
    _LOCK_DIR.mkdir(parents=True, exist_ok=True)

    # Cross-platform file locking: fcntl on Unix, msvcrt on Windows
    lock_fd = None
    try:
        lock_fd = open(_LOCK_FILE, "w")
        if fcntl:
            # Non-blocking exclusive lock — raises OSError if another tick
            # already holds it.
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        elif msvcrt:
            # Windows: non-blocking byte-range lock on the first byte.
            msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
    except (OSError, IOError):
        # Lock contention (or open failure) — another tick is running.
        logger.debug("Tick skipped — another instance holds the lock")
        if lock_fd is not None:
            lock_fd.close()
        return 0

    try:
        due_jobs = get_due_jobs()

        if verbose and not due_jobs:
            logger.info("%s - No jobs due", _hermes_now().strftime('%H:%M:%S'))
            return 0

        if verbose:
            logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))

        # Advance next_run_at for all recurring jobs FIRST, under the file lock,
        # before any execution begins. This preserves at-most-once semantics.
        for job in due_jobs:
            advance_next_run(job["id"])

        # Resolve max parallel workers: env var > config.yaml > unbounded.
        # Set HERMES_CRON_MAX_PARALLEL=1 to restore old serial behaviour.
        _max_workers: Optional[int] = None
        try:
            _env_par = os.getenv("HERMES_CRON_MAX_PARALLEL", "").strip()
            if _env_par:
                # "0" falls through to None → unbounded (int("0") is falsy).
                _max_workers = int(_env_par) or None
        except (ValueError, TypeError):
            logger.warning("Invalid HERMES_CRON_MAX_PARALLEL value; defaulting to unbounded")
        if _max_workers is None:
            try:
                _ucfg = load_config() or {}
                _cfg_par = (
                    _ucfg.get("cron", {}) if isinstance(_ucfg, dict) else {}
                ).get("max_parallel_jobs")
                if _cfg_par is not None:
                    _max_workers = int(_cfg_par) or None
            except Exception:
                pass

        if verbose:
            logger.info(
                "Running %d job(s) in parallel (max_workers=%s)",
                len(due_jobs),
                _max_workers if _max_workers else "unbounded",
            )

        def _process_job(job: dict) -> bool:
            """Run one due job end-to-end: execute, save, deliver, mark.

            Returns True when the job was processed (even if it failed) and
            its status recorded; False only when processing itself blew up.
            """
            try:
                success, output, final_response, error = run_job(job)

                output_file = save_job_output(job["id"], output)
                if verbose:
                    logger.info("Output saved to: %s", output_file)

                # Deliver the final response to the origin/target chat.
                # If the agent responded with [SILENT], skip delivery (but
                # output is already saved above). Failed jobs always deliver.
                deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
                should_deliver = bool(deliver_content)
                if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
                    logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
                    should_deliver = False

                delivery_error = None
                if should_deliver:
                    try:
                        delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
                    except Exception as de:
                        delivery_error = str(de)
                        logger.error("Delivery failed for job %s: %s", job["id"], de)

                # Treat empty final_response as a soft failure so last_status
                # is not "ok" — the agent ran but produced nothing useful.
                # (issue #8585)
                if success and not final_response:
                    success = False
                    error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"

                mark_job_run(job["id"], success, error, delivery_error=delivery_error)
                return True

            except Exception as e:
                logger.error("Error processing job %s: %s", job['id'], e)
                mark_job_run(job["id"], False, str(e))
                return False

        # Partition due jobs: those with a per-job workdir mutate
        # os.environ["TERMINAL_CWD"] inside run_job, which is process-global —
        # so they MUST run sequentially to avoid corrupting each other. Jobs
        # without a workdir leave env untouched and stay parallel-safe.
        workdir_jobs = [j for j in due_jobs if (j.get("workdir") or "").strip()]
        parallel_jobs = [j for j in due_jobs if not (j.get("workdir") or "").strip()]

        _results: list = []

        # Sequential pass for workdir jobs. Each job runs in a copied context
        # so its ContextVar writes don't leak into the scheduler's context.
        for job in workdir_jobs:
            _ctx = contextvars.copy_context()
            _results.append(_ctx.run(_process_job, job))

        # Parallel pass for the rest — same behaviour as before.
        if parallel_jobs:
            with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as _tick_pool:
                _futures = []
                for job in parallel_jobs:
                    _ctx = contextvars.copy_context()
                    _futures.append(_tick_pool.submit(_ctx.run, _process_job, job))
                _results.extend(f.result() for f in _futures)

        # Best-effort sweep of MCP stdio subprocesses that survived their
        # session teardown during this tick. Runs AFTER every job has
        # finished so active sessions (including live user chats) are
        # never touched — only PIDs explicitly detected as orphans in
        # tools.mcp_tool._run_stdio's finally block are reaped.
        try:
            from tools.mcp_tool import _kill_orphaned_mcp_children
            _kill_orphaned_mcp_children()
        except Exception as _e:
            logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)

        return sum(_results)
    finally:
        # Always release the lock acquired above, mirroring the platform
        # branch used to take it; Windows unlock failures are swallowed.
        if fcntl:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
        elif msvcrt:
            try:
                msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
            except (OSError, IOError):
                pass
        lock_fd.close()


if __name__ == "__main__":
    tick(verbose=True)