# client.py
"""Thin Spotify Web API helper used by Hermes native tools."""

from __future__ import annotations

import json
from typing import Any, Dict, Iterable, NoReturn, Optional
from urllib.parse import urlparse

import httpx

from hermes_cli.auth import (
    AuthError,
    resolve_spotify_runtime_credentials,
)


class SpotifyError(RuntimeError):
    """Base Spotify tool error."""


class SpotifyAuthRequiredError(SpotifyError):
    """Raised when the user needs to authenticate with Spotify first."""


class SpotifyAPIError(SpotifyError):
    """Structured Spotify API failure.

    Attributes:
        status_code: HTTP status code of the failed response, if known.
        response_body: Stripped raw response text, if known.
        path: API path (relative to the base URL) of the failed request, if known.
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: Optional[int] = None,
        response_body: Optional[str] = None,
        path: Optional[str] = None,
    ) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.response_body = response_body
        self.path = path


class SpotifyClient:
    """Synchronous Spotify Web API client.

    Credentials come from ``resolve_spotify_runtime_credentials``; every call
    is a one-shot ``httpx.request`` against ``base_url`` with a bearer token.
    """

    def __init__(self) -> None:
        self._runtime = self._resolve_runtime(refresh_if_expiring=True)

    def _resolve_runtime(
        self,
        *,
        force_refresh: bool = False,
        refresh_if_expiring: bool = True,
    ) -> Dict[str, Any]:
        """Resolve runtime credentials, forwarding both flags to the auth layer.

        Raises:
            SpotifyAuthRequiredError: If the auth layer raises ``AuthError``.
        """
        try:
            return resolve_spotify_runtime_credentials(
                force_refresh=force_refresh,
                refresh_if_expiring=refresh_if_expiring,
            )
        except AuthError as exc:
            raise SpotifyAuthRequiredError(str(exc)) from exc

    @property
    def base_url(self) -> str:
        """Base URL from the runtime credentials, without a trailing slash."""
        return str(self._runtime.get("base_url") or "").rstrip("/")

    def _headers(self) -> Dict[str, str]:
        """Build the request headers carrying the current access token."""
        return {
            "Authorization": f"Bearer {self._runtime['access_token']}",
            "Content-Type": "application/json",
        }

    def request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        json_body: Optional[Dict[str, Any]] = None,
        allow_retry_on_401: bool = True,
        empty_response: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Send one API request and decode the response.

        Args:
            method: HTTP method.
            path: Path appended verbatim to ``base_url``.
            params: Query parameters; entries whose value is ``None`` are dropped.
            json_body: JSON body; entries whose value is ``None`` are dropped.
                When ``None`` no JSON body is sent.
            allow_retry_on_401: When true, a 401 response triggers one forced
                credential refresh followed by a single retry.
            empty_response: Value returned for a 204 or body-less response
                instead of the generic placeholder dict.

        Returns:
            The decoded JSON when the response content type includes
            ``application/json``; ``empty_response`` (or a generic placeholder
            dict) for a 204/empty response; otherwise a dict holding the
            response text.

        Raises:
            SpotifyAPIError: For any response with status >= 400.
            SpotifyAuthRequiredError: If the credential refresh fails.
        """
        url = f"{self.base_url}{path}"
        response = httpx.request(
            method,
            url,
            headers=self._headers(),
            params=_strip_none(params),
            json=_strip_none(json_body) if json_body is not None else None,
            timeout=30.0,
        )
        if response.status_code == 401 and allow_retry_on_401:
            self._runtime = self._resolve_runtime(force_refresh=True, refresh_if_expiring=True)
            return self.request(
                method,
                path,
                params=params,
                json_body=json_body,
                allow_retry_on_401=False,
                # Forward the caller's placeholder so the retried call reports
                # the same endpoint-specific message as a first-attempt call.
                empty_response=empty_response,
            )
        if response.status_code >= 400:
            self._raise_api_error(response, method=method, path=path)
        if response.status_code == 204 or not response.content:
            return empty_response or {"success": True, "status_code": response.status_code, "empty": True}
        if "application/json" in response.headers.get("content-type", ""):
            return response.json()
        return {"success": True, "text": response.text}

    def _raise_api_error(self, response: httpx.Response, *, method: str, path: str) -> NoReturn:
        """Raise a ``SpotifyAPIError`` describing a failed response."""
        detail = response.text.strip()
        message = _friendly_spotify_error_message(
            status_code=response.status_code,
            detail=_extract_spotify_error_detail(response, fallback=detail),
            method=method,
            path=path,
            retry_after=response.headers.get("Retry-After"),
        )
        raise SpotifyAPIError(
            message,
            status_code=response.status_code,
            response_body=detail,
            path=path,
        )

    def get_devices(self) -> Any:
        """GET ``/me/player/devices``."""
        return self.request("GET", "/me/player/devices")

    def transfer_playback(self, *, device_id: str, play: bool = False) -> Any:
        """PUT ``/me/player`` with ``device_id`` as the sole target device."""
        return self.request("PUT", "/me/player", json_body={
            "device_ids": [device_id],
            "play": play,
        })

    def get_playback_state(self, *, market: Optional[str] = None) -> Any:
        """GET ``/me/player``; an empty response yields an explanatory dict."""
        return self.request(
            "GET",
            "/me/player",
            params={"market": market},
            empty_response={
                "status_code": 204,
                "empty": True,
                "message": "No active Spotify playback session was found. Open Spotify on a device and start playback, or transfer playback to an available device.",
            },
        )

    def get_currently_playing(self, *, market: Optional[str] = None) -> Any:
        """GET ``/me/player/currently-playing``; an empty response yields an explanatory dict."""
        return self.request(
            "GET",
            "/me/player/currently-playing",
            params={"market": market},
            empty_response={
                "status_code": 204,
                "empty": True,
                "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
            },
        )

    def start_playback(
        self,
        *,
        device_id: Optional[str] = None,
        context_uri: Optional[str] = None,
        uris: Optional[list[str]] = None,
        offset: Optional[Dict[str, Any]] = None,
        position_ms: Optional[int] = None,
    ) -> Any:
        """PUT ``/me/player/play``; unset body fields are omitted from the JSON."""
        return self.request(
            "PUT",
            "/me/player/play",
            params={"device_id": device_id},
            json_body={
                "context_uri": context_uri,
                "uris": uris,
                "offset": offset,
                "position_ms": position_ms,
            },
        )

    def pause_playback(self, *, device_id: Optional[str] = None) -> Any:
        """PUT ``/me/player/pause``."""
        return self.request("PUT", "/me/player/pause", params={"device_id": device_id})

    def skip_next(self, *, device_id: Optional[str] = None) -> Any:
        """POST ``/me/player/next``."""
        return self.request("POST", "/me/player/next", params={"device_id": device_id})

    def skip_previous(self, *, device_id: Optional[str] = None) -> Any:
        """POST ``/me/player/previous``."""
        return self.request("POST", "/me/player/previous", params={"device_id": device_id})

    def seek(self, *, position_ms: int, device_id: Optional[str] = None) -> Any:
        """PUT ``/me/player/seek`` with ``position_ms`` as a query parameter."""
        return self.request("PUT", "/me/player/seek", params={
            "position_ms": position_ms,
            "device_id": device_id,
        })

    def set_repeat(self, *, state: str, device_id: Optional[str] = None) -> Any:
        """PUT ``/me/player/repeat`` with ``state`` passed through unchanged."""
        return self.request("PUT", "/me/player/repeat", params={"state": state, "device_id": device_id})

    def set_shuffle(self, *, state: bool, device_id: Optional[str] = None) -> Any:
        """PUT ``/me/player/shuffle``; ``state`` is sent as ``"true"``/``"false"``."""
        return self.request(
            "PUT",
            "/me/player/shuffle",
            params={"state": str(bool(state)).lower(), "device_id": device_id},
        )

    def set_volume(self, *, volume_percent: int, device_id: Optional[str] = None) -> Any:
        """PUT ``/me/player/volume``."""
        return self.request("PUT", "/me/player/volume", params={
            "volume_percent": volume_percent,
            "device_id": device_id,
        })

    def get_queue(self) -> Any:
        """GET ``/me/player/queue``."""
        return self.request("GET", "/me/player/queue")

    def add_to_queue(self, *, uri: str, device_id: Optional[str] = None) -> Any:
        """POST ``/me/player/queue`` with ``uri`` as a query parameter."""
        return self.request("POST", "/me/player/queue", params={"uri": uri, "device_id": device_id})

    def search(
        self,
        *,
        query: str,
        search_types: list[str],
        limit: int = 10,
        offset: int = 0,
        market: Optional[str] = None,
        include_external: Optional[str] = None,
    ) -> Any:
        """GET ``/search``; ``search_types`` is sent comma-joined as ``type``."""
        return self.request("GET", "/search", params={
            "q": query,
            "type": ",".join(search_types),
            "limit": limit,
            "offset": offset,
            "market": market,
            "include_external": include_external,
        })

    def get_my_playlists(self, *, limit: int = 20, offset: int = 0) -> Any:
        """GET ``/me/playlists``."""
        return self.request("GET", "/me/playlists", params={"limit": limit, "offset": offset})

    def get_playlist(self, *, playlist_id: str, market: Optional[str] = None) -> Any:
        """GET ``/playlists/{playlist_id}``."""
        return self.request("GET", f"/playlists/{playlist_id}", params={"market": market})

    def create_playlist(
        self,
        *,
        name: str,
        public: bool = False,
        collaborative: bool = False,
        description: Optional[str] = None,
    ) -> Any:
        """POST ``/me/playlists``; ``description`` is omitted when ``None``."""
        return self.request("POST", "/me/playlists", json_body={
            "name": name,
            "public": public,
            "collaborative": collaborative,
            "description": description,
        })

    def add_playlist_items(
        self,
        *,
        playlist_id: str,
        uris: list[str],
        position: Optional[int] = None,
    ) -> Any:
        """POST ``/playlists/{playlist_id}/items`` with ``uris`` in the JSON body."""
        return self.request("POST", f"/playlists/{playlist_id}/items", json_body={
            "uris": uris,
            "position": position,
        })

    def remove_playlist_items(
        self,
        *,
        playlist_id: str,
        uris: list[str],
        snapshot_id: Optional[str] = None,
    ) -> Any:
        """DELETE ``/playlists/{playlist_id}/items``; each URI is sent as ``{"uri": ...}``."""
        return self.request("DELETE", f"/playlists/{playlist_id}/items", json_body={
            "items": [{"uri": uri} for uri in uris],
            "snapshot_id": snapshot_id,
        })

    def update_playlist_details(
        self,
        *,
        playlist_id: str,
        name: Optional[str] = None,
        public: Optional[bool] = None,
        collaborative: Optional[bool] = None,
        description: Optional[str] = None,
    ) -> Any:
        """PUT ``/playlists/{playlist_id}``; only non-``None`` fields are sent."""
        return self.request("PUT", f"/playlists/{playlist_id}", json_body={
            "name": name,
            "public": public,
            "collaborative": collaborative,
            "description": description,
        })

    def get_album(self, *, album_id: str, market: Optional[str] = None) -> Any:
        """GET ``/albums/{album_id}``."""
        return self.request("GET", f"/albums/{album_id}", params={"market": market})

    def get_album_tracks(
        self,
        *,
        album_id: str,
        limit: int = 20,
        offset: int = 0,
        market: Optional[str] = None,
    ) -> Any:
        """GET ``/albums/{album_id}/tracks``."""
        return self.request("GET", f"/albums/{album_id}/tracks", params={
            "limit": limit,
            "offset": offset,
            "market": market,
        })

    def get_saved_tracks(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
        """GET ``/me/tracks``."""
        return self.request("GET", "/me/tracks", params={"limit": limit, "offset": offset, "market": market})

    def save_library_items(self, *, uris: list[str]) -> Any:
        """PUT ``/me/library`` with ``uris`` comma-joined as a query parameter."""
        return self.request("PUT", "/me/library", params={"uris": ",".join(uris)})

    def library_contains(self, *, uris: list[str]) -> Any:
        """GET ``/me/library/contains`` with ``uris`` comma-joined as a query parameter."""
        return self.request("GET", "/me/library/contains", params={"uris": ",".join(uris)})

    def get_saved_albums(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
        """GET ``/me/albums``."""
        return self.request("GET", "/me/albums", params={"limit": limit, "offset": offset, "market": market})

    def remove_saved_tracks(self, *, track_ids: list[str]) -> Any:
        """DELETE ``/me/library`` for the given track ids, sent as ``spotify:track:`` URIs."""
        uris = [f"spotify:track:{track_id}" for track_id in track_ids]
        return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})

    def remove_saved_albums(self, *, album_ids: list[str]) -> Any:
        """DELETE ``/me/library`` for the given album ids, sent as ``spotify:album:`` URIs."""
        uris = [f"spotify:album:{album_id}" for album_id in album_ids]
        return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})

    def get_recently_played(
        self,
        *,
        limit: int = 20,
        after: Optional[int] = None,
        before: Optional[int] = None,
    ) -> Any:
        """GET ``/me/player/recently-played``."""
        return self.request("GET", "/me/player/recently-played", params={
            "limit": limit,
            "after": after,
            "before": before,
        })


