/ agent / credential_sources.py
credential_sources.py
  1  """Unified removal contract for every credential source Hermes reads from.
  2  
  3  Hermes seeds its credential pool from many places:
  4  
  5      env:<VAR>     — os.environ / ~/.hermes/.env
  6      claude_code   — ~/.claude/.credentials.json
  7      hermes_pkce   — ~/.hermes/.anthropic_oauth.json
  8      device_code   — auth.json providers.<provider> (nous, openai-codex, ...)
  9      qwen-cli      — ~/.qwen/oauth_creds.json
 10      gh_cli        — gh auth token
 11      config:<name> — custom_providers config entry
 12      model_config  — model.api_key when model.provider == "custom"
 13      manual        — user ran `hermes auth add`
 14  
 15  Each source has its own reader inside ``agent.credential_pool._seed_from_*``
 16  (which keep their existing shape — we haven't restructured them).  What we
 17  unify here is **removal**:
 18  
 19      ``hermes auth remove <provider> <N>`` must make the pool entry stay gone.
 20  
 21  Before this module, every source had an ad-hoc removal branch in
 22  ``auth_remove_command``, and several sources had no branch at all — so
 23  ``auth remove`` silently reverted on the next ``load_pool()`` call for
 24  qwen-cli, nous device_code (partial), hermes_pkce, copilot gh_cli, and
 25  custom-config sources.
 26  
 27  Now every source registers a ``RemovalStep`` that does exactly three things
 28  in the same shape:
 29  
 30      1. Clean up whatever externally-readable state the source reads from
 31         (.env line, auth.json block, OAuth file, etc.)
 32      2. Suppress the ``(provider, source_id)`` in auth.json so the
 33         corresponding ``_seed_from_*`` branch skips the upsert on re-load
 34      3. Return ``RemovalResult`` describing what was cleaned and any
 35         diagnostic hints the user should see (shell-exported env vars,
 36         external credential files we deliberately don't delete, etc.)
 37  
 38  Adding a new credential source is:
 39      - wire up a reader branch in ``_seed_from_*`` (existing pattern)
 40      - gate that reader behind ``is_source_suppressed(provider, source_id)``
 41      - register a ``RemovalStep`` here
 42  
 43  No more per-source if/elif chain in ``auth_remove_command``.
 44  """
 45  
 46  from __future__ import annotations
 47  
 48  import os
 49  from dataclasses import dataclass, field
 50  from typing import Callable, List, Optional
 51  
 52  
 53  @dataclass
 54  class RemovalResult:
 55      """Outcome of removing a credential source.
 56  
 57      Attributes:
 58          cleaned: Short strings describing external state that was actually
 59              mutated (``"Cleared XAI_API_KEY from .env"``,
 60              ``"Cleared openai-codex OAuth tokens from auth store"``).
 61              Printed as plain lines to the user.
 62          hints: Diagnostic lines ABOUT state the user may need to clean up
 63              themselves or is deliberately left intact (shell-exported env
 64              var, Claude Code credential file we don't delete, etc.).
 65              Printed as plain lines to the user.  Always non-destructive.
 66          suppress: Whether to call ``suppress_credential_source`` after
 67              cleanup so future ``load_pool`` calls skip this source.
 68              Default True — almost every source needs this to stay sticky.
 69              The only legitimate False is ``manual`` entries, which aren't
 70              seeded from anywhere external.
 71      """
 72  
 73      cleaned: List[str] = field(default_factory=list)
 74      hints: List[str] = field(default_factory=list)
 75      suppress: bool = True
 76  
 77  
 78  @dataclass
 79  class RemovalStep:
 80      """How to remove one specific credential source cleanly.
 81  
 82      Attributes:
 83          provider: Provider pool key (``"xai"``, ``"anthropic"``, ``"nous"``, ...).
 84              Special value ``"*"`` means "matches any provider" — used for
 85              sources like ``manual`` that aren't provider-specific.
 86          source_id: Source identifier as it appears in
 87              ``PooledCredential.source``.  May be a literal (``"claude_code"``)
 88              or a prefix pattern matched via ``match_fn``.
 89          match_fn: Optional predicate overriding literal ``source_id``
 90              matching.  Gets the removed entry's source string.  Used for
 91              ``env:*`` (any env-seeded key), ``config:*`` (any custom
 92              pool), and ``manual:*`` (any manual-source variant).
 93          remove_fn: ``(provider, removed_entry) -> RemovalResult``.  Does the
 94              actual cleanup and returns what happened for the user.
 95          description: One-line human-readable description for docs / tests.
 96      """
 97  
 98      provider: str
 99      source_id: str
