/ hermes_cli / dingtalk_auth.py
dingtalk_auth.py
  1  """
  2  DingTalk Device Flow authorization.
  3  
  4  Implements the same 3-step registration flow as dingtalk-openclaw-connector:
  5    1. POST /app/registration/init   → get nonce
  6    2. POST /app/registration/begin  → get device_code + verification_uri_complete
  7    3. POST /app/registration/poll   → poll until SUCCESS → get client_id + client_secret
  8  
  9  The verification_uri_complete is rendered as a QR code in the terminal so the
 10  user can scan it with DingTalk to authorize, yielding AppKey + AppSecret
 11  automatically.
 12  """
 13  
 14  from __future__ import annotations
 15  
import logging
import os
import sys
import time
from typing import Callable, Optional, Tuple

import requests
 23  
 24  logger = logging.getLogger(__name__)
 25  
 26  # ── Configuration ──────────────────────────────────────────────────────────
 27  
 28  REGISTRATION_BASE_URL = os.environ.get(
 29      "DINGTALK_REGISTRATION_BASE_URL", "https://oapi.dingtalk.com"
 30  ).rstrip("/")
 31  
 32  REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "openClaw")
 33  
 34  
 35  # ── API helpers ────────────────────────────────────────────────────────────
 36  
 37  class RegistrationError(Exception):
 38      """Raised when a DingTalk registration API call fails."""
 39  
 40  
 41  def _api_post(path: str, payload: dict) -> dict:
 42      """POST to the registration API and return the parsed JSON body."""
 43      url = f"{REGISTRATION_BASE_URL}{path}"
 44      try:
 45          resp = requests.post(url, json=payload, timeout=15)
 46          resp.raise_for_status()
 47          data = resp.json()
 48      except requests.RequestException as exc:
 49          raise RegistrationError(f"Network error calling {url}: {exc}") from exc
 50  
 51      errcode = data.get("errcode", -1)
 52      if errcode != 0:
 53          errmsg = data.get("errmsg", "unknown error")
 54          raise RegistrationError(f"API error [{path}]: {errmsg} (errcode={errcode})")
 55      return data
 56  
 57  
 58  # ── Core flow ──────────────────────────────────────────────────────────────
 59  
 60  def begin_registration() -> dict:
 61      """Start a device-flow registration.
 62  
 63      Returns a dict with keys:
 64          device_code, verification_uri_complete, expires_in, interval
 65      """
 66      # Step 1: init → nonce
 67      init_data = _api_post("/app/registration/init", {"source": REGISTRATION_SOURCE})
 68      nonce = str(init_data.get("nonce", "")).strip()
 69      if not nonce:
 70          raise RegistrationError("init response missing nonce")
 71  
 72      # Step 2: begin → device_code, verification_uri_complete
 73      begin_data = _api_post("/app/registration/begin", {"nonce": nonce})
 74      device_code = str(begin_data.get("device_code", "")).strip()
 75      verification_uri_complete = str(begin_data.get("verification_uri_complete", "")).strip()
 76      if not device_code:
 77          raise RegistrationError("begin response missing device_code")
 78      if not verification_uri_complete:
 79          raise RegistrationError("begin response missing verification_uri_complete")
 80  
 81      return {
 82          "device_code": device_code,
 83          "verification_uri_complete": verification_uri_complete,
 84          "expires_in": int(begin_data.get("expires_in", 7200)),
 85          "interval": max(int(begin_data.get("interval", 3)), 2),
 86      }
 87  
 88  
 89  def poll_registration(device_code: str) -> dict:
 90      """Poll the registration status once.
 91  
 92      Returns a dict with keys:  status, client_id?, client_secret?, fail_reason?
 93      """
 94      data = _api_post("/app/registration/poll", {"device_code": device_code})
 95      status_raw = str(data.get("status", "")).strip().upper()
 96      if status_raw not in ("WAITING", "SUCCESS", "FAIL", "EXPIRED"):
 97          status_raw = "UNKNOWN"
 98      return {
 99          "status": status_raw,
100          "client_id": str(data.get("client_id", "")).strip() or None,
101          "client_secret": str(data.get("client_secret", "")).strip() or None,
102          "fail_reason": str(data.get("fail_reason", "")).strip() or None,
103      }
104  
105  
def wait_for_registration_success(
    device_code: str,
    interval: int = 3,
    expires_in: int = 7200,
    on_waiting: Optional[Callable[[], None]] = None,
) -> Tuple[str, str]:
    """Block until the registration succeeds, fails, or times out.

    Sleeps ``interval`` seconds before every poll. Poll errors and
    non-``WAITING``/non-``SUCCESS`` statuses are retried for up to two
    minutes, measured from the first such result since the last ``WAITING``
    response, before they are treated as final.

    Args:
        device_code: The device code obtained from ``begin_registration``.
        interval: Seconds to sleep between polls.
        expires_in: Overall time budget in seconds.
        on_waiting: Optional zero-argument callback invoked after each poll
            that reports ``WAITING``.

    Returns:
        ``(client_id, client_secret)``.

    Raises:
        RegistrationError: If polling keeps failing past the retry window,
            the authorization fails, a success response lacks credentials,
            or ``expires_in`` elapses.
    """
    deadline = time.monotonic() + expires_in
    retry_window = 120  # 2 minutes for transient errors
    # Start of the current run of failed polls; None while polls are healthy.
    retry_start: Optional[float] = None

    while time.monotonic() < deadline:
        time.sleep(interval)
        try:
            result = poll_registration(device_code)
        except RegistrationError as exc:
            if retry_start is None:
                retry_start = time.monotonic()
            if time.monotonic() - retry_start < retry_window:
                logger.debug("Registration poll failed, retrying: %s", exc)
                continue
            raise

        status = result["status"]
        if status == "WAITING":
            # A healthy poll ends any running retry window.
            retry_start = None
            if on_waiting:
                on_waiting()
            continue
        if status == "SUCCESS":
            cid = result["client_id"]
            csecret = result["client_secret"]
            if not cid or not csecret:
                raise RegistrationError("authorization succeeded but credentials are missing")
            return cid, csecret

        # FAIL / EXPIRED / UNKNOWN: retried within the same window as errors.
        if retry_start is None:
            retry_start = time.monotonic()
        if time.monotonic() - retry_start < retry_window:
            logger.debug("Registration poll returned %s, retrying", status)
            continue
        reason = result.get("fail_reason") or status
        raise RegistrationError(f"authorization failed: {reason}")

    raise RegistrationError("authorization timed out, please retry")
