/ plugins / spotify / client.py
client.py
  1  """Thin Spotify Web API helper used by Hermes native tools."""
  2  
  3  from __future__ import annotations
  4  
  5  import json
  6  from typing import Any, Dict, Iterable, Optional
  7  from urllib.parse import urlparse
  8  
  9  import httpx
 10  
 11  from hermes_cli.auth import (
 12      AuthError,
 13      resolve_spotify_runtime_credentials,
 14  )
 15  
 16  
 17  class SpotifyError(RuntimeError):
 18      """Base Spotify tool error."""
 19  
 20  
 21  class SpotifyAuthRequiredError(SpotifyError):
 22      """Raised when the user needs to authenticate with Spotify first."""
 23  
 24  
 25  class SpotifyAPIError(SpotifyError):
 26      """Structured Spotify API failure."""
 27  
 28      def __init__(
 29          self,
 30          message: str,
 31          *,
 32          status_code: Optional[int] = None,
 33          response_body: Optional[str] = None,
 34      ) -> None:
 35          super().__init__(message)
 36          self.status_code = status_code
 37          self.response_body = response_body
 38          self.path = None
 39  
 40  
 41  class SpotifyClient:
 42      def __init__(self) -> None:
 43          self._runtime = self._resolve_runtime(refresh_if_expiring=True)
 44  
 45      def _resolve_runtime(self, *, force_refresh: bool = False, refresh_if_expiring: bool = True) -> Dict[str, Any]:
 46          try:
 47              return resolve_spotify_runtime_credentials(
 48                  force_refresh=force_refresh,
 49                  refresh_if_expiring=refresh_if_expiring,
 50              )
 51          except AuthError as exc:
 52              raise SpotifyAuthRequiredError(str(exc)) from exc
 53  
 54      @property
 55      def base_url(self) -> str:
 56          return str(self._runtime.get("base_url") or "").rstrip("/")
 57  
 58      def _headers(self) -> Dict[str, str]:
 59          return {
 60              "Authorization": f"Bearer {self._runtime['access_token']}",
 61              "Content-Type": "application/json",
 62          }
 63  
 64      def request(
 65          self,
 66          method: str,
 67          path: str,
 68          *,
 69          params: Optional[Dict[str, Any]] = None,
 70          json_body: Optional[Dict[str, Any]] = None,
 71          allow_retry_on_401: bool = True,
 72          empty_response: Optional[Dict[str, Any]] = None,
 73      ) -> Any:
 74          url = f"{self.base_url}{path}"
 75          response = httpx.request(
 76              method,
 77              url,
 78              headers=self._headers(),
 79              params=_strip_none(params),
 80              json=_strip_none(json_body) if json_body is not None else None,
 81              timeout=30.0,
 82          )
 83          if response.status_code == 401 and allow_retry_on_401:
 84              self._runtime = self._resolve_runtime(force_refresh=True, refresh_if_expiring=True)
 85              return self.request(
 86                  method,
 87                  path,
 88                  params=params,
 89                  json_body=json_body,
 90                  allow_retry_on_401=False,
 91              )
 92          if response.status_code >= 400:
 93              self._raise_api_error(response, method=method, path=path)
 94          if response.status_code == 204 or not response.content:
 95              return empty_response or {"success": True, "status_code": response.status_code, "empty": True}
 96          if "application/json" in response.headers.get("content-type", ""):
 97              return response.json()
 98          return {"success": True, "text": response.text}
 99  
