# managed_tool_gateway.py
1 """Generic managed-tool gateway helpers for Nous-hosted vendor passthroughs.""" 2 3 from __future__ import annotations 4 5 import json 6 import logging 7 import os 8 from datetime import datetime, timezone 9 from dataclasses import dataclass 10 from typing import Callable, Optional 11 12 logger = logging.getLogger(__name__) 13 14 from hermes_constants import get_hermes_home 15 from tools.tool_backend_helpers import managed_nous_tools_enabled 16 17 _DEFAULT_TOOL_GATEWAY_DOMAIN = "nousresearch.com" 18 _DEFAULT_TOOL_GATEWAY_SCHEME = "https" 19 _NOUS_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 20 21 22 @dataclass(frozen=True) 23 class ManagedToolGatewayConfig: 24 vendor: str 25 gateway_origin: str 26 nous_user_token: str 27 managed_mode: bool 28 29 30 def auth_json_path(): 31 """Return the Hermes auth store path, respecting HERMES_HOME overrides.""" 32 return get_hermes_home() / "auth.json" 33 34 35 def _read_nous_provider_state() -> Optional[dict]: 36 try: 37 path = auth_json_path() 38 if not path.is_file(): 39 return None 40 data = json.loads(path.read_text()) 41 providers = data.get("providers", {}) 42 if not isinstance(providers, dict): 43 return None 44 nous_provider = providers.get("nous", {}) 45 if isinstance(nous_provider, dict): 46 return nous_provider 47 except Exception: 48 pass 49 return None 50 51 52 def _parse_timestamp(value: object) -> Optional[datetime]: 53 if not isinstance(value, str) or not value.strip(): 54 return None 55 normalized = value.strip() 56 if normalized.endswith("Z"): 57 normalized = normalized[:-1] + "+00:00" 58 try: 59 parsed = datetime.fromisoformat(normalized) 60 except ValueError: 61 return None 62 if parsed.tzinfo is None: 63 parsed = parsed.replace(tzinfo=timezone.utc) 64 return parsed.astimezone(timezone.utc) 65 66 67 def _access_token_is_expiring(expires_at: object, skew_seconds: int) -> bool: 68 expires = _parse_timestamp(expires_at) 69 if expires is None: 70 return True 71 remaining = (expires - 
datetime.now(timezone.utc)).total_seconds() 72 return remaining <= max(0, int(skew_seconds)) 73 74 75 def read_nous_access_token() -> Optional[str]: 76 """Read a Nous Subscriber OAuth access token from auth store or env override.""" 77 explicit = os.getenv("TOOL_GATEWAY_USER_TOKEN") 78 if isinstance(explicit, str) and explicit.strip(): 79 return explicit.strip() 80 81 nous_provider = _read_nous_provider_state() or {} 82 access_token = nous_provider.get("access_token") 83 cached_token = access_token.strip() if isinstance(access_token, str) and access_token.strip() else None 84 85 if cached_token and not _access_token_is_expiring( 86 nous_provider.get("expires_at"), 87 _NOUS_ACCESS_TOKEN_REFRESH_SKEW_SECONDS, 88 ): 89 return cached_token 90 91 try: 92 from hermes_cli.auth import resolve_nous_access_token 93 94 refreshed_token = resolve_nous_access_token( 95 refresh_skew_seconds=_NOUS_ACCESS_TOKEN_REFRESH_SKEW_SECONDS, 96 ) 97 if isinstance(refreshed_token, str) and refreshed_token.strip(): 98 return refreshed_token.strip() 99 except Exception as exc: 100 logger.debug("Nous access token refresh failed: %s", exc) 101 102 return cached_token 103 104 105 def get_tool_gateway_scheme() -> str: 106 """Return configured shared gateway URL scheme.""" 107 scheme = os.getenv("TOOL_GATEWAY_SCHEME", "").strip().lower() 108 if not scheme: 109 return _DEFAULT_TOOL_GATEWAY_SCHEME 110 111 if scheme in {"http", "https"}: 112 return scheme 113 114 raise ValueError("TOOL_GATEWAY_SCHEME must be 'http' or 'https'") 115 116 117 def build_vendor_gateway_url(vendor: str) -> str: 118 """Return the gateway origin for a specific vendor.""" 119 vendor_key = f"{vendor.upper().replace('-', '_')}_GATEWAY_URL" 120 explicit_vendor_url = os.getenv(vendor_key, "").strip().rstrip("/") 121 if explicit_vendor_url: 122 return explicit_vendor_url 123 124 shared_scheme = get_tool_gateway_scheme() 125 shared_domain = os.getenv("TOOL_GATEWAY_DOMAIN", "").strip().strip("/") 126 if shared_domain: 127 return 
f"{shared_scheme}://{vendor}-gateway.{shared_domain}" 128 129 return f"{shared_scheme}://{vendor}-gateway.{_DEFAULT_TOOL_GATEWAY_DOMAIN}" 130 131 132 def resolve_managed_tool_gateway( 133 vendor: str, 134 gateway_builder: Optional[Callable[[str], str]] = None, 135 token_reader: Optional[Callable[[], Optional[str]]] = None, 136 ) -> Optional[ManagedToolGatewayConfig]: 137 """Resolve shared managed-tool gateway config for a vendor.""" 138 if not managed_nous_tools_enabled(): 139 return None 140 141 resolved_gateway_builder = gateway_builder or build_vendor_gateway_url 142 resolved_token_reader = token_reader or read_nous_access_token 143 144 gateway_origin = resolved_gateway_builder(vendor) 145 nous_user_token = resolved_token_reader() 146 if not gateway_origin or not nous_user_token: 147 return None 148 149 return ManagedToolGatewayConfig( 150 vendor=vendor, 151 gateway_origin=gateway_origin, 152 nous_user_token=nous_user_token, 153 managed_mode=True, 154 ) 155 156 157 def is_managed_tool_gateway_ready( 158 vendor: str, 159 gateway_builder: Optional[Callable[[str], str]] = None, 160 token_reader: Optional[Callable[[], Optional[str]]] = None, 161 ) -> bool: 162 """Return True when gateway URL and Nous access token are available.""" 163 return resolve_managed_tool_gateway( 164 vendor, 165 gateway_builder=gateway_builder, 166 token_reader=token_reader, 167 ) is not None