/ hermes_cli / dingtalk_auth.py
dingtalk_auth.py
1 """ 2 DingTalk Device Flow authorization. 3 4 Implements the same 3-step registration flow as dingtalk-openclaw-connector: 5 1. POST /app/registration/init → get nonce 6 2. POST /app/registration/begin → get device_code + verification_uri_complete 7 3. POST /app/registration/poll → poll until SUCCESS → get client_id + client_secret 8 9 The verification_uri_complete is rendered as a QR code in the terminal so the 10 user can scan it with DingTalk to authorize, yielding AppKey + AppSecret 11 automatically. 12 """ 13 14 from __future__ import annotations 15 16 import os 17 import sys 18 import time 19 import logging 20 from typing import Optional, Tuple 21 22 import requests 23 24 logger = logging.getLogger(__name__) 25 26 # ── Configuration ────────────────────────────────────────────────────────── 27 28 REGISTRATION_BASE_URL = os.environ.get( 29 "DINGTALK_REGISTRATION_BASE_URL", "https://oapi.dingtalk.com" 30 ).rstrip("/") 31 32 REGISTRATION_SOURCE = os.environ.get("DINGTALK_REGISTRATION_SOURCE", "openClaw") 33 34 35 # ── API helpers ──────────────────────────────────────────────────────────── 36 37 class RegistrationError(Exception): 38 """Raised when a DingTalk registration API call fails.""" 39 40 41 def _api_post(path: str, payload: dict) -> dict: 42 """POST to the registration API and return the parsed JSON body.""" 43 url = f"{REGISTRATION_BASE_URL}{path}" 44 try: 45 resp = requests.post(url, json=payload, timeout=15) 46 resp.raise_for_status() 47 data = resp.json() 48 except requests.RequestException as exc: 49 raise RegistrationError(f"Network error calling {url}: {exc}") from exc 50 51 errcode = data.get("errcode", -1) 52 if errcode != 0: 53 errmsg = data.get("errmsg", "unknown error") 54 raise RegistrationError(f"API error [{path}]: {errmsg} (errcode={errcode})") 55 return data 56 57 58 # ── Core flow ────────────────────────────────────────────────────────────── 59 60 def begin_registration() -> dict: 61 """Start a device-flow registration. 62 63 Returns a dict with keys: 64 device_code, verification_uri_complete, expires_in, interval 65 """ 66 # Step 1: init → nonce 67 init_data = _api_post("/app/registration/init", {"source": REGISTRATION_SOURCE}) 68 nonce = str(init_data.get("nonce", "")).strip() 69 if not nonce: 70 raise RegistrationError("init response missing nonce") 71 72 # Step 2: begin → device_code, verification_uri_complete 73 begin_data = _api_post("/app/registration/begin", {"nonce": nonce}) 74 device_code = str(begin_data.get("device_code", "")).strip() 75 verification_uri_complete = str(begin_data.get("verification_uri_complete", "")).strip() 76 if not device_code: 77 raise RegistrationError("begin response missing device_code") 78 if not verification_uri_complete: 79 raise RegistrationError("begin response missing verification_uri_complete") 80 81 return { 82 "device_code": device_code, 83 "verification_uri_complete": verification_uri_complete, 84 "expires_in": int(begin_data.get("expires_in", 7200)), 85 "interval": max(int(begin_data.get("interval", 3)), 2), 86 } 87 88 89 def poll_registration(device_code: str) -> dict: 90 """Poll the registration status once. 91 92 Returns a dict with keys: status, client_id?, client_secret?, fail_reason? 93 """ 94 data = _api_post("/app/registration/poll", {"device_code": device_code}) 95 status_raw = str(data.get("status", "")).strip().upper() 96 if status_raw not in ("WAITING", "SUCCESS", "FAIL", "EXPIRED"): 97 status_raw = "UNKNOWN" 98 return { 99 "status": status_raw, 100 "client_id": str(data.get("client_id", "")).strip() or None, 101 "client_secret": str(data.get("client_secret", "")).strip() or None, 102 "fail_reason": str(data.get("fail_reason", "")).strip() or None, 103 } 104 105 106 def wait_for_registration_success( 107 device_code: str, 108 interval: int = 3, 109 expires_in: int = 7200, 110 on_waiting: Optional[callable] = None, 111 ) -> Tuple[str, str]: 112 """Block until the registration succeeds or times out. 113 114 Returns (client_id, client_secret). 115 """ 116 deadline = time.monotonic() + expires_in 117 retry_window = 120 # 2 minutes for transient errors 118 retry_start = 0.0 119 120 while time.monotonic() < deadline: 121 time.sleep(interval) 122 try: 123 result = poll_registration(device_code) 124 except RegistrationError: 125 if retry_start == 0: 126 retry_start = time.monotonic() 127 if time.monotonic() - retry_start < retry_window: 128 continue 129 raise 130 131 status = result["status"] 132 if status == "WAITING": 133 retry_start = 0 134 if on_waiting: 135 on_waiting() 136 continue 137 if status == "SUCCESS": 138 cid = result["client_id"] 139 csecret = result["client_secret"] 140 if not cid or not csecret: 141 raise RegistrationError("authorization succeeded but credentials are missing") 142 return cid, csecret 143 # FAIL / EXPIRED / UNKNOWN 144 if retry_start == 0: 145 retry_start = time.monotonic() 146 if time.monotonic() - retry_start < retry_window: 147 continue 148 reason = result.get("fail_reason") or status 149 raise RegistrationError(f"authorization failed: {reason}") 150 151 raise RegistrationError("authorization timed out, please retry") 152 153 154 # ── QR code rendering ───────────────────────────────────────────────────── 155 156 def _ensure_qrcode_installed() -> bool: 157 """Try to import qrcode; if missing, auto-install it via pip/uv.""" 158 try: 159 import qrcode # noqa: F401 160 return True 161 except ImportError: 162 pass 163 164 import subprocess 165 166 # Try uv first (Hermes convention), then pip 167 for cmd in ( 168 [sys.executable, "-m", "uv", "pip", "install", "qrcode"], 169 [sys.executable, "-m", "pip", "install", "-q", "qrcode"], 170 ): 171 try: 172 subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) 173 import qrcode # noqa: F401,F811 174 return True 175 except (subprocess.CalledProcessError, ImportError, FileNotFoundError): 176 continue 177 return False 178 179 180 def render_qr_to_terminal(url: str) -> bool: 181 """Render *url* as a compact QR code in the terminal. 182 183 Returns True if the QR code was printed, False if the library is missing. 184 """ 185 try: 186 import qrcode 187 except ImportError: 188 return False 189 190 qr = qrcode.QRCode( 191 version=1, 192 error_correction=qrcode.constants.ERROR_CORRECT_L, 193 box_size=1, 194 border=1, 195 ) 196 qr.add_data(url) 197 qr.make(fit=True) 198 199 # Use half-block characters for compact rendering (2 rows per character) 200 matrix = qr.get_matrix() 201 rows = len(matrix) 202 lines: list[str] = [] 203 204 TOP_HALF = "\u2580" # ▀ 205 BOTTOM_HALF = "\u2584" # ▄ 206 FULL_BLOCK = "\u2588" # █ 207 EMPTY = " " 208 209 for r in range(0, rows, 2): 210 line_chars: list[str] = [] 211 for c in range(len(matrix[r])): 212 top = matrix[r][c] 213 bottom = matrix[r + 1][c] if r + 1 < rows else False 214 if top and bottom: 215 line_chars.append(FULL_BLOCK) 216 elif top: 217 line_chars.append(TOP_HALF) 218 elif bottom: 219 line_chars.append(BOTTOM_HALF) 220 else: 221 line_chars.append(EMPTY) 222 lines.append(" " + "".join(line_chars)) 223 224 print("\n".join(lines)) 225 return True 226 227 228 # ── High-level entry point for the setup wizard ─────────────────────────── 229 230 def dingtalk_qr_auth() -> Optional[Tuple[str, str]]: 231 """Run the interactive QR-code device-flow authorization. 232 233 Returns (client_id, client_secret) on success, or None if the user 234 cancelled or the flow failed. 235 """ 236 from hermes_cli.setup import print_info, print_success, print_warning, print_error 237 238 print() 239 print_info(" Initializing DingTalk device authorization...") 240 print_info(" Note: the scan page is branded 'OpenClaw' — DingTalk's") 241 print_info(" ecosystem onboarding bridge. Safe to use.") 242 243 try: 244 reg = begin_registration() 245 except RegistrationError as exc: 246 print_error(f" Authorization init failed: {exc}") 247 return None 248 249 url = reg["verification_uri_complete"] 250 251 # Ensure qrcode library is available (auto-install if missing) 252 if not _ensure_qrcode_installed(): 253 print_warning(" qrcode library install failed, will show link only.") 254 255 print() 256 print_info(" Please scan the QR code below with DingTalk to authorize:") 257 print() 258 259 if not render_qr_to_terminal(url): 260 print_warning(f" QR code render failed, please open the link below to authorize:") 261 262 print() 263 print_info(f" Or open this link manually: {url}") 264 print() 265 print_info(" Waiting for QR scan authorization... (timeout: 2 hours)") 266 267 dot_count = 0 268 269 def _on_waiting(): 270 nonlocal dot_count 271 dot_count += 1 272 if dot_count % 10 == 0: 273 sys.stdout.write(".") 274 sys.stdout.flush() 275 276 try: 277 client_id, client_secret = wait_for_registration_success( 278 device_code=reg["device_code"], 279 interval=reg["interval"], 280 expires_in=reg["expires_in"], 281 on_waiting=_on_waiting, 282 ) 283 except RegistrationError as exc: 284 print() 285 print_error(f" Authorization failed: {exc}") 286 return None 287 288 print() 289 print_success(" QR scan authorization successful!") 290 print_success(f" Client ID: {client_id}") 291 print_success(f" Client Secret: {client_secret[:8]}{'*' * (len(client_secret) - 8)}") 292 293 return client_id, client_secret