100      def _raise_api_error(self, response: httpx.Response, *, method: str, path: str) -> None:
101          detail = response.text.strip()
102          message = _friendly_spotify_error_message(
103              status_code=response.status_code,
104              detail=_extract_spotify_error_detail(response, fallback=detail),
105              method=method,
106              path=path,
107              retry_after=response.headers.get("Retry-After"),
108          )
109          error = SpotifyAPIError(message, status_code=response.status_code, response_body=detail)
110          error.path = path
111          raise error
112  
113      def get_devices(self) -> Any:
114          return self.request("GET", "/me/player/devices")
115  
116      def transfer_playback(self, *, device_id: str, play: bool = False) -> Any:
117          return self.request("PUT", "/me/player", json_body={
118              "device_ids": [device_id],
119              "play": play,
120          })
121  
122      def get_playback_state(self, *, market: Optional[str] = None) -> Any:
123          return self.request(
124              "GET",
125              "/me/player",
126              params={"market": market},
127              empty_response={
128                  "status_code": 204,
129                  "empty": True,
130                  "message": "No active Spotify playback session was found. Open Spotify on a device and start playback, or transfer playback to an available device.",
131              },
132          )
133  
134      def get_currently_playing(self, *, market: Optional[str] = None) -> Any:
135          return self.request(
136              "GET",
137              "/me/player/currently-playing",
138              params={"market": market},
139              empty_response={
140                  "status_code": 204,
141                  "empty": True,
142                  "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
143              },
144          )
145  
146      def start_playback(
147          self,
148          *,
149          device_id: Optional[str] = None,
150          context_uri: Optional[str] = None,
151          uris: Optional[list[str]] = None,
152          offset: Optional[Dict[str, Any]] = None,
153          position_ms: Optional[int] = None,
154      ) -> Any:
155          return self.request(
156              "PUT",
157              "/me/player/play",
158              params={"device_id": device_id},
159              json_body={
160                  "context_uri": context_uri,
161                  "uris": uris,
162                  "offset": offset,
163                  "position_ms": position_ms,
164              },
165          )
166  
167      def pause_playback(self, *, device_id: Optional[str] = None) -> Any:
168          return self.request("PUT", "/me/player/pause", params={"device_id": device_id})
169  
170      def skip_next(self, *, device_id: Optional[str] = None) -> Any:
171          return self.request("POST", "/me/player/next", params={"device_id": device_id})
172  
173      def skip_previous(self, *, device_id: Optional[str] = None) -> Any:
174          return self.request("POST", "/me/player/previous", params={"device_id": device_id})
175  
176      def seek(self, *, position_ms: int, device_id: Optional[str] = None) -> Any:
177          return self.request("PUT", "/me/player/seek", params={
178              "position_ms": position_ms,
179              "device_id": device_id,
180          })
181  
182      def set_repeat(self, *, state: str, device_id: Optional[str] = None) -> Any:
183          return self.request("PUT", "/me/player/repeat", params={"state": state, "device_id": device_id})
184  
185      def set_shuffle(self, *, state: bool, device_id: Optional[str] = None) -> Any:
186          return self.request("PUT", "/me/player/shuffle", params={"state": str(bool(state)).lower(), "device_id": device_id})
187  
188      def set_volume(self, *, volume_percent: int, device_id: Optional[str] = None) -> Any:
189          return self.request("PUT", "/me/player/volume", params={
190              "volume_percent": volume_percent,
191              "device_id": device_id,
192          })
193  
194      def get_queue(self) -> Any:
195          return self.request("GET", "/me/player/queue")
196  
197      def add_to_queue(self, *, uri: str, device_id: Optional[str] = None) -> Any:
198          return self.request("POST", "/me/player/queue", params={"uri": uri, "device_id": device_id})
199  
200      def search(
201          self,
202          *,
203          query: str,
204          search_types: list[str],
205          limit: int = 10,
206          offset: int = 0,
207          market: Optional[str] = None,
208          include_external: Optional[str] = None,
209      ) -> Any:
210          return self.request("GET", "/search", params={
211              "q": query,
212              "type": ",".join(search_types),
213              "limit": limit,
214              "offset": offset,
215              "market": market,
216              "include_external": include_external,
217          })
218  
219      def get_my_playlists(self, *, limit: int = 20, offset: int = 0) -> Any:
220          return self.request("GET", "/me/playlists", params={"limit": limit, "offset": offset})
221  
222      def get_playlist(self, *, playlist_id: str, market: Optional[str] = None) -> Any:
223          return self.request("GET", f"/playlists/{playlist_id}", params={"market": market})
224  
225      def create_playlist(
226          self,
227          *,
228          name: str,
229          public: bool = False,
230          collaborative: bool = False,
231          description: Optional[str] = None,
232      ) -> Any:
233          return self.request("POST", "/me/playlists", json_body={
234              "name": name,
235              "public": public,
236              "collaborative": collaborative,
237              "description": description,
238          })
239  
240      def add_playlist_items(
241          self,
242          *,
243          playlist_id: str,
244          uris: list[str],
245          position: Optional[int] = None,
246      ) -> Any:
247          return self.request("POST", f"/playlists/{playlist_id}/items", json_body={
248              "uris": uris,
249              "position": position,
250          })
251  
252      def remove_playlist_items(
253          self,
254          *,
255          playlist_id: str,
256          uris: list[str],
257          snapshot_id: Optional[str] = None,
258      ) -> Any:
259          return self.request("DELETE", f"/playlists/{playlist_id}/items", json_body={
260              "items": [{"uri": uri} for uri in uris],
261              "snapshot_id": snapshot_id,
262          })
263  
264      def update_playlist_details(
265          self,
266          *,
267          playlist_id: str,
268          name: Optional[str] = None,
269          public: Optional[bool] = None,
270          collaborative: Optional[bool] = None,
271          description: Optional[str] = None,
272      ) -> Any:
273          return self.request("PUT", f"/playlists/{playlist_id}", json_body={
274              "name": name,
275              "public": public,
276              "collaborative": collaborative,
277              "description": description,
278          })
279  
280      def get_album(self, *, album_id: str, market: Optional[str] = None) -> Any:
281          return self.request("GET", f"/albums/{album_id}", params={"market": market})
282  
283      def get_album_tracks(self, *, album_id: str, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
284          return self.request("GET", f"/albums/{album_id}/tracks", params={
285              "limit": limit,
286              "offset": offset,
287              "market": market,
288          })
289  
290      def get_saved_tracks(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
291          return self.request("GET", "/me/tracks", params={"limit": limit, "offset": offset, "market": market})
292  
293      def save_library_items(self, *, uris: list[str]) -> Any:
294          return self.request("PUT", "/me/library", params={"uris": ",".join(uris)})
295  
296      def library_contains(self, *, uris: list[str]) -> Any:
297          return self.request("GET", "/me/library/contains", params={"uris": ",".join(uris)})
298  
299      def get_saved_albums(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
300          return self.request("GET", "/me/albums", params={"limit": limit, "offset": offset, "market": market})
301  
302      def remove_saved_tracks(self, *, track_ids: list[str]) -> Any:
303          uris = [f"spotify:track:{track_id}" for track_id in track_ids]
304          return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
305  
306      def remove_saved_albums(self, *, album_ids: list[str]) -> Any:
307          uris = [f"spotify:album:{album_id}" for album_id in album_ids]
308          return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
309  
310      def get_recently_played(
311          self,
312          *,
313          limit: int = 20,
314          after: Optional[int] = None,
315          before: Optional[int] = None,
316      ) -> Any:
317          return self.request("GET", "/me/player/recently-played", params={
318              "limit": limit,
319              "after": after,
320              "before": before,
321          })
322  
323  
324  def _extract_spotify_error_detail(response: httpx.Response, *, fallback: str) -> str:
325      detail = fallback
326      try:
327          payload = response.json()
328          if isinstance(payload, dict):
329              error_obj = payload.get("error")
330              if isinstance(error_obj, dict):
331                  detail = str(error_obj.get("message") or detail)
332              elif isinstance(error_obj, str):
333                  detail = error_obj
334      except Exception:
335          pass
336      return detail.strip()
337  
338  
339  def _friendly_spotify_error_message(
340      *,
341      status_code: int,
342      detail: str,
343      method: str,
344      path: str,
345      retry_after: Optional[str],
346  ) -> str:
347      normalized_detail = detail.lower()
348      is_playback_path = path.startswith("/me/player")
349  
350      if status_code == 401:
351          return "Spotify authentication failed or expired. Run `hermes auth spotify` again."
352  
353      if status_code == 403:
354          if is_playback_path:
355              return (
356                  "Spotify rejected this playback request. Playback control usually requires a Spotify Premium account "
357                  "and an active Spotify Connect device."
358              )
359          if "scope" in normalized_detail or "permission" in normalized_detail:
360              return "Spotify rejected the request because the current auth scope is insufficient. Re-run `hermes auth spotify` to refresh permissions."
361          return "Spotify rejected the request. The account may not have permission for this action."
362  
363      if status_code == 404:
364          if is_playback_path:
365              return "Spotify could not find an active playback device or player session for this request."
366          return "Spotify resource not found."
367  
368      if status_code == 429:
369          message = "Spotify rate limit exceeded."
370          if retry_after:
371              message += f" Retry after {retry_after} seconds."
372          return message
373  
374      if detail:
375          return detail
376      return f"Spotify API request failed with status {status_code}."
377  
378  
379  def _strip_none(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
380      if not payload:
381          return {}
382      return {key: value for key, value in payload.items() if value is not None}
383  
384  
385  def normalize_spotify_id(value: str, expected_type: Optional[str] = None) -> str:
386      cleaned = (value or "").strip()
387      if not cleaned:
388          raise SpotifyError("Spotify id/uri/url is required.")
389      if cleaned.startswith("spotify:"):
390          parts = cleaned.split(":")
391          if len(parts) >= 3:
392              item_type = parts[1]
393              if expected_type and item_type != expected_type:
394                  raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
395              return parts[2]
396      if "open.spotify.com" in cleaned:
397          parsed = urlparse(cleaned)
398          path_parts = [part for part in parsed.path.split("/") if part]
399          if len(path_parts) >= 2:
400              item_type, item_id = path_parts[0], path_parts[1]
401              if expected_type and item_type != expected_type:
402                  raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
403              return item_id
404      return cleaned
405  
406  
407  def normalize_spotify_uri(value: str, expected_type: Optional[str] = None) -> str:
408      cleaned = (value or "").strip()
409      if not cleaned:
410          raise SpotifyError("Spotify URI/url/id is required.")
411      if cleaned.startswith("spotify:"):
412          if expected_type:
413              parts = cleaned.split(":")
414              if len(parts) >= 3 and parts[1] != expected_type:
415                  raise SpotifyError(f"Expected a Spotify {expected_type}, got {parts[1]}.")
416          return cleaned
417      item_id = normalize_spotify_id(cleaned, expected_type)
418      if expected_type:
419          return f"spotify:{expected_type}:{item_id}"
420      return cleaned
421  
422  
423  def normalize_spotify_uris(values: Iterable[str], expected_type: Optional[str] = None) -> list[str]:
424      uris: list[str] = []
425      for value in values:
426          uri = normalize_spotify_uri(str(value), expected_type)
427          if uri not in uris:
428              uris.append(uri)
429      if not uris:
430          raise SpotifyError("At least one Spotify item is required.")
431      return uris
432  
433  
434  def compact_json(data: Any) -> str:
435      return json.dumps(data, ensure_ascii=False)