152  
153  
154  # ── QR code rendering ─────────────────────────────────────────────────────
155  
def _ensure_qrcode_installed() -> bool:
    """Make the ``qrcode`` package importable, installing it if necessary.

    Tries a plain import first. If that fails, runs ``uv pip install`` and
    then ``pip install`` through the current interpreter, with installer
    output suppressed, and retries the import after each successful install.

    Returns:
        True if ``qrcode`` can be imported afterwards, False otherwise.
    """
    try:
        import qrcode  # noqa: F401
        return True
    except ImportError:
        pass

    import importlib
    import subprocess

    # Try uv first (Hermes convention), then pip
    for cmd in (
        [sys.executable, "-m", "uv", "pip", "install", "qrcode"],
        [sys.executable, "-m", "pip", "install", "-q", "qrcode"],
    ):
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except (subprocess.CalledProcessError, OSError):
            # Installer missing, not executable, or exited non-zero.
            continue

        # The package was written by another process after this interpreter
        # started; drop cached finder state so the new files are noticed.
        importlib.invalidate_caches()
        try:
            import qrcode  # noqa: F401,F811
        except ImportError:
            continue
        return True
    return False
178  
179  
def render_qr_to_terminal(url: str) -> bool:
    """Print *url* to the terminal as a compact QR code.

    Two matrix rows are packed into each printed line using half-block
    characters, and every line is indented by four spaces.

    Returns True once the code has been printed, or False when the
    ``qrcode`` package cannot be imported.
    """
    try:
        import qrcode
    except ImportError:
        return False

    code = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=1,
        border=1,
    )
    code.add_data(url)
    code.make(fit=True)

    # (upper module set, lower module set) -> glyph covering both rows
    glyphs = {
        (True, True): "\u2588",    # █
        (True, False): "\u2580",   # ▀
        (False, True): "\u2584",   # ▄
        (False, False): " ",
    }

    grid = code.get_matrix()
    height = len(grid)
    rendered: list[str] = []

    for upper_idx in range(0, height, 2):
        upper = grid[upper_idx]
        lower_idx = upper_idx + 1
        # An odd row count leaves the final upper row without a partner.
        lower = grid[lower_idx] if lower_idx < height else None
        cells = [
            glyphs[bool(cell), bool(lower[col]) if lower is not None else False]
            for col, cell in enumerate(upper)
        ]
        rendered.append("    " + "".join(cells))

    print("\n".join(rendered))
    return True