def _extract_spotify_error_detail(response: httpx.Response, *, fallback: str) -> str:
    """Pull a human-readable error detail out of an error response.

    Looks for ``{"error": {"message": ...}}`` or ``{"error": "..."}`` in the
    JSON payload and returns ``fallback`` when the body is not JSON or has
    neither shape. The result is stripped of surrounding whitespace.
    """
    try:
        payload = response.json()
    except ValueError:
        # Body is not decodable JSON (JSONDecodeError and UnicodeDecodeError
        # both derive from ValueError); keep the raw-text fallback.
        return fallback.strip()

    detail = fallback
    if isinstance(payload, dict):
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            detail = str(error_obj.get("message") or detail)
        elif isinstance(error_obj, str):
            detail = error_obj
    return detail.strip()


def _friendly_spotify_error_message(
    *,
    status_code: int,
    detail: str,
    method: str,
    path: str,
    retry_after: Optional[str],
) -> str:
    """Map a failed response to a user-facing message.

    Statuses 401, 403, 404 and 429 get fixed wording (403/404 differ for
    ``/me/player`` paths); any other status returns ``detail`` when non-empty,
    else a generic message naming the status code. ``method`` is accepted for
    signature symmetry and is not used.
    """
    normalized_detail = detail.lower()
    is_playback_path = path.startswith("/me/player")

    if status_code == 401:
        return "Spotify authentication failed or expired. Run `hermes auth spotify` again."

    if status_code == 403:
        if is_playback_path:
            return (
                "Spotify rejected this playback request. Playback control usually requires a Spotify Premium account "
                "and an active Spotify Connect device."
            )
        if "scope" in normalized_detail or "permission" in normalized_detail:
            return "Spotify rejected the request because the current auth scope is insufficient. Re-run `hermes auth spotify` to refresh permissions."
        return "Spotify rejected the request. The account may not have permission for this action."

    if status_code == 404:
        if is_playback_path:
            return "Spotify could not find an active playback device or player session for this request."
        return "Spotify resource not found."

    if status_code == 429:
        message = "Spotify rate limit exceeded."
        if retry_after:
            message += f" Retry after {retry_after} seconds."
        return message

    if detail:
        return detail
    return f"Spotify API request failed with status {status_code}."