100      remove_fn: Callable[..., RemovalResult]
101      match_fn: Optional[Callable[[str], bool]] = None
102      description: str = ""
103  
104      def matches(self, provider: str, source: str) -> bool:
105          if self.provider != "*" and self.provider != provider:
106              return False
107          if self.match_fn is not None:
108              return self.match_fn(source)
109          return source == self.source_id
110  
111  
112  _REGISTRY: List[RemovalStep] = []
113  
114  
115  def register(step: RemovalStep) -> RemovalStep:
116      _REGISTRY.append(step)
117      return step
118  
119  
120  def find_removal_step(provider: str, source: str) -> Optional[RemovalStep]:
121      """Return the first matching RemovalStep, or None if unregistered.
122  
123      Unregistered sources fall through to the default remove path in
124      ``auth_remove_command``: the pool entry is already gone (that happens
125      before dispatch), no external cleanup, no suppression.  This is the
126      correct behaviour for ``manual`` entries — they were only ever stored
127      in the pool, nothing external to clean up.
128      """
129      for step in _REGISTRY:
130          if step.matches(provider, source):
131              return step
132      return None
133  
134  
135  # ---------------------------------------------------------------------------
136  # Individual RemovalStep implementations — one per source.
137  # ---------------------------------------------------------------------------
138  # Each remove_fn is intentionally small and single-purpose.  Adding a new
139  # credential source means adding ONE entry here — no other changes to
140  # auth_remove_command.
141  
142  
143  def _remove_env_source(provider: str, removed) -> RemovalResult:
144      """env:<VAR> — the most common case.
145  
146      Handles three user situations:
147        1. Var lives only in ~/.hermes/.env  → clear it
148        2. Var lives only in the user's shell (shell profile, systemd
149           EnvironmentFile, launchd plist) → hint them where to unset it
150        3. Var lives in both → clear from .env, hint about shell
151      """
152      from hermes_cli.config import get_env_path, remove_env_value
153  
154      result = RemovalResult()
155      env_var = removed.source[len("env:"):]
156      if not env_var:
157          return result
158  
159      # Detect shell vs .env BEFORE remove_env_value pops os.environ.
160      env_in_process = bool(os.getenv(env_var))
161      env_in_dotenv = False
162      try:
163          env_path = get_env_path()
164          if env_path.exists():
165              env_in_dotenv = any(
166                  line.strip().startswith(f"{env_var}=")
167                  for line in env_path.read_text(errors="replace").splitlines()
168              )
169      except OSError:
170          pass
171      shell_exported = env_in_process and not env_in_dotenv
172  
173      cleared = remove_env_value(env_var)
174      if cleared:
175          result.cleaned.append(f"Cleared {env_var} from .env")
176  
177      if shell_exported:
178          result.hints.extend([
179              f"Note: {env_var} is still set in your shell environment "
180              f"(not in ~/.hermes/.env).",
181              "  Unset it there (shell profile, systemd EnvironmentFile, "
182              "launchd plist, etc.) or it will keep being visible to Hermes.",
183              f"  The pool entry is now suppressed — Hermes will ignore "
184              f"{env_var} until you run `hermes auth add {provider}`.",
185          ])
186      else:
187          result.hints.append(
188              f"Suppressed env:{env_var} — it will not be re-seeded even "
189              f"if the variable is re-exported later."
190          )
191      return result
192  
193  
194  def _remove_claude_code(provider: str, removed) -> RemovalResult:
195      """~/.claude/.credentials.json is owned by Claude Code itself.
196  
197      We don't delete it — the user's Claude Code install still needs to
198      work.  We just suppress it so Hermes stops reading it.
199      """
200      return RemovalResult(hints=[
201          "Suppressed claude_code credential — it will not be re-seeded.",
202          "Note: Claude Code credentials still live in ~/.claude/.credentials.json",
203          "Run `hermes auth add anthropic` to re-enable if needed.",
204      ])
205  
206  
207  def _remove_hermes_pkce(provider: str, removed) -> RemovalResult:
208      """~/.hermes/.anthropic_oauth.json is ours — delete it outright."""
209      from hermes_constants import get_hermes_home
210  
211      result = RemovalResult()
212      oauth_file = get_hermes_home() / ".anthropic_oauth.json"
213      if oauth_file.exists():
214          try:
215              oauth_file.unlink()
216              result.cleaned.append("Cleared Hermes Anthropic OAuth credentials")
217          except OSError as exc:
218              result.hints.append(f"Could not delete {oauth_file}: {exc}")
219      return result
220  
221  
222  def _clear_auth_store_provider(provider: str) -> bool:
223      """Delete auth_store.providers[provider].  Returns True if deleted."""
224      from hermes_cli.auth import (
225          _auth_store_lock,
226          _load_auth_store,
227          _save_auth_store,
228      )
229  
230      with _auth_store_lock():
231          auth_store = _load_auth_store()
232          providers_dict = auth_store.get("providers")
233          if isinstance(providers_dict, dict) and provider in providers_dict:
234              del providers_dict[provider]
235              _save_auth_store(auth_store)
236              return True
237      return False
238  
239  
240  def _remove_nous_device_code(provider: str, removed) -> RemovalResult:
241      """Nous OAuth lives in auth.json providers.nous — clear it and suppress.
242  
243      We suppress in addition to clearing because nothing else stops the
244      user's next `hermes login` run from writing providers.nous again
245      before they decide to.  Suppression forces them to go through
246      `hermes auth add nous` to re-engage, which is the documented re-add
247      path and clears the suppression atomically.
248      """
249      result = RemovalResult()
250      if _clear_auth_store_provider(provider):
251          result.cleaned.append(f"Cleared {provider} OAuth tokens from auth store")
252      return result
253  
254  
255  def _remove_minimax_oauth(provider: str, removed) -> RemovalResult:
256      """MiniMax OAuth lives in auth.json providers.minimax-oauth — clear it.
257  
258      Same pattern as Nous: single-source OAuth state with refresh tokens.
259      Suppression of the `oauth` source ensures the pool reseed path
260      (_seed_from_singletons) doesn't instantly undo the removal.
261      """
262      result = RemovalResult()
263      if _clear_auth_store_provider(provider):
264          result.cleaned.append(f"Cleared {provider} OAuth tokens from auth store")
265      return result
266  
267  
268  def _remove_codex_device_code(provider: str, removed) -> RemovalResult:
269      """Codex tokens live in TWO places: our auth store AND ~/.codex/auth.json.
270  
271      refresh_codex_oauth_pure() writes both every time, so clearing only
272      the Hermes auth store is not enough — _seed_from_singletons() would
273      re-import from ~/.codex/auth.json on the next load_pool() call and
274      the removal would be instantly undone.  We suppress instead of
275      deleting Codex CLI's file, so the Codex CLI itself keeps working.
276  
277      The canonical source name in ``_seed_from_singletons`` is
278      ``"device_code"`` (no prefix).  Entries may show up in the pool as
279      either ``"device_code"`` (seeded) or ``"manual:device_code"`` (added
280      via ``hermes auth add openai-codex``), but in both cases the re-seed
281      gate lives at the ``"device_code"`` suppression key.  We suppress
282      that canonical key here; the central dispatcher also suppresses
283      ``removed.source`` which is fine — belt-and-suspenders, idempotent.
284      """
285      from hermes_cli.auth import suppress_credential_source
286  
287      result = RemovalResult()
288      if _clear_auth_store_provider(provider):
289          result.cleaned.append(f"Cleared {provider} OAuth tokens from auth store")
290      # Suppress the canonical re-seed source, not just whatever source the
291      # removed entry had.  Otherwise `manual:device_code` removals wouldn't
292      # block the `device_code` re-seed path.
293      suppress_credential_source(provider, "device_code")
294      result.hints.extend([
295          "Suppressed openai-codex device_code source — it will not be re-seeded.",
296          "Note: Codex CLI credentials still live in ~/.codex/auth.json",
297          "Run `hermes auth add openai-codex` to re-enable if needed.",
298      ])
299      return result
300  
301  
302  def _remove_qwen_cli(provider: str, removed) -> RemovalResult:
303      """~/.qwen/oauth_creds.json is owned by the Qwen CLI.
304  
305      Same pattern as claude_code — suppress, don't delete.  The user's
306      Qwen CLI install still reads from that file.
307      """
308      return RemovalResult(hints=[
309          "Suppressed qwen-cli credential — it will not be re-seeded.",
310          "Note: Qwen CLI credentials still live in ~/.qwen/oauth_creds.json",
311          "Run `hermes auth add qwen-oauth` to re-enable if needed.",
312      ])
313  
314  
315  def _remove_copilot_gh(provider: str, removed) -> RemovalResult:
316      """Copilot token comes from `gh auth token` or COPILOT_GITHUB_TOKEN / GH_TOKEN / GITHUB_TOKEN.
317  
318      Copilot is special: the same token can be seeded as multiple source
319      entries (gh_cli from ``_seed_from_singletons`` plus env:<VAR> from
320      ``_seed_from_env``), so removing one entry without suppressing the
321      others lets the duplicates resurrect.  We suppress ALL known copilot
322      sources here so removal is stable regardless of which entry the
323      user clicked.
324  
325      We don't touch the user's gh CLI or shell state — just suppress so
326      Hermes stops picking the token up.
327      """
328      # Suppress ALL copilot source variants up-front so no path resurrects
329      # the pool entry.  The central dispatcher in auth_remove_command will
330      # ALSO suppress removed.source, but it's idempotent so double-calling
331      # is harmless.
332      from hermes_cli.auth import suppress_credential_source
333      suppress_credential_source(provider, "gh_cli")
334      for env_var in ("COPILOT_GITHUB_TOKEN", "GH_TOKEN", "GITHUB_TOKEN"):
335          suppress_credential_source(provider, f"env:{env_var}")
336  
337      return RemovalResult(hints=[
338          "Suppressed all copilot token sources (gh_cli + env vars) — they will not be re-seeded.",
339          "Note: Your gh CLI / shell environment is unchanged.",
340          "Run `hermes auth add copilot` to re-enable if needed.",
341      ])
342  
343  
344  def _remove_custom_config(provider: str, removed) -> RemovalResult:
345      """Custom provider pools are seeded from custom_providers config or
346      model.api_key.  Both are in config.yaml — modifying that from here
347      is more invasive than suppression.  We suppress; the user can edit
348      config.yaml if they want to remove the key from disk entirely.
349      """
350      source_label = removed.source
351      return RemovalResult(hints=[
352          f"Suppressed {source_label} — it will not be re-seeded.",
353          "Note: The underlying value in config.yaml is unchanged.  Edit it "
354          "directly if you want to remove the credential from disk.",
355      ])
356  
357  
358  def _register_all_sources() -> None:
359      """Called once on module import.
360  
361      ORDER MATTERS — ``find_removal_step`` returns the first match.  Put
362      provider-specific steps before the generic ``env:*`` step so that e.g.
363      copilot's ``env:GH_TOKEN`` goes through the copilot removal (which
364      doesn't touch the user's shell), not the generic env-var removal
365      (which would try to clear .env).
366      """
367      register(RemovalStep(
368          provider="copilot", source_id="gh_cli",
369          match_fn=lambda src: src == "gh_cli" or src.startswith("env:"),
370          remove_fn=_remove_copilot_gh,
371          description="gh auth token / COPILOT_GITHUB_TOKEN / GH_TOKEN",
372      ))
373      register(RemovalStep(
374          provider="*", source_id="env:",
375          match_fn=lambda src: src.startswith("env:"),
376          remove_fn=_remove_env_source,
377          description="Any env-seeded credential (XAI_API_KEY, DEEPSEEK_API_KEY, etc.)",
378      ))
379      register(RemovalStep(
380          provider="anthropic", source_id="claude_code",
381          remove_fn=_remove_claude_code,
382          description="~/.claude/.credentials.json",
383      ))
384      register(RemovalStep(
385          provider="anthropic", source_id="hermes_pkce",
386          remove_fn=_remove_hermes_pkce,
387          description="~/.hermes/.anthropic_oauth.json",
388      ))
389      register(RemovalStep(
390          provider="nous", source_id="device_code",
391          remove_fn=_remove_nous_device_code,
392          description="auth.json providers.nous",
393      ))
394      register(RemovalStep(
395          provider="openai-codex", source_id="device_code",
396          match_fn=lambda src: src == "device_code" or src.endswith(":device_code"),
397          remove_fn=_remove_codex_device_code,
398          description="auth.json providers.openai-codex + ~/.codex/auth.json",
399      ))
400      register(RemovalStep(
401          provider="qwen-oauth", source_id="qwen-cli",
402          remove_fn=_remove_qwen_cli,
403          description="~/.qwen/oauth_creds.json",
404      ))
405      register(RemovalStep(
406          provider="minimax-oauth", source_id="oauth",
407          remove_fn=_remove_minimax_oauth,
408          description="auth.json providers.minimax-oauth",
409      ))
410      register(RemovalStep(
411          provider="*", source_id="config:",
412          match_fn=lambda src: src.startswith("config:") or src == "model_config",
413          remove_fn=_remove_custom_config,
414          description="Custom provider config.yaml api_key field",
415      ))
416  
417  
418  _register_all_sources()