226  
227  
228  # ── High-level entry point for the setup wizard ───────────────────────────
229  
def dingtalk_qr_auth() -> Optional[Tuple[str, str]]:
    """Run the interactive QR-code device-flow authorization.

    Starts a registration, shows the verification link as a terminal QR code
    (plus the plain link), then polls until the authorization completes.

    Returns:
        ``(client_id, client_secret)`` on success, or ``None`` when starting
        or waiting for the registration raises ``RegistrationError`` (the
        error is printed, not propagated).
    """
    from hermes_cli.setup import print_info, print_success, print_warning, print_error

    print()
    print_info("  Initializing DingTalk device authorization...")
    print_info("  Note: the scan page is branded 'OpenClaw' — DingTalk's")
    print_info("        ecosystem onboarding bridge. Safe to use.")

    try:
        reg = begin_registration()
    except RegistrationError as exc:
        print_error(f"  Authorization init failed: {exc}")
        return None

    url = reg["verification_uri_complete"]

    # Ensure qrcode library is available (auto-install if missing)
    if not _ensure_qrcode_installed():
        print_warning("  qrcode library install failed, will show link only.")

    print()
    print_info("  Please scan the QR code below with DingTalk to authorize:")
    print()

    if not render_qr_to_terminal(url):
        print_warning("  QR code render failed, please open the link below to authorize:")

    print()
    print_info(f"  Or open this link manually: {url}")
    print()

    # Describe the timeout actually in effect rather than assuming the
    # 7200-second default; whole hours print as hours, otherwise minutes.
    total_minutes = max(1, round(reg["expires_in"] / 60))
    if total_minutes % 60 == 0:
        hours = total_minutes // 60
        timeout_label = f"{hours} hour" + ("" if hours == 1 else "s")
    else:
        timeout_label = f"{total_minutes} minute" + ("" if total_minutes == 1 else "s")
    print_info(f"  Waiting for QR scan authorization... (timeout: {timeout_label})")

    dot_count = 0

    def _on_waiting():
        # Print one progress dot for every tenth WAITING poll.
        nonlocal dot_count
        dot_count += 1
        if dot_count % 10 == 0:
            sys.stdout.write(".")
            sys.stdout.flush()

    try:
        client_id, client_secret = wait_for_registration_success(
            device_code=reg["device_code"],
            interval=reg["interval"],
            expires_in=reg["expires_in"],
            on_waiting=_on_waiting,
        )
    except RegistrationError as exc:
        print()  # terminate the line of progress dots
        print_error(f"  Authorization failed: {exc}")
        return None

    # Show at most the first 8 characters of the secret, and none at all when
    # the secret is too short for that prefix to leave anything masked.
    shown = client_secret[:8] if len(client_secret) > 8 else ""
    masked_secret = shown + "*" * (len(client_secret) - len(shown))

    print()
    print_success("  QR scan authorization successful!")
    print_success(f"  Client ID:     {client_id}")
    print_success(f"  Client Secret: {masked_secret}")

    return client_id, client_secret
292  
293      return client_id, client_secret