def _strip_none(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of ``payload`` without ``None`` values (``{}`` if falsy)."""
    if not payload:
        return {}
    return {key: value for key, value in payload.items() if value is not None}


def _parse_open_spotify_url(value: str) -> Optional[tuple[str, str]]:
    """Split an ``open.spotify.com`` link into ``(item_type, item_id)``.

    Returns ``None`` when ``value`` is not such a link or its path has fewer
    than two segments. Scheme-less links and a leading ``intl-xx`` locale
    segment are tolerated.
    """
    if "open.spotify.com" not in value:
        return None
    # Without a scheme urlparse puts the host into the path, which would make
    # the host look like the item type.
    parsed = urlparse(value if "://" in value else f"https://{value}")
    segments = [part for part in parsed.path.split("/") if part]
    if len(segments) >= 3 and segments[0].startswith("intl-"):
        segments = segments[1:]
    if len(segments) < 2:
        return None
    return segments[0], segments[1]


def normalize_spotify_id(value: str, expected_type: Optional[str] = None) -> str:
    """Reduce a Spotify id, ``spotify:`` URI, or open.spotify.com URL to a bare id.

    Args:
        value: Bare id, ``spotify:<type>:<id>`` URI, or open.spotify.com URL.
        expected_type: When given, the type embedded in a URI/URL must match.

    Returns:
        The id portion; input that matches neither form is returned stripped.

    Raises:
        SpotifyError: If ``value`` is empty or its embedded type differs from
            ``expected_type``.
    """
    cleaned = (value or "").strip()
    if not cleaned:
        raise SpotifyError("Spotify id/uri/url is required.")
    if cleaned.startswith("spotify:"):
        parts = cleaned.split(":")
        if len(parts) >= 3:
            item_type = parts[1]
            if expected_type and item_type != expected_type:
                raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
            return parts[2]
    url_parts = _parse_open_spotify_url(cleaned)
    if url_parts is not None:
        item_type, item_id = url_parts
        if expected_type and item_type != expected_type:
            raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
        return item_id
    return cleaned


def normalize_spotify_uri(value: str, expected_type: Optional[str] = None) -> str:
    """Convert a Spotify id, URI, or open.spotify.com URL to a ``spotify:`` URI.

    Args:
        value: Bare id, ``spotify:`` URI, or open.spotify.com URL.
        expected_type: Required item type. Needed to build a URI from a bare
            id; for URIs and URLs it is validated against the embedded type.

    Returns:
        A ``spotify:<type>:<id>`` URI. A bare id with no ``expected_type``
        cannot be qualified and is returned stripped but otherwise unchanged.

    Raises:
        SpotifyError: If ``value`` is empty or its embedded type differs from
            ``expected_type``.
    """
    cleaned = (value or "").strip()
    if not cleaned:
        raise SpotifyError("Spotify URI/url/id is required.")
    if cleaned.startswith("spotify:"):
        if expected_type:
            parts = cleaned.split(":")
            if len(parts) >= 3 and parts[1] != expected_type:
                raise SpotifyError(f"Expected a Spotify {expected_type}, got {parts[1]}.")
        return cleaned
    url_parts = _parse_open_spotify_url(cleaned)
    if url_parts is not None:
        # The URL itself names the item type, so a URI can be built even when
        # the caller did not supply expected_type.
        item_type, item_id = url_parts
        if expected_type and item_type != expected_type:
            raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
        return f"spotify:{item_type}:{item_id}"
    if expected_type:
        return f"spotify:{expected_type}:{cleaned}"
    return cleaned


def normalize_spotify_uris(values: Iterable[str], expected_type: Optional[str] = None) -> list[str]:
    """Normalize each value to a URI, dropping duplicates in first-seen order.

    Raises:
        SpotifyError: If any value is invalid or no URIs remain.
    """
    # dict.fromkeys keeps insertion order and dedupes in a single pass.
    uris = list(dict.fromkeys(normalize_spotify_uri(str(value), expected_type) for value in values))
    if not uris:
        raise SpotifyError("At least one Spotify item is required.")
    return uris


def compact_json(data: Any) -> str:
    """Serialize ``data`` to JSON without escaping non-ASCII characters."""
    return json.dumps(data, ensure_ascii=False)