From 55f75c8a4fa7486acf61414d2539a3d37666064b Mon Sep 17 00:00:00 2001 From: Jozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com> Date: Thu, 6 Mar 2025 18:48:49 +0100 Subject: [PATCH] Tidal: remove unnecessary error handling decorator (#2004) fix: remove unnecessary error handling decorator --- music_assistant/providers/tidal/__init__.py | 172 +++++++++++--------- 1 file changed, 95 insertions(+), 77 deletions(-) diff --git a/music_assistant/providers/tidal/__init__.py b/music_assistant/providers/tidal/__init__.py index 8c8dcd4f..add7ca5b 100644 --- a/music_assistant/providers/tidal/__init__.py +++ b/music_assistant/providers/tidal/__init__.py @@ -5,7 +5,7 @@ from __future__ import annotations import asyncio import functools import json -from collections.abc import Awaitable, Callable, Coroutine +from collections.abc import Awaitable, Callable from contextlib import suppress from enum import StrEnum from typing import TYPE_CHECKING, Any, TypeVar, cast @@ -439,28 +439,6 @@ class TidalProvider(MusicProvider): return wrapper - @staticmethod - def handle_item_errors( - item_type: str, - ) -> Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]: - """Handle standard error patterns in item getters.""" - - def decorator( - method: Callable[..., Coroutine[Any, Any, T]], - ) -> Callable[..., Coroutine[Any, Any, T]]: - @functools.wraps(method) - async def wrapper(self: TidalProvider, item_id: str, *args: Any, **kwargs: Any) -> T: - try: - return await method(self, item_id, *args, **kwargs) - except ResourceTemporarilyUnavailable: - raise - except Exception as err: - raise MediaNotFoundError(f"{item_type} {item_id} not found") from err - - return wrapper - - return decorator - # # CORE API METHODS # @@ -717,93 +695,133 @@ class TidalProvider(MusicProvider): return parsed_results - @handle_item_errors("Track") async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: """Get similar tracks for given track id.""" - api_result = await self._get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit}) - similar_tracks = self._extract_data(api_result) - return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])] + try: + api_result = await self._get_data( + f"tracks/{prov_track_id}/radio", params={"limit": limit} + ) + similar_tracks = self._extract_data(api_result) + return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])] + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Track {prov_track_id} not found") from err # # ITEM RETRIEVAL METHODS # - @handle_item_errors("Artist") async def get_artist(self, prov_artist_id: str) -> Artist: """Get artist details for given artist id.""" - api_result = await self._get_data(f"artists/{prov_artist_id}") - artist_obj = self._extract_data(api_result) - return self._parse_artist(artist_obj) + try: + api_result = await self._get_data(f"artists/{prov_artist_id}") + artist_obj = self._extract_data(api_result) + return self._parse_artist(artist_obj) + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err - @handle_item_errors("Album") async def get_album(self, prov_album_id: str) -> Album: """Get album details for given album id.""" - api_result = await self._get_data(f"albums/{prov_album_id}") - album_obj = self._extract_data(api_result) - return self._parse_album(album_obj) + try: + api_result = await self._get_data(f"albums/{prov_album_id}") + album_obj = self._extract_data(api_result) + return self._parse_album(album_obj) + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Album {prov_album_id} not found") from err - @handle_item_errors("Track") async def get_track(self, prov_track_id: str) -> Track: """Get track details for given track id.""" - api_result = await self._get_data(f"tracks/{prov_track_id}") - track_obj = self._extract_data(api_result) - track = self._parse_track(track_obj) - # Get additional details like lyrics if needed - with suppress(MediaNotFoundError): - api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics") - lyrics_data = self._extract_data(api_result) - - if lyrics_data and "text" in lyrics_data: - track.metadata.lyrics = lyrics_data["text"] - - return track + try: + api_result = await self._get_data(f"tracks/{prov_track_id}") + track_obj = self._extract_data(api_result) + track = self._parse_track(track_obj) + # Get additional details like lyrics if needed + with suppress(MediaNotFoundError): + api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics") + lyrics_data = self._extract_data(api_result) + + if lyrics_data and "text" in lyrics_data: + track.metadata.lyrics = lyrics_data["text"] + + return track + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Track {prov_track_id} not found") from err - @handle_item_errors("Playlist") async def get_playlist(self, prov_playlist_id: str) -> Playlist: """Get playlist details for given playlist id.""" - api_result = await self._get_data(f"playlists/{prov_playlist_id}") - playlist_obj = self._extract_data(api_result) - return self._parse_playlist(playlist_obj) + try: + api_result = await self._get_data(f"playlists/{prov_playlist_id}") + playlist_obj = self._extract_data(api_result) + return self._parse_playlist(playlist_obj) + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err - @handle_item_errors("Track") async def get_album_tracks(self, prov_album_id: str) -> list[Track]: """Get album tracks for given album id.""" - api_result = await self._get_data(f"albums/{prov_album_id}/tracks") - album_tracks = self._extract_data(api_result) - return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])] + try: + api_result = await self._get_data(f"albums/{prov_album_id}/tracks") + album_tracks = self._extract_data(api_result) + return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])] + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Album {prov_album_id} not found") from err - @handle_item_errors("Album") async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: """Get a list of all albums for the given artist.""" - api_result = await self._get_data(f"artists/{prov_artist_id}/albums") - artist_albums = self._extract_data(api_result) - return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])] + try: + api_result = await self._get_data(f"artists/{prov_artist_id}/albums") + artist_albums = self._extract_data(api_result) + return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])] + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err - @handle_item_errors("Track") async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: """Get a list of 10 most popular tracks for the given artist.""" - api_result = await self._get_data( - f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0} - ) - artist_top_tracks = self._extract_data(api_result) - return [self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", [])] + try: + api_result = await self._get_data( + f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0} + ) + artist_top_tracks = self._extract_data(api_result) + return [ + self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", []) + ] + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err - @handle_item_errors("Playlist") async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: """Get playlist tracks.""" result: list[Track] = [] page_size = 200 offset = page * page_size - api_result = await self._get_data( - f"playlists/{prov_playlist_id}/tracks", - params={"limit": page_size, "offset": offset}, - ) - tidal_tracks = self._extract_data(api_result) - for index, track_obj in enumerate(tidal_tracks.get("items", []), 1): - track = self._parse_track(track_obj=track_obj) - track.position = offset + index - result.append(track) - return result + try: + api_result = await self._get_data( + f"playlists/{prov_playlist_id}/tracks", + params={"limit": page_size, "offset": offset}, + ) + tidal_tracks = self._extract_data(api_result) + for index, track_obj in enumerate(tidal_tracks.get("items", []), 1): + track = self._parse_track(track_obj=track_obj) + track.position = offset + index + result.append(track) + return result + except ResourceTemporarilyUnavailable: + raise + except Exception as err: + raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err async def get_stream_details( self, item_id: str, media_type: MediaType = MediaType.TRACK -- 2.34.1