   1  """
   2  Cron job scheduler - executes due jobs.
   3  
   4  Provides tick() which checks for due jobs and runs them. The gateway
   5  calls this every 60 seconds from a background thread.
   6  
   7  Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
   8  runs at a time if multiple processes overlap.
   9  """
  10  
  11  import asyncio
  12  import concurrent.futures
  13  import contextvars
  14  import json
  15  import logging
  16  import os
  17  import subprocess
  18  import sys
  19  
  20  # fcntl is Unix-only; on Windows use msvcrt for file locking
  21  try:
  22      import fcntl
  23  except ImportError:
  24      fcntl = None
  25      try:
  26          import msvcrt
  27      except ImportError:
  28          msvcrt = None
  29  from pathlib import Path
  30  from typing import List, Optional
  31  
  32  # Add parent directory to path for imports BEFORE repo-level imports.
  33  # Without this, standalone invocations (e.g. after `hermes update` reloads
  34  # the module) fail with ModuleNotFoundError for hermes_time et al.
  35  sys.path.insert(0, str(Path(__file__).parent.parent))
  36  
  37  from hermes_constants import get_hermes_home
  38  from hermes_cli.config import load_config
  39  from hermes_time import now as _hermes_now
  40  
  41  logger = logging.getLogger(__name__)


def _resolve_cron_enabled_toolsets(job: dict, cfg: dict) -> list[str] | None:
    """Resolve the toolset list for a cron job.

    Precedence:
    1. Per-job ``enabled_toolsets`` (set via ``cronjob`` tool on create/update).
       Keeps the agent's job-scoped toolset override intact — #6130.
    2. Per-platform ``hermes tools`` config for the ``cron`` platform.
       Mirrors gateway behavior (``_get_platform_tools(cfg, platform_key)``)
       so users can gate cron toolsets globally without recreating every job.
    3. ``None`` on any lookup failure — AIAgent loads the full default set
       (legacy behavior before this change, preserved as the safety net).

    _DEFAULT_OFF_TOOLSETS ({moa, homeassistant, rl}) are removed by
    ``_get_platform_tools`` for unconfigured platforms, so fresh installs
    get cron WITHOUT ``moa`` by default (issue reported by Norbert —
    surprise $4.63 run).
    """
    per_job = job.get("enabled_toolsets")
    if per_job:
        return per_job
    try:
        from hermes_cli.tools_config import _get_platform_tools  # lazy: avoid heavy import at cron module load
        return sorted(_get_platform_tools(cfg or {}, "cron"))
    except Exception as exc:
        logger.warning(
            "Cron toolset resolution failed, falling back to full default toolset: %s",
            exc,
        )
        return None


# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
    "telegram", "discord", "slack", "whatsapp", "signal",
    "matrix", "mattermost", "homeassistant", "dingtalk", "feishu",
    "wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles",
    "qqbot", "yuanbao",
})

# Platforms that support a configured cron/notification home target, mapped to
# the environment variable used by gateway setup/runtime config.
_HOME_TARGET_ENV_VARS = {
    "matrix": "MATRIX_HOME_ROOM",
    "telegram": "TELEGRAM_HOME_CHANNEL",
    "discord": "DISCORD_HOME_CHANNEL",
    "slack": "SLACK_HOME_CHANNEL",
    "signal": "SIGNAL_HOME_CHANNEL",
    "mattermost": "MATTERMOST_HOME_CHANNEL",
    "sms": "SMS_HOME_CHANNEL",
    "email": "EMAIL_HOME_ADDRESS",
    "dingtalk": "DINGTALK_HOME_CHANNEL",
    "feishu": "FEISHU_HOME_CHANNEL",
    "wecom": "WECOM_HOME_CHANNEL",
    "weixin": "WEIXIN_HOME_CHANNEL",
    "bluebubbles": "BLUEBUBBLES_HOME_CHANNEL",
    "qqbot": "QQBOT_HOME_CHANNEL",
}

# Legacy env var names kept for back-compat.  Each entry is the current
# primary env var → the previous name.  _get_home_target_chat_id falls
# back to the legacy name if the primary is unset, so users who set the
# old name before the rename keep working until they migrate.
_LEGACY_HOME_TARGET_ENV_VARS = {
    "QQBOT_HOME_CHANNEL": "QQ_HOME_CHANNEL",
}

from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run

# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery.  Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"

# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()

# File-based lock prevents concurrent ticks from gateway + daemon + systemd timer
_LOCK_DIR = _hermes_home / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
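
# Illustrative sketch of the non-blocking acquisition the module docstring
# describes. The actual tick() flow lives elsewhere in this module; the names
# and error handling below are an assumption, not the real implementation:
#
#   _LOCK_DIR.mkdir(parents=True, exist_ok=True)
#   fd = open(_LOCK_FILE, "w")
#   if fcntl is not None:
#       fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)   # BlockingIOError if another tick holds it
#   elif msvcrt is not None:
#       msvcrt.locking(fd.fileno(), msvcrt.LK_NBLCK, 1)  # OSError if another tick holds it
#   # ... run the tick, then release and close fd ...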


def _resolve_origin(job: dict) -> Optional[dict]:
    """Extract origin info from a job, preserving any extra routing metadata.

    Treats non-dict origins (free-form provenance strings, ints, lists from
    migration scripts or hand-edited jobs.json) as missing instead of
    crashing with ``AttributeError`` on ``origin.get(...)``. Without this
    guard, a job tagged with e.g. ``"combined-digest-replaces-x-and-y"``
    crashed every fire attempt with
    ``'str' object has no attribute 'get'`` — ``mark_job_run`` recorded the
    failure, but the next tick re-loaded the same poisoned origin and
    crashed identically until the field was patched manually (#18722).
    """
    origin = job.get("origin")
    if not isinstance(origin, dict):
        return None
    platform = origin.get("platform")
    chat_id = origin.get("chat_id")
    if platform and chat_id:
        return origin
    return None
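
# Illustrative origin values and what the guard above yields (shapes follow
# directly from the checks; IDs are hypothetical):
#   {"platform": "telegram", "chat_id": 123, "thread_id": 7}  -> returned intact
#   {"platform": "telegram"}                                  -> None (chat_id missing)
#   "combined-digest-replaces-x-and-y"                        -> None (non-dict, see #18722)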


def _get_home_target_chat_id(platform_name: str) -> str:
    """Return the configured home target chat/room ID for a delivery platform."""
    env_var = _HOME_TARGET_ENV_VARS.get(platform_name.lower())
    if not env_var:
        return ""
    value = os.getenv(env_var, "")
    if not value:
        legacy = _LEGACY_HOME_TARGET_ENV_VARS.get(env_var)
        if legacy:
            value = os.getenv(legacy, "")
    return value


def _get_home_target_thread_id(platform_name: str) -> Optional[str]:
    """Return the optional thread/topic ID for a platform home target."""
    env_var = _HOME_TARGET_ENV_VARS.get(platform_name.lower())
    if not env_var:
        return None
    value = os.getenv(f"{env_var}_THREAD_ID", "").strip()
    if not value:
        legacy = _LEGACY_HOME_TARGET_ENV_VARS.get(env_var)
        if legacy:
            value = os.getenv(f"{legacy}_THREAD_ID", "").strip()
    return value or None
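
# Example env resolution for the two helpers above (values are hypothetical):
#   MATRIX_HOME_ROOM=!room:example.org            -> chat_id for platform "matrix"
#   MATRIX_HOME_ROOM_THREAD_ID=$evt123            -> optional thread/topic ID
#   QQBOT_HOME_CHANNEL unset, QQ_HOME_CHANNEL=42  -> "42" via the legacy fallback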


def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[dict]:
    """Resolve one concrete auto-delivery target for a cron job."""

    origin = _resolve_origin(job)

    if deliver_value == "local":
        return None

    if deliver_value == "origin":
        if origin:
            return {
                "platform": origin["platform"],
                "chat_id": str(origin["chat_id"]),
                "thread_id": origin.get("thread_id"),
            }
        # Origin missing (e.g. job created via API/script) — try each
        # platform's home channel as a fallback instead of silently dropping.
        for platform_name in _HOME_TARGET_ENV_VARS:
            chat_id = _get_home_target_chat_id(platform_name)
            if chat_id:
                logger.info(
                    "Job '%s' has deliver=origin but no origin; falling back to %s home channel",
                    job.get("name", job.get("id", "?")),
                    platform_name,
                )
                return {
                    "platform": platform_name,
                    "chat_id": chat_id,
                    "thread_id": _get_home_target_thread_id(platform_name),
                }
        return None

    if ":" in deliver_value:
        platform_name, rest = deliver_value.split(":", 1)
        platform_key = platform_name.lower()

        from tools.send_message_tool import _parse_target_ref

        parsed_chat_id, parsed_thread_id, is_explicit = _parse_target_ref(platform_key, rest)
        if is_explicit:
            chat_id, thread_id = parsed_chat_id, parsed_thread_id
        else:
            chat_id, thread_id = rest, None

        # Resolve human-friendly labels like "Alice (dm)" to real IDs.
        try:
            from gateway.channel_directory import resolve_channel_name
            resolved = resolve_channel_name(platform_key, chat_id)
            if resolved:
                parsed_chat_id, parsed_thread_id, resolved_is_explicit = _parse_target_ref(platform_key, resolved)
                if resolved_is_explicit:
                    chat_id = parsed_chat_id
                    if parsed_thread_id is not None:
                        thread_id = parsed_thread_id
                else:
                    chat_id = resolved
        except Exception:
            pass

        return {
            "platform": platform_name,
            "chat_id": chat_id,
            "thread_id": thread_id,
        }

    platform_name = deliver_value
    if origin and origin.get("platform") == platform_name:
        return {
            "platform": platform_name,
            "chat_id": str(origin["chat_id"]),
            "thread_id": origin.get("thread_id"),
        }

    if platform_name.lower() not in _KNOWN_DELIVERY_PLATFORMS:
        return None
    chat_id = _get_home_target_chat_id(platform_name)
    if not chat_id:
        return None

    return {
        "platform": platform_name,
        "chat_id": chat_id,
        "thread_id": _get_home_target_thread_id(platform_name),
    }


def _normalize_deliver_value(deliver) -> str:
    """Normalize a stored/submitted ``deliver`` value to its canonical string form.

    The contract is that ``deliver`` is a string (``"local"``, ``"origin"``,
    ``"telegram"``, ``"telegram:-1001:17"``, or comma-separated combinations).
    Historically some callers — MCP clients passing an array, direct edits of
    ``jobs.json``, or stale code paths — have stored a list/tuple like
    ``["telegram"]``.  ``str(["telegram"])`` would serialize to the literal
    string ``"['telegram']"``, which is not a known platform and fails
    resolution silently.  Flatten lists/tuples into a comma-separated string
    so both forms work.  Returns ``"local"`` for anything falsy.
    """
    if deliver is None or deliver == "":
        return "local"
    if isinstance(deliver, (list, tuple)):
        parts = [str(p).strip() for p in deliver if str(p).strip()]
        return ",".join(parts) if parts else "local"
    return str(deliver)
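
# Illustrative normalizations (each follows from a branch above):
#   _normalize_deliver_value(None)                    -> "local"
#   _normalize_deliver_value(["telegram", "discord"]) -> "telegram,discord"
#   _normalize_deliver_value(("origin",))             -> "origin"
#   _normalize_deliver_value("telegram:-1001:17")     -> "telegram:-1001:17"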


def _resolve_delivery_targets(job: dict) -> List[dict]:
    """Resolve all concrete auto-delivery targets for a cron job (supports comma-separated deliver)."""
    deliver = _normalize_deliver_value(job.get("deliver", "local"))
    if deliver == "local":
        return []
    parts = [p.strip() for p in deliver.split(",") if p.strip()]
    seen = set()
    targets = []
    for part in parts:
        target = _resolve_single_delivery_target(job, part)
        if target:
            key = (target["platform"].lower(), str(target["chat_id"]), target.get("thread_id"))
            if key not in seen:
                seen.add(key)
                targets.append(target)
    return targets


def _resolve_delivery_target(job: dict) -> Optional[dict]:
    """Resolve the concrete auto-delivery target for a cron job, if any."""
    targets = _resolve_delivery_targets(job)
    return targets[0] if targets else None
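
# Illustrative multi-target resolution (assuming a job whose origin is
# telegram chat 123 with no thread; IDs hypothetical). Duplicates collapse
# via the seen-set in _resolve_delivery_targets:
#   deliver = "origin,telegram:123" -> [{"platform": "telegram", "chat_id": "123", ...}]
#   deliver = "local"               -> []   (output is only saved locally)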


# Media extension sets — audio routing is centralized in gateway.platforms.base
# via should_send_media_as_audio() so Telegram-specific rules stay in one place.
_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'})
_IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'})


def _send_media_via_adapter(
    adapter,
    chat_id: str,
    media_files: list,
    metadata: dict | None,
    loop,
    job: dict,
    platform=None,
) -> None:
    """Send extracted MEDIA files as native platform attachments via a live adapter.

    Routes each file to the appropriate adapter method (send_voice, send_image_file,
    send_video, send_document) based on file extension — mirroring the routing logic
    in ``BasePlatformAdapter._process_message_background``.
    """
    from pathlib import Path

    from gateway.platforms.base import should_send_media_as_audio

    for media_path, _is_voice in media_files:
        try:
            ext = Path(media_path).suffix.lower()
            route_platform = platform if platform is not None else getattr(adapter, "platform", None)
            if should_send_media_as_audio(route_platform, ext, is_voice=_is_voice):
                coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
            elif ext in _VIDEO_EXTS:
                coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata)
            elif ext in _IMAGE_EXTS:
                coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata)
            else:
                coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)

            future = asyncio.run_coroutine_threadsafe(coro, loop)
            try:
                result = future.result(timeout=30)
            except concurrent.futures.TimeoutError:
                # futures.TimeoutError only aliases the builtin TimeoutError on
                # Python 3.11+; catch the futures variant so 3.10 works too.
                future.cancel()
                raise
            if result and not getattr(result, "success", True):
                logger.warning(
                    "Job '%s': media send failed for %s: %s",
                    job.get("id", "?"), media_path, getattr(result, "error", "unknown"),
                )
        except Exception as e:
            logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
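
# Extension routing examples for the helper above (paths hypothetical; voice
# vs. plain audio is decided by should_send_media_as_audio):
#   report.mp4 -> adapter.send_video(...)
#   chart.png  -> adapter.send_image_file(...)
#   notes.pdf  -> adapter.send_document(...)   # fallthrough for unknown extensions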


def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
    """
    Deliver job output to the configured target(s) (origin chat, specific platform, etc.).

    When ``adapters`` and ``loop`` are provided (gateway is running), tries to
    use the live adapter first — this supports E2EE rooms (e.g. Matrix) where
    the standalone HTTP path cannot encrypt.  Falls back to standalone send if
    the adapter path fails or is unavailable.

    Returns None on success, or an error string on failure.
    """
    targets = _resolve_delivery_targets(job)
    if not targets:
        # Compare the *normalized* deliver value so list/None forms of "local"
        # don't produce a spurious resolution error.
        if _normalize_deliver_value(job.get("deliver", "local")) != "local":
            msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}"
            logger.warning("Job '%s': %s", job["id"], msg)
            return msg
        return None  # local-only jobs don't deliver — not a failure

    from tools.send_message_tool import _send_to_platform
    from gateway.config import load_gateway_config, Platform

    # Optionally wrap the content with a header/footer so the user knows this
    # is a cron delivery.  Wrapping is on by default; set cron.wrap_response: false
    # in config.yaml for clean output.
    wrap_response = True
    try:
        user_cfg = load_config()
        wrap_response = user_cfg.get("cron", {}).get("wrap_response", True)
    except Exception:
        pass

    if wrap_response:
        task_name = job.get("name", job["id"])
        job_id = job.get("id", "")
        delivery_content = (
            f"Cronjob Response: {task_name}\n"
            f"(job_id: {job_id})\n"
            f"-------------\n\n"
            f"{content}\n\n"
            f"To stop or manage this job, send me a new message (e.g. \"stop reminder {task_name}\")."
        )
    else:
        delivery_content = content

    # Extract MEDIA: tags so attachments are forwarded as files, not raw text
    from gateway.platforms.base import BasePlatformAdapter
    media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)

    try:
        config = load_gateway_config()
    except Exception as e:
        msg = f"failed to load gateway config: {e}"
        logger.error("Job '%s': %s", job["id"], msg)
        return msg

    delivery_errors = []

    for target in targets:
        platform_name = target["platform"]
        chat_id = target["chat_id"]
        thread_id = target.get("thread_id")

        # Diagnostic: log thread_id for topic-aware delivery debugging
        origin = _resolve_origin(job) or {}
        origin_thread = origin.get("thread_id")
        if origin_thread and not thread_id:
            logger.warning(
                "Job '%s': origin has thread_id=%s but delivery target lost it "
                "(deliver=%s, target=%s)",
                job["id"], origin_thread, job.get("deliver", "local"), target,
            )
        elif thread_id:
            logger.debug(
                "Job '%s': delivering to %s:%s thread_id=%s",
                job["id"], platform_name, chat_id, thread_id,
            )

        # Built-in names resolve to their enum member; plugin platform names
        # create dynamic members via Platform._missing_().
        try:
            platform = Platform(platform_name.lower())
        except (ValueError, KeyError):
            msg = f"unknown platform '{platform_name}'"
            logger.warning("Job '%s': %s", job["id"], msg)
            delivery_errors.append(msg)
            continue

        pconfig = config.platforms.get(platform)
        if not pconfig or not pconfig.enabled:
            msg = f"platform '{platform_name}' not configured/enabled"
            logger.warning("Job '%s': %s", job["id"], msg)
            delivery_errors.append(msg)
            continue

        # Prefer the live adapter when the gateway is running — this supports E2EE
        # rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
        runtime_adapter = (adapters or {}).get(platform)
        delivered = False
        if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
            send_metadata = {"thread_id": thread_id} if thread_id else None
            try:
                # Send cleaned text (MEDIA tags stripped) — not the raw content
                text_to_send = cleaned_delivery_content.strip()
                adapter_ok = True
                if text_to_send:
                    future = asyncio.run_coroutine_threadsafe(
                        runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
                        loop,
                    )
                    try:
                        send_result = future.result(timeout=60)
                    except concurrent.futures.TimeoutError:
                        # futures.TimeoutError is not the builtin TimeoutError
                        # before Python 3.11; catch the futures variant.
                        future.cancel()
                        raise
                    if send_result and not getattr(send_result, "success", True):
                        err = getattr(send_result, "error", "unknown")
                        logger.warning(
                            "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
                            job["id"], platform_name, chat_id, err,
                        )
                        adapter_ok = False  # fall through to standalone path

                # Send extracted media files as native attachments via the live adapter
                if adapter_ok and media_files:
                    _send_media_via_adapter(
                        runtime_adapter,
                        chat_id,
                        media_files,
                        send_metadata,
                        loop,
                        job,
                        platform=platform,
                    )

                if adapter_ok:
                    logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
                    delivered = True
            except Exception as e:
                logger.warning(
                    "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone",
                    job["id"], platform_name, chat_id, e,
                )
        if not delivered:
            # Standalone path: run the async send in a fresh event loop (safe from any thread)
            coro = _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)
            try:
                result = asyncio.run(coro)
            except RuntimeError:
                # asyncio.run() checks for a running loop before awaiting the coroutine;
                # when it raises, the original coro was never started — close it to
                # prevent "coroutine was never awaited" RuntimeWarning, then retry in a
                # fresh thread that has no running loop.
                coro.close()
                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                    future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files))
                    result = future.result(timeout=30)
            except Exception as e:
                msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
                logger.error("Job '%s': %s", job["id"], msg)
                delivery_errors.append(msg)
                continue

            if result and result.get("error"):
                msg = f"delivery error: {result['error']}"
                logger.error("Job '%s': %s", job["id"], msg)
                delivery_errors.append(msg)
                continue

            logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)

    if delivery_errors:
        return "; ".join(delivery_errors)
    return None
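
# Example config.yaml snippet for the wrapping behaviour read above (key names
# taken from the load_config() lookup in _deliver_result):
#
#   cron:
#     wrap_response: false   # deliver the agent's output without header/footer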


_DEFAULT_SCRIPT_TIMEOUT = 120  # seconds
# Backward-compatible module override used by tests and emergency monkeypatches.
_SCRIPT_TIMEOUT = _DEFAULT_SCRIPT_TIMEOUT


def _get_script_timeout() -> int:
    """Resolve cron pre-run script timeout from module/env/config with a safe default."""
    if _SCRIPT_TIMEOUT != _DEFAULT_SCRIPT_TIMEOUT:
        try:
            timeout = int(float(_SCRIPT_TIMEOUT))
            if timeout > 0:
                return timeout
        except Exception:
            logger.warning("Invalid patched _SCRIPT_TIMEOUT=%r; using env/config/default", _SCRIPT_TIMEOUT)

    env_value = os.getenv("HERMES_CRON_SCRIPT_TIMEOUT", "").strip()
    if env_value:
        try:
            timeout = int(float(env_value))
            if timeout > 0:
                return timeout
        except Exception:
            logger.warning("Invalid HERMES_CRON_SCRIPT_TIMEOUT=%r; using config/default", env_value)

    try:
        cfg = load_config() or {}
        cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
        configured = cron_cfg.get("script_timeout_seconds")
        if configured is not None:
            timeout = int(float(configured))
            if timeout > 0:
                return timeout
    except Exception as exc:
        logger.debug("Failed to load cron script timeout from config: %s", exc)

    return _DEFAULT_SCRIPT_TIMEOUT
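
# Resolution order illustrated (values hypothetical):
#   scheduler._SCRIPT_TIMEOUT = 30      -> 30   (monkeypatched module override wins)
#   HERMES_CRON_SCRIPT_TIMEOUT=45       -> 45   (when the module var is untouched)
#   cron.script_timeout_seconds: 300    -> 300  (config.yaml, when the env var is unset)
#   nothing configured                  -> 120  (_DEFAULT_SCRIPT_TIMEOUT)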


def _run_job_script(script_path: str) -> tuple[bool, str]:
    """Execute a cron job's data-collection script and capture its output.

    Scripts must reside within HERMES_HOME/scripts/.  Both relative and
    absolute paths are resolved and validated against this directory to
    prevent arbitrary script execution via path traversal or absolute
    path injection.

    Args:
        script_path: Path to a Python script.  Relative paths are resolved
            against HERMES_HOME/scripts/.  Absolute and ~-prefixed paths
            are also validated to ensure they stay within the scripts dir.

    Returns:
        (success, output) — on failure *output* contains the error message so the
        LLM can report the problem to the user.
    """
    from hermes_constants import get_hermes_home

    scripts_dir = get_hermes_home() / "scripts"
    scripts_dir.mkdir(parents=True, exist_ok=True)
    scripts_dir_resolved = scripts_dir.resolve()

    raw = Path(script_path).expanduser()
    if raw.is_absolute():
        path = raw.resolve()
    else:
        path = (scripts_dir / raw).resolve()

    # Guard against path traversal, absolute path injection, and symlink
    # escape — scripts MUST reside within HERMES_HOME/scripts/.
    try:
        path.relative_to(scripts_dir_resolved)
    except ValueError:
        return False, (
            f"Blocked: script path resolves outside the scripts directory "
            f"({scripts_dir_resolved}): {script_path!r}"
        )

    if not path.exists():
        return False, f"Script not found: {path}"
    if not path.is_file():
        return False, f"Script path is not a file: {path}"

    script_timeout = _get_script_timeout()

    try:
        result = subprocess.run(
            [sys.executable, str(path)],
            capture_output=True,
            text=True,
            timeout=script_timeout,
            cwd=str(path.parent),
        )
        stdout = (result.stdout or "").strip()
        stderr = (result.stderr or "").strip()

        # Redact secrets from both stdout and stderr before any return path.
        try:
            from agent.redact import redact_sensitive_text
            stdout = redact_sensitive_text(stdout)
            stderr = redact_sensitive_text(stderr)
        except Exception:
            pass

        if result.returncode != 0:
            parts = [f"Script exited with code {result.returncode}"]
            if stderr:
                parts.append(f"stderr:\n{stderr}")
            if stdout:
                parts.append(f"stdout:\n{stdout}")
            return False, "\n".join(parts)

        return True, stdout

    except subprocess.TimeoutExpired:
        return False, f"Script timed out after {script_timeout}s: {path}"
    except Exception as exc:
        return False, f"Script execution failed: {exc}"
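
# Path validation examples (assuming HERMES_HOME=~/.hermes):
#   "daily.py"         -> runs ~/.hermes/scripts/daily.py
#   "sub/collect.py"   -> runs ~/.hermes/scripts/sub/collect.py
#   "../../etc/passwd" -> blocked (resolves outside the scripts dir)
#   "/tmp/evil.py"     -> blocked (absolute path outside the scripts dir)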


def _parse_wake_gate(script_output: str) -> bool:
    """Parse the last non-empty stdout line of a cron job's pre-check script
    as a wake gate.

    The convention (ported from nanoclaw #1232): if the last stdout line is
    JSON like ``{"wakeAgent": false}``, the agent is skipped entirely — no
    LLM run, no delivery. Any other output (non-JSON, missing flag, gate
    absent, or ``wakeAgent: true``) means wake the agent normally.

    Returns True if the agent should wake, False to skip.
    """
    if not script_output:
        return True
    stripped_lines = [line for line in script_output.splitlines() if line.strip()]
    if not stripped_lines:
        return True
    last_line = stripped_lines[-1].strip()
    try:
        gate = json.loads(last_line)
    except (json.JSONDecodeError, ValueError):
        return True
    if not isinstance(gate, dict):
        return True
    return gate.get("wakeAgent", True) is not False
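
# A minimal pre-check script honoring this convention might end like this
# (sketch; the decision logic is whatever the user's script needs):
#
#   import json
#   nothing_new = True  # e.g. compare feed state against a cached snapshot
#   print(json.dumps({"wakeAgent": not nothing_new}))  # last stdout line gates the run
#
# Any other final line (plain text, missing flag, invalid JSON) wakes the agent.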


def _build_job_prompt(job: dict, prerun_script: Optional[tuple] = None) -> Optional[str]:
    """Build the effective prompt for a cron job, optionally loading one or more skills first.

    Args:
        job: The cron job dict.
        prerun_script: Optional ``(success, stdout)`` from a script that has
            already been executed by the caller (e.g. for a wake-gate check).
            When provided, the script is not re-executed and the cached
            result is used for prompt injection. When omitted, the script
            (if any) runs inline as before.

    Returns:
        The effective prompt string, or ``None`` when a configured script
        succeeded but produced no output (callers skip the AI call).
    """
    prompt = job.get("prompt", "")
    skills = job.get("skills")

    # Run data-collection script if configured, inject output as context.
    script_path = job.get("script")
    if script_path:
        if prerun_script is not None:
            success, script_output = prerun_script
        else:
            success, script_output = _run_job_script(script_path)
        if success:
            if script_output:
                prompt = (
                    "## Script Output\n"
                    "The following data was collected by a pre-run script. "
                    "Use it as context for your analysis.\n\n"
                    f"```\n{script_output}\n```\n\n"
                    f"{prompt}"
                )
            else:
                # Script produced no output — nothing to report, skip AI call.
                return None
        else:
            prompt = (
                "## Script Error\n"
                "The data-collection script failed. Report this to the user.\n\n"
                f"```\n{script_output}\n```\n\n"
                f"{prompt}"
            )

    # Inject output from referenced cron jobs as context.
    context_from = job.get("context_from")
    if context_from:
        from cron.jobs import OUTPUT_DIR
        if isinstance(context_from, str):
            context_from = [context_from]
        for source_job_id in context_from:
            # Guard against path traversal — valid job IDs are 12-char hex strings
            if not source_job_id or not all(c in "0123456789abcdef" for c in source_job_id):
                logger.warning("context_from: skipping invalid job_id %r", source_job_id)
                continue
            try:
                job_output_dir = OUTPUT_DIR / source_job_id
                if not job_output_dir.exists():
                    continue  # silent skip — no output yet
                output_files = sorted(
                    job_output_dir.glob("*.md"),
                    key=lambda f: f.stat().st_mtime,
                    reverse=True,
                )
                if not output_files:
                    continue  # silent skip — no output yet
                latest_output = output_files[0].read_text(encoding="utf-8").strip()
                # Truncate to 8K characters to avoid prompt bloat
                _MAX_CONTEXT_CHARS = 8000
                if len(latest_output) > _MAX_CONTEXT_CHARS:
                    latest_output = latest_output[:_MAX_CONTEXT_CHARS] + "\n\n[... output truncated ...]"
                if latest_output:
                    prompt = (
                        f"## Output from job '{source_job_id}'\n"
                        "The following is the most recent output from a preceding "
                        "cron job. Use it as context for your analysis.\n\n"
                        f"```\n{latest_output}\n```\n\n"
                        f"{prompt}"
                    )
                else:
                    continue  # silent skip — empty output
            except (OSError, PermissionError) as e:
                logger.warning("context_from: failed to read output for job %r: %s", source_job_id, e)
                # silent skip — do not pollute the prompt with error messages

    # Always prepend cron execution guidance so the agent knows how
    # delivery works and can suppress delivery when appropriate.
    cron_hint = (
        "[IMPORTANT: You are running as a scheduled cron job. "
        "DELIVERY: Your final response will be automatically delivered "
        "to the user — do NOT use send_message or try to deliver "
        "the output yourself. Just produce your report/output as your "
        "final response and the system handles the rest. "
        "SILENT: If there is genuinely nothing new to report, respond "
        "with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
        "Never combine [SILENT] with content — either report your "
        "findings normally, or say [SILENT] and nothing more.]\n\n"
    )
    prompt = cron_hint + prompt
    if skills is None:
        legacy = job.get("skill")
        skills = [legacy] if legacy else []

    skill_names = [str(name).strip() for name in skills if str(name).strip()]
    if not skill_names:
        return prompt

    from tools.skills_tool import skill_view
    from tools.skill_usage import bump_use

    parts = []
    skipped: list[str] = []
    for skill_name in skill_names:
        loaded = json.loads(skill_view(skill_name))
        if not loaded.get("success"):
            error = loaded.get("error") or f"Failed to load skill '{skill_name}'"
            logger.warning("Cron job '%s': skill not found, skipping — %s", job.get("name", job.get("id")), error)
            skipped.append(skill_name)
            continue

        # Bump usage so the curator sees this skill as actively used.
        try:
            bump_use(skill_name)
        except Exception:
            logger.debug("Cron job: failed to bump skill usage for '%s'", skill_name, exc_info=True)

        content = str(loaded.get("content") or "").strip()
        if parts:
            parts.append("")
        parts.extend(
            [
                f'[IMPORTANT: The user has invoked the "{skill_name}" skill, indicating they want you to follow its instructions. The full skill content is loaded below.]',
                "",
                content,
            ]
        )

    if skipped:
        notice = (
            f"[IMPORTANT: The following skill(s) were listed for this job but could not be found "
            f"and were skipped: {', '.join(skipped)}. "
            f"Start your response with a brief notice so the user is aware, e.g.: "
            f"'⚠️ Skill(s) not found and skipped: {', '.join(skipped)}']"
        )
        parts.insert(0, notice)

    if prompt:
        parts.extend(["", f"The user has provided the following instruction alongside the skill invocation: {prompt}"])
    return "\n".join(parts)
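
# Illustrative job fields consumed by _build_job_prompt (a hypothetical
# jobs.json entry; only "prompt" is required by this function):
#   {
#     "prompt": "Summarize overnight alerts.",
#     "script": "collect_alerts.py",        # pre-run data collection / wake gate
#     "skills": ["alert-triage"],           # loaded via skill_view()
#     "context_from": ["a1b2c3d4e5f6"],     # 12-char hex job ID; latest *.md output injected
#   }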


def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """
    Execute a single cron job.

    Returns:
        Tuple of (success, full_output_doc, final_response, error_message)
    """
    from run_agent import AIAgent

    # Initialize SQLite session store so cron job messages are persisted
    # and discoverable via session_search (same pattern as gateway/run.py).
    _session_db = None
    try:
        from hermes_state import SessionDB
        _session_db = SessionDB()
    except Exception as e:
        logger.debug("Job '%s': SQLite session store not available: %s", job.get("id", "?"), e)

    job_id = job["id"]
    job_name = job["name"]

    # Wake-gate: if this job has a pre-check script, run it BEFORE building
    # the prompt so a ``{"wakeAgent": false}`` response can short-circuit
    # the whole agent run. We pass the result into _build_job_prompt so
    # the script is only executed once.
    prerun_script = None
    script_path = job.get("script")
    if script_path:
        prerun_script = _run_job_script(script_path)
        _ran_ok, _script_output = prerun_script
        if _ran_ok and not _parse_wake_gate(_script_output):
            logger.info(
                "Job '%s' (ID: %s): wakeAgent=false, skipping agent run",
                job_name, job_id,
            )
            silent_doc = (
                f"# Cron Job: {job_name}\n\n"
                f"**Job ID:** {job_id}\n"
                f"**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
                "Script gate returned `wakeAgent=false` — agent skipped.\n"
            )
            return True, silent_doc, SILENT_MARKER, None

    prompt = _build_job_prompt(job, prerun_script=prerun_script)
    if prompt is None:
        logger.info("Job '%s': script produced no output, skipping AI call.", job_name)
        return True, "", SILENT_MARKER, None
    origin = _resolve_origin(job)
    _cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"

    logger.info("Running job '%s' (ID: %s)", job_name, job_id)
    logger.info("Prompt: %s", prompt[:100])

    agent = None

    # Mark this as a cron session so the approval system can apply cron_mode.
    # This env var is process-wide and persists for the lifetime of the
    # scheduler process — every job this process runs is a cron job.
    os.environ["HERMES_CRON_SESSION"] = "1"

    # Use ContextVars for per-job session/delivery state so parallel jobs
    # don't clobber each other's targets (os.environ is process-global).
    from gateway.session_context import set_session_vars, clear_session_vars, _VAR_MAP

    _ctx_tokens = set_session_vars(
        platform=origin["platform"] if origin else "",
        chat_id=str(origin["chat_id"]) if origin else "",
        chat_name=origin.get("chat_name", "") if origin else "",
    )
    _cron_delivery_vars = (
        "HERMES_CRON_AUTO_DELIVER_PLATFORM",
        "HERMES_CRON_AUTO_DELIVER_CHAT_ID",
        "HERMES_CRON_AUTO_DELIVER_THREAD_ID",
    )
    for _var_name in _cron_delivery_vars:
        _VAR_MAP[_var_name].set("")

    # Per-job working directory.  When set (and validated at create/update
    # time), we point TERMINAL_CWD at it so:
    #   - build_context_files_prompt() picks up AGENTS.md / CLAUDE.md /
    #     .cursorrules from the job's project dir, AND
    #   - the terminal, file, and code-exec tools run commands from there.
    #
    # tick() serializes workdir-jobs outside the parallel pool, so mutating
    # os.environ["TERMINAL_CWD"] here is safe for those jobs.  For workdir-less
    # jobs we leave TERMINAL_CWD untouched — preserves the original behaviour
    # (skip_context_files=True, tools use whatever cwd the scheduler has).
    _job_workdir = (job.get("workdir") or "").strip() or None
    if _job_workdir and not Path(_job_workdir).is_dir():
        # Directory was removed between create-time validation and now.  Log
        # and drop back to old behaviour rather than crashing the job.
        logger.warning(
            "Job '%s': configured workdir %r no longer exists — running without it",
            job_id, _job_workdir,
        )
        _job_workdir = None
    _prior_terminal_cwd = os.environ.get("TERMINAL_CWD", "_UNSET_")
    if _job_workdir:
        os.environ["TERMINAL_CWD"] = _job_workdir
        logger.info("Job '%s': using workdir %s", job_id, _job_workdir)

    try:
        # Re-read .env and config.yaml fresh every run so provider/key
        # changes take effect without a gateway restart.
        from dotenv import load_dotenv
        try:
            load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8")
        except UnicodeDecodeError:
            load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")

        delivery_target = _resolve_delivery_target(job)
        if delivery_target:
            _VAR_MAP["HERMES_CRON_AUTO_DELIVER_PLATFORM"].set(delivery_target["platform"])
            _VAR_MAP["HERMES_CRON_AUTO_DELIVER_CHAT_ID"].set(str(delivery_target["chat_id"]))
            _VAR_MAP["HERMES_CRON_AUTO_DELIVER_THREAD_ID"].set(
                ""
                if delivery_target.get("thread_id") is None
                else str(delivery_target["thread_id"])
            )

        model = job.get("model") or os.getenv("HERMES_MODEL") or ""

        # Load config.yaml for model, reasoning, prefill, toolsets, provider routing
        _cfg = {}
        try:
            import yaml
            _cfg_path = str(_hermes_home / "config.yaml")
            if os.path.exists(_cfg_path):
                with open(_cfg_path) as _f:
                    _cfg = yaml.safe_load(_f) or {}
                _model_cfg = _cfg.get("model", {})
                if not job.get("model"):
                    if isinstance(_model_cfg, str):
                        model = _model_cfg
                    elif isinstance(_model_cfg, dict):
                        model = _model_cfg.get("default", model)
        except Exception as e:
            logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)

        # Apply IPv4 preference if configured.
        try:
            from hermes_constants import apply_ipv4_preference
            _net_cfg = _cfg.get("network", {})
            if isinstance(_net_cfg, dict) and _net_cfg.get("force_ipv4"):
                apply_ipv4_preference(force=True)
        except Exception:
            pass

        # Reasoning config from config.yaml
        from hermes_constants import parse_reasoning_effort
        effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
        reasoning_config = parse_reasoning_effort(effort)

        # Prefill messages from env or config.yaml
        prefill_messages = None
        prefill_file = os.getenv("HERMES_PREFILL_MESSAGES_FILE", "") or _cfg.get("prefill_messages_file", "")
        if prefill_file:
            pfpath = Path(prefill_file).expanduser()
            if not pfpath.is_absolute():
                pfpath = _hermes_home / pfpath
            if pfpath.exists():
                try:
                    with open(pfpath, "r", encoding="utf-8") as _pf:
                        prefill_messages = json.load(_pf)
                    if not isinstance(prefill_messages, list):
                        prefill_messages = None
                except Exception as e:
                    logger.warning("Job '%s': failed to parse prefill messages file '%s': %s", job_id, pfpath, e)
                    prefill_messages = None

        # Max iterations
        max_iterations = _cfg.get("agent", {}).get("max_turns") or _cfg.get("max_turns") or 90

        # Provider routing
        pr = _cfg.get("provider_routing", {})

        from hermes_cli.runtime_provider import (
            resolve_runtime_provider,
            format_runtime_provider_error,
        )
        from hermes_cli.auth import AuthError
        try:
            # Do not inject HERMES_INFERENCE_PROVIDER here. resolve_runtime_provider()
            # already prefers persisted config over stale shell/env overrides when
            # no explicit provider is requested. Passing the env var here short-
            # circuits that precedence and can resurrect old providers (for
            # example DeepSeek) for cron jobs that do not pin provider/model.
            runtime_kwargs = {
                "requested": job.get("provider"),
            }
            if job.get("base_url"):
                runtime_kwargs["explicit_base_url"] = job.get("base_url")
            runtime = resolve_runtime_provider(**runtime_kwargs)
        except AuthError as auth_exc:
            # Primary provider auth failed — try fallback chain before giving up.
            logger.warning("Job '%s': primary auth failed (%s), trying fallback", job_id, auth_exc)
            fb = _cfg.get("fallback_providers") or _cfg.get("fallback_model")
            fb_list = (fb if isinstance(fb, list) else [fb]) if fb else []
            runtime = None
            for entry in fb_list:
                if not isinstance(entry, dict):
                    continue
                try:
                    fb_kwargs = {"requested": entry.get("provider")}
                    if entry.get("base_url"):
                        fb_kwargs["explicit_base_url"] = entry["base_url"]
                    if entry.get("api_key"):
                        fb_kwargs["explicit_api_key"] = entry["api_key"]
                    runtime = resolve_runtime_provider(**fb_kwargs)
                    logger.info("Job '%s': fallback resolved to %s", job_id, runtime.get("provider"))
                    break
                except Exception as fb_exc:
                    logger.debug("Job '%s': fallback %s failed: %s", job_id, entry.get("provider"), fb_exc)
            if runtime is None:
                raise RuntimeError(format_runtime_provider_error(auth_exc)) from auth_exc
        except Exception as exc:
            message = format_runtime_provider_error(exc)
            raise RuntimeError(message) from exc

        fallback_model = _cfg.get("fallback_providers") or _cfg.get("fallback_model") or None
        credential_pool = None
        runtime_provider = str(runtime.get("provider") or "").strip().lower()
        if runtime_provider:
            try:
                from agent.credential_pool import load_pool
                pool = load_pool(runtime_provider)
                if pool.has_credentials():
                    credential_pool = pool
                    logger.info(
                        "Job '%s': loaded credential pool for provider %s with %d entries",
                        job_id,
                        runtime_provider,
                        len(pool.entries()),
                    )
            except Exception as e:
                logger.debug("Job '%s': failed to load credential pool for %s: %s", job_id, runtime_provider, e)

        agent = AIAgent(
            model=model,
            api_key=runtime.get("api_key"),
            base_url=runtime.get("base_url"),
            provider=runtime.get("provider"),
            api_mode=runtime.get("api_mode"),
            acp_command=runtime.get("command"),
            acp_args=runtime.get("args"),
            max_iterations=max_iterations,
            reasoning_config=reasoning_config,
            prefill_messages=prefill_messages,
            fallback_model=fallback_model,
            credential_pool=credential_pool,
            providers_allowed=pr.get("only"),
            providers_ignored=pr.get("ignore"),
            providers_order=pr.get("order"),
            provider_sort=pr.get("sort"),
            enabled_toolsets=_resolve_cron_enabled_toolsets(job, _cfg),
            disabled_toolsets=["cronjob", "messaging", "clarify"],
            quiet_mode=True,
            # Cron jobs should always inherit the user's SOUL.md identity from
            # HERMES_HOME. When a workdir is configured, also inject project
            # context files (AGENTS.md / CLAUDE.md / .cursorrules) from there.
            # Without a workdir, keep cwd context discovery disabled.
            skip_context_files=not bool(_job_workdir),
            load_soul_identity=True,
            skip_memory=True,  # Cron system prompts would corrupt user representations
            platform="cron",
            session_id=_cron_session_id,
            session_db=_session_db,
        )

        # Run the agent with an *inactivity*-based timeout: the job can run
        # for hours if it's actively calling tools / receiving stream tokens,
        # but a hung API call or stuck tool with no activity for the configured
        # duration is caught and killed.  Default 600s (10 min inactivity);
        # override via HERMES_CRON_TIMEOUT env var.  0 = unlimited.
        #
        # Uses the agent's built-in activity tracker (updated by
        # _touch_activity() on every tool call, API call, and stream delta).
        _raw_cron_timeout = os.getenv("HERMES_CRON_TIMEOUT", "").strip()
        if _raw_cron_timeout:
            try:
                _cron_timeout = float(_raw_cron_timeout)
            except (ValueError, TypeError):
                logger.warning(
                    "Invalid HERMES_CRON_TIMEOUT=%r; using default 600s",
                    _raw_cron_timeout,
                )
                _cron_timeout = 600.0
        else:
            _cron_timeout = 600.0
        _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
        _POLL_INTERVAL = 5.0
        _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        # Preserve scheduler-scoped ContextVar state (for example skill-declared
        # env passthrough registrations) when the cron run hops into the worker
        # thread used for inactivity timeout monitoring.
        _cron_context = contextvars.copy_context()
        _cron_future = _cron_pool.submit(_cron_context.run, agent.run_conversation, prompt)
        _inactivity_timeout = False
        try:
            if _cron_inactivity_limit is None:
                # Unlimited — just wait for the result.
                result = _cron_future.result()
            else:
                result = None
                while True:
                    done, _ = concurrent.futures.wait(
                        {_cron_future}, timeout=_POLL_INTERVAL,
                    )
                    if done:
                        result = _cron_future.result()
                        break
                    # Agent still running — check inactivity.
                    _idle_secs = 0.0
                    if hasattr(agent, "get_activity_summary"):
                        try:
                            _act = agent.get_activity_summary()
                            _idle_secs = _act.get("seconds_since_activity", 0.0)
                        except Exception:
                            pass
                    if _idle_secs >= _cron_inactivity_limit:
                        _inactivity_timeout = True
                        break
        except Exception:
            _cron_pool.shutdown(wait=False, cancel_futures=True)
            raise
        finally:
            _cron_pool.shutdown(wait=False, cancel_futures=True)

        if _inactivity_timeout:
            # Build diagnostic summary from the agent's activity tracker.
            _activity = {}
            if hasattr(agent, "get_activity_summary"):
                try:
                    _activity = agent.get_activity_summary()
                except Exception:
                    pass
            _last_desc = _activity.get("last_activity_desc", "unknown")
            _secs_ago = _activity.get("seconds_since_activity", 0)
            _cur_tool = _activity.get("current_tool")
            _iter_n = _activity.get("api_call_count", 0)
            _iter_max = _activity.get("max_iterations", 0)

            logger.error(
                "Job '%s' idle for %.0fs (inactivity limit %.0fs) "
                "| last_activity=%s | iteration=%s/%s | tool=%s",
                job_name, _secs_ago, _cron_inactivity_limit,
                _last_desc, _iter_n, _iter_max,
                _cur_tool or "none",
            )
            if hasattr(agent, "interrupt"):
                agent.interrupt("Cron job timed out (inactivity)")
            raise TimeoutError(
                f"Cron job '{job_name}' idle for "
                f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) "
                f"— last activity: {_last_desc}"
            )
1182  
1183          # Guard against non-dict returns from run_conversation under error conditions
1184          if not isinstance(result, dict):
1185              raise RuntimeError(
1186                  f"agent.run_conversation returned {type(result).__name__} instead of dict: {result!r}"
1187              )
1188  
1189          # If the agent itself reported failure (e.g. all retries exhausted on
1190          # API errors, model abort, mid-run interrupt), do not silently mark the
1191          # job as successful. run_agent populates `failed=True`/`completed=False`
1192          # on these paths and may put the error into `final_response`, which
1193          # would otherwise be delivered as if it were the agent's reply and the
1194          # job's `last_status` set to "ok". Raise so the except handler below
1195          # builds the proper failure tuple. (issue #17855)
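        # Illustrative failure shape (field names from the contract above,
        # values invented): {"failed": True, "completed": False,
        # "final_response": "API error: retries exhausted"} must surface as
        # a job failure rather than be delivered as a reply.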
1196          if result.get("failed") is True or result.get("completed") is False:
1197              _err_text = (
1198                  result.get("error")
1199                  or (result.get("final_response") or "").strip()
1200                  or "agent reported failure"
1201              )
1202              raise RuntimeError(_err_text)
1203  
1204          final_response = result.get("final_response", "") or ""
1205          # Strip leaked placeholder text that upstream may inject on empty completions.
1206          if final_response.strip() == "(No response generated)":
1207              final_response = ""
1208          # Use a separate variable for log display; keep final_response clean
1209          # for delivery logic (empty response = no delivery).
1210          logged_response = final_response if final_response else "(No response generated)"
1211          
1212          output = f"""# Cron Job: {job_name}
1213  
1214  **Job ID:** {job_id}
1215  **Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
1216  **Schedule:** {job.get('schedule_display', 'N/A')}
1217  
1218  ## Prompt
1219  
1220  {prompt}
1221  
1222  ## Response
1223  
1224  {logged_response}
1225  """
1226          
1227          logger.info("Job '%s' completed successfully", job_name)
1228          return True, output, final_response, None
1229          
1230      except Exception as e:
1231          error_msg = f"{type(e).__name__}: {str(e)}"
1232          logger.exception("Job '%s' failed: %s", job_name, error_msg)
1233          
1234          output = f"""# Cron Job: {job_name} (FAILED)
1235  
1236  **Job ID:** {job_id}
1237  **Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
1238  **Schedule:** {job.get('schedule_display', 'N/A')}
1239  
1240  ## Prompt
1241  
1242  {prompt}
1243  
1244  ## Error
1245  
1246  ```
1247  {error_msg}
1248  ```
1249  """
1250          return False, output, "", error_msg
1251  
1252      finally:
1253          # Restore TERMINAL_CWD to whatever it was before this job ran.  We
1254          # only ever mutate it when the job has a workdir; see the setup block
1255          # at the top of run_job for the serialization guarantee.
1256          if _job_workdir:
1257              if _prior_terminal_cwd == "_UNSET_":
1258                  os.environ.pop("TERMINAL_CWD", None)
1259              else:
1260                  os.environ["TERMINAL_CWD"] = _prior_terminal_cwd
1261          # Clean up ContextVar session/delivery state for this job.
1262          clear_session_vars(_ctx_tokens)
1263          for _var_name in _cron_delivery_vars:
1264              _VAR_MAP[_var_name].set("")
1265          if _session_db:
1266              try:
1267                  _session_db.end_session(_cron_session_id, "cron_complete")
1268              except (Exception, KeyboardInterrupt) as e:
1269                  logger.debug("Job '%s': failed to end session: %s", job_id, e)
1270              try:
1271                  _session_db.close()
1272              except (Exception, KeyboardInterrupt) as e:
1273                  logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
1274          # Release subprocesses, terminal sandboxes, browser daemons, and the
1275          # main OpenAI/httpx client held by this ephemeral cron agent. Without
1276          # this, a gateway that ticks cron every N minutes leaks fds per job
1277          # until it hits EMFILE (#10200 / "too many open files").
1278          try:
1279              if agent is not None:
1280                  agent.close()
1281          except (Exception, KeyboardInterrupt) as e:
1282              logger.debug("Job '%s': failed to close agent resources: %s", job_id, e)
1283          # Each cron run spins up a short-lived worker thread whose event loop
1284          # dies as soon as the ``ThreadPoolExecutor`` shuts down. Any async
1285          # httpx clients cached under that loop are now unusable — reap them
1286          # so their transports don't accumulate in the process-global cache.
1287          try:
1288              from agent.auxiliary_client import cleanup_stale_async_clients
1289              cleanup_stale_async_clients()
1290          except Exception as e:
1291              logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)
1292  
1293  
1294  def tick(verbose: bool = True, adapters=None, loop=None) -> int:
1295      """
1296      Check and run all due jobs.
1297      
1298      Uses a file lock so only one tick runs at a time, even if the gateway's
1299      in-process ticker and a standalone daemon or manual tick overlap.
1300      
1301      Args:
        verbose: Whether to log status messages for this tick
1303          adapters: Optional dict mapping Platform → live adapter (from gateway)
1304          loop: Optional asyncio event loop (from gateway) for live adapter sends
1305      
1306      Returns:
        Number of jobs processed this tick; 0 if no jobs were due or
        another tick already holds the lock
1308      """
1309      _LOCK_DIR.mkdir(parents=True, exist_ok=True)
1310  
    # Cross-platform non-blocking locking: fcntl.flock on Unix, msvcrt on
    # Windows. If neither module is available the tick runs unlocked
    # (best effort).
1312      lock_fd = None
1313      try:
1314          lock_fd = open(_LOCK_FILE, "w")
1315          if fcntl:
1316              fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
1317          elif msvcrt:
1318              msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
    except OSError:  # IOError is an alias of OSError on Python 3
1320          logger.debug("Tick skipped — another instance holds the lock")
1321          if lock_fd is not None:
1322              lock_fd.close()
1323          return 0
1324  
1325      try:
1326          due_jobs = get_due_jobs()
1327  
        if not due_jobs:
            # Return early whether or not we are verbose; nothing below
            # applies to an empty tick.
            if verbose:
                logger.info("%s - No jobs due", _hermes_now().strftime('%H:%M:%S'))
            return 0
1331  
1332          if verbose:
1333              logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
1334  
1335          # Advance next_run_at for all recurring jobs FIRST, under the file lock,
1336          # before any execution begins.  This preserves at-most-once semantics.
1337          for job in due_jobs:
1338              advance_next_run(job["id"])
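        # Reasoning sketch: if next_run_at were advanced only after a run
        # finished, a crash mid-execution would leave it in the past and the
        # next tick would fire the same occurrence again (at-least-once).
        # Advancing first means a crash can lose a run but never duplicates
        # one, the safer failure mode for jobs with side effects.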
1339  
        # Resolve max parallel workers: env var > config.yaml > unbounded.
        # A value of 0 (or unset) means unbounded; set
        # HERMES_CRON_MAX_PARALLEL=1 to restore the old serial behaviour.
        _max_workers: Optional[int] = None
        try:
            _env_par = os.getenv("HERMES_CRON_MAX_PARALLEL", "").strip()
            if _env_par:
                _max_workers = int(_env_par) or None
        except ValueError:
            logger.warning("Invalid HERMES_CRON_MAX_PARALLEL value; defaulting to unbounded")
1349          if _max_workers is None:
1350              try:
1351                  _ucfg = load_config() or {}
1352                  _cfg_par = (
1353                      _ucfg.get("cron", {}) if isinstance(_ucfg, dict) else {}
1354                  ).get("max_parallel_jobs")
1355                  if _cfg_par is not None:
1356                      _max_workers = int(_cfg_par) or None
1357              except Exception:
1358                  pass
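        # Illustrative config.yaml shape for the lookup above (the key path
        # cron.max_parallel_jobs is taken from the code; the surrounding
        # file layout is an assumption):
        #
        #   cron:
        #     max_parallel_jobs: 4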
1359  
1360          if verbose:
1361              logger.info(
1362                  "Running %d job(s) in parallel (max_workers=%s)",
1363                  len(due_jobs),
1364                  _max_workers if _max_workers else "unbounded",
1365              )
1366  
1367          def _process_job(job: dict) -> bool:
1368              """Run one due job end-to-end: execute, save, deliver, mark."""
1369              try:
1370                  success, output, final_response, error = run_job(job)
1371  
1372                  output_file = save_job_output(job["id"], output)
1373                  if verbose:
1374                      logger.info("Output saved to: %s", output_file)
1375  
1376                  # Deliver the final response to the origin/target chat.
1377                  # If the agent responded with [SILENT], skip delivery (but
1378                  # output is already saved above).  Failed jobs always deliver.
1379                  deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
1380                  should_deliver = bool(deliver_content)
1381                  if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
1382                      logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
1383                      should_deliver = False
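                # Example: a prompt such as "check the feed; reply [SILENT]
                # if nothing changed" yields a marker-bearing response, so
                # the run is logged to disk above but nothing reaches the
                # chat.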
1384  
1385                  delivery_error = None
1386                  if should_deliver:
1387                      try:
1388                          delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
1389                      except Exception as de:
1390                          delivery_error = str(de)
1391                          logger.error("Delivery failed for job %s: %s", job["id"], de)
1392  
1393                  # Treat empty final_response as a soft failure so last_status
1394                  # is not "ok" — the agent ran but produced nothing useful.
1395                  # (issue #8585)
1396                  if success and not final_response:
1397                      success = False
1398                      error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"
1399  
1400                  mark_job_run(job["id"], success, error, delivery_error=delivery_error)
1401                  return True
1402  
1403              except Exception as e:
1404                  logger.error("Error processing job %s: %s", job['id'], e)
1405                  mark_job_run(job["id"], False, str(e))
1406                  return False
1407  
1408          # Partition due jobs: those with a per-job workdir mutate
1409          # os.environ["TERMINAL_CWD"] inside run_job, which is process-global —
1410          # so they MUST run sequentially to avoid corrupting each other.  Jobs
1411          # without a workdir leave env untouched and stay parallel-safe.
1412          workdir_jobs = [j for j in due_jobs if (j.get("workdir") or "").strip()]
1413          parallel_jobs = [j for j in due_jobs if not (j.get("workdir") or "").strip()]
1414  
1415          _results: list = []
1416  
1417          # Sequential pass for workdir jobs.
1418          for job in workdir_jobs:
1419              _ctx = contextvars.copy_context()
1420              _results.append(_ctx.run(_process_job, job))
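        # copy_context() hands each job a private ContextVar snapshot, so
        # one job's session/delivery vars cannot bleed into the next even
        # when they share this thread; run_job's finally block clears them
        # explicitly as well.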
1421  
1422          # Parallel pass for the rest — same behaviour as before.
1423          if parallel_jobs:
1424              with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as _tick_pool:
1425                  _futures = []
1426                  for job in parallel_jobs:
1427                      _ctx = contextvars.copy_context()
1428                      _futures.append(_tick_pool.submit(_ctx.run, _process_job, job))
1429                  _results.extend(f.result() for f in _futures)
1430  
1431          # Best-effort sweep of MCP stdio subprocesses that survived their
1432          # session teardown during this tick.  Runs AFTER every job has
1433          # finished so active sessions (including live user chats) are
1434          # never touched — only PIDs explicitly detected as orphans in
1435          # tools.mcp_tool._run_stdio's finally block are reaped.
1436          try:
1437              from tools.mcp_tool import _kill_orphaned_mcp_children
1438              _kill_orphaned_mcp_children()
1439          except Exception as _e:
1440              logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
1441  
1442          return sum(_results)
1443      finally:
1444          if fcntl:
1445              fcntl.flock(lock_fd, fcntl.LOCK_UN)
1446          elif msvcrt:
1447              try:
1448                  msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
            except OSError:  # IOError is an alias of OSError on Python 3
1450                  pass
1451          lock_fd.close()
1452  
1453  
if __name__ == "__main__":
    # Standalone runs get no logging setup from the gateway; without a
    # handler the INFO-level tick output would be dropped.
    logging.basicConfig(level=logging.INFO)
    tick(verbose=True)
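
# Manual invocation for debugging (paths assume running from the repo root;
# the env var is read in tick() above):
#
#   python cron/scheduler.py                              # run whatever is due
#   HERMES_CRON_MAX_PARALLEL=1 python cron/scheduler.py   # force serial runs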