Tidal: remove unnecessary error handling decorator (#2004)
authorJozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com>
Thu, 6 Mar 2025 17:48:49 +0000 (18:48 +0100)
committerGitHub <noreply@github.com>
Thu, 6 Mar 2025 17:48:49 +0000 (18:48 +0100)
fix: remove unnecessary error handling decorator

music_assistant/providers/tidal/__init__.py

index 8c8dcd4f30a6996f5b8d910c330506911b3af610..add7ca5b3fd3b51ae6cab7346690975a7c712d2f 100644 (file)
@@ -5,7 +5,7 @@ from __future__ import annotations
 import asyncio
 import functools
 import json
-from collections.abc import Awaitable, Callable, Coroutine
+from collections.abc import Awaitable, Callable
 from contextlib import suppress
 from enum import StrEnum
 from typing import TYPE_CHECKING, Any, TypeVar, cast
@@ -439,28 +439,6 @@ class TidalProvider(MusicProvider):
 
         return wrapper
 
-    @staticmethod
-    def handle_item_errors(
-        item_type: str,
-    ) -> Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]:
-        """Handle standard error patterns in item getters."""
-
-        def decorator(
-            method: Callable[..., Coroutine[Any, Any, T]],
-        ) -> Callable[..., Coroutine[Any, Any, T]]:
-            @functools.wraps(method)
-            async def wrapper(self: TidalProvider, item_id: str, *args: Any, **kwargs: Any) -> T:
-                try:
-                    return await method(self, item_id, *args, **kwargs)
-                except ResourceTemporarilyUnavailable:
-                    raise
-                except Exception as err:
-                    raise MediaNotFoundError(f"{item_type} {item_id} not found") from err
-
-            return wrapper
-
-        return decorator
-
     #
     # CORE API METHODS
     #
@@ -717,93 +695,133 @@ class TidalProvider(MusicProvider):
 
         return parsed_results
 
-    @handle_item_errors("Track")
     async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
         """Get similar tracks for given track id."""
-        api_result = await self._get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit})
-        similar_tracks = self._extract_data(api_result)
-        return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])]
+        try:
+            api_result = await self._get_data(
+                f"tracks/{prov_track_id}/radio", params={"limit": limit}
+            )
+            similar_tracks = self._extract_data(api_result)
+            return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])]
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
 
     #
     # ITEM RETRIEVAL METHODS
     #
 
-    @handle_item_errors("Artist")
     async def get_artist(self, prov_artist_id: str) -> Artist:
         """Get artist details for given artist id."""
-        api_result = await self._get_data(f"artists/{prov_artist_id}")
-        artist_obj = self._extract_data(api_result)
-        return self._parse_artist(artist_obj)
+        try:
+            api_result = await self._get_data(f"artists/{prov_artist_id}")
+            artist_obj = self._extract_data(api_result)
+            return self._parse_artist(artist_obj)
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
 
-    @handle_item_errors("Album")
     async def get_album(self, prov_album_id: str) -> Album:
         """Get album details for given album id."""
-        api_result = await self._get_data(f"albums/{prov_album_id}")
-        album_obj = self._extract_data(api_result)
-        return self._parse_album(album_obj)
+        try:
+            api_result = await self._get_data(f"albums/{prov_album_id}")
+            album_obj = self._extract_data(api_result)
+            return self._parse_album(album_obj)
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
 
-    @handle_item_errors("Track")
     async def get_track(self, prov_track_id: str) -> Track:
         """Get track details for given track id."""
-        api_result = await self._get_data(f"tracks/{prov_track_id}")
-        track_obj = self._extract_data(api_result)
-        track = self._parse_track(track_obj)
-        # Get additional details like lyrics if needed
-        with suppress(MediaNotFoundError):
-            api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics")
-            lyrics_data = self._extract_data(api_result)
-
-            if lyrics_data and "text" in lyrics_data:
-                track.metadata.lyrics = lyrics_data["text"]
-
-        return track
+        try:
+            api_result = await self._get_data(f"tracks/{prov_track_id}")
+            track_obj = self._extract_data(api_result)
+            track = self._parse_track(track_obj)
+            # Get additional details like lyrics if needed
+            with suppress(MediaNotFoundError):
+                api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics")
+                lyrics_data = self._extract_data(api_result)
+
+                if lyrics_data and "text" in lyrics_data:
+                    track.metadata.lyrics = lyrics_data["text"]
+
+            return track
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
 
-    @handle_item_errors("Playlist")
     async def get_playlist(self, prov_playlist_id: str) -> Playlist:
         """Get playlist details for given playlist id."""
-        api_result = await self._get_data(f"playlists/{prov_playlist_id}")
-        playlist_obj = self._extract_data(api_result)
-        return self._parse_playlist(playlist_obj)
+        try:
+            api_result = await self._get_data(f"playlists/{prov_playlist_id}")
+            playlist_obj = self._extract_data(api_result)
+            return self._parse_playlist(playlist_obj)
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
 
-    @handle_item_errors("Track")
     async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
         """Get album tracks for given album id."""
-        api_result = await self._get_data(f"albums/{prov_album_id}/tracks")
-        album_tracks = self._extract_data(api_result)
-        return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])]
+        try:
+            api_result = await self._get_data(f"albums/{prov_album_id}/tracks")
+            album_tracks = self._extract_data(api_result)
+            return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])]
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
 
-    @handle_item_errors("Album")
     async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
         """Get a list of all albums for the given artist."""
-        api_result = await self._get_data(f"artists/{prov_artist_id}/albums")
-        artist_albums = self._extract_data(api_result)
-        return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])]
+        try:
+            api_result = await self._get_data(f"artists/{prov_artist_id}/albums")
+            artist_albums = self._extract_data(api_result)
+            return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])]
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
 
-    @handle_item_errors("Track")
     async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
         """Get a list of 10 most popular tracks for the given artist."""
-        api_result = await self._get_data(
-            f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
-        )
-        artist_top_tracks = self._extract_data(api_result)
-        return [self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", [])]
+        try:
+            api_result = await self._get_data(
+                f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
+            )
+            artist_top_tracks = self._extract_data(api_result)
+            return [
+                self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", [])
+            ]
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
 
-    @handle_item_errors("Playlist")
     async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
         """Get playlist tracks."""
         result: list[Track] = []
         page_size = 200
         offset = page * page_size
-        api_result = await self._get_data(
-            f"playlists/{prov_playlist_id}/tracks",
-            params={"limit": page_size, "offset": offset},
-        )
-        tidal_tracks = self._extract_data(api_result)
-        for index, track_obj in enumerate(tidal_tracks.get("items", []), 1):
-            track = self._parse_track(track_obj=track_obj)
-            track.position = offset + index
-            result.append(track)
-        return result
+        try:
+            api_result = await self._get_data(
+                f"playlists/{prov_playlist_id}/tracks",
+                params={"limit": page_size, "offset": offset},
+            )
+            tidal_tracks = self._extract_data(api_result)
+            for index, track_obj in enumerate(tidal_tracks.get("items", []), 1):
+                track = self._parse_track(track_obj=track_obj)
+                track.position = offset + index
+                result.append(track)
+            return result
+        except ResourceTemporarilyUnavailable:
+            raise
+        except Exception as err:
+            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
 
     async def get_stream_details(
         self, item_id: str, media_type: MediaType = MediaType.TRACK