Add typing for builtin (#1450)
authorJc2k <john.carr@unrouted.co.uk>
Sun, 7 Jul 2024 21:42:53 +0000 (22:42 +0100)
committerGitHub <noreply@github.com>
Sun, 7 Jul 2024 21:42:53 +0000 (23:42 +0200)
music_assistant/server/providers/builtin/__init__.py
mypy.ini

index 0da09eaf30c926b3bd1cd9b3893fb51adbde9c07..f3b55e3f09fa4348d14310c81a670688b9376181 100644 (file)
@@ -36,6 +36,7 @@ from music_assistant.common.models.media_items import (
     ProviderMapping,
     Radio,
     Track,
+    UniqueList,
 )
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import DB_SCHEMA_VERSION, MASS_LOGO, VARIOUS_ARTISTS_FANART
@@ -162,40 +163,46 @@ class BuiltinProvider(MusicProvider):
     async def get_track(self, prov_track_id: str) -> Track:
         """Get full track details by id."""
         parsed_item = await self.parse_item(prov_track_id)
+        assert isinstance(parsed_item, Track)
         stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
         if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
             # always prefer the stored info, such as the name
             parsed_item.name = stored_item["name"]
             if image_url := stored_item.get("image_url"):
-                parsed_item.metadata.images = [
-                    MediaItemImage(
-                        type=ImageType.THUMB,
-                        path=image_url,
-                        provider=self.instance_id,
-                        remotely_accessible=image_url.startswith("http"),
-                    )
-                ]
+                parsed_item.metadata.images = UniqueList(
+                    [
+                        MediaItemImage(
+                            type=ImageType.THUMB,
+                            path=image_url,
+                            provider=self.instance_id,
+                            remotely_accessible=image_url.startswith("http"),
+                        )
+                    ]
+                )
         return parsed_item
 
     async def get_radio(self, prov_radio_id: str) -> Radio:
         """Get full radio details by id."""
         parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
+        assert isinstance(parsed_item, Radio)
         stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
         if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
             # always prefer the stored info, such as the name
             parsed_item.name = stored_item["name"]
             if image_url := stored_item.get("image_url"):
-                parsed_item.metadata.images = [
-                    MediaItemImage(
-                        type=ImageType.THUMB,
-                        path=image_url,
-                        provider=self.instance_id,
-                        remotely_accessible=image_url.startswith("http"),
-                    )
-                ]
+                parsed_item.metadata.images = UniqueList(
+                    [
+                        MediaItemImage(
+                            type=ImageType.THUMB,
+                            path=image_url,
+                            provider=self.instance_id,
+                            remotely_accessible=image_url.startswith("http"),
+                        )
+                    ]
+                )
         return parsed_item
 
-    async def get_artist(self, prov_artist_id: str) -> Track:
+    async def get_artist(self, prov_artist_id: str) -> Artist:
         """Get full artist details by id."""
         artist = prov_artist_id
         # this is here for compatibility reasons only
@@ -231,9 +238,9 @@ class BuiltinProvider(MusicProvider):
                 owner="Music Assistant",
                 is_editable=False,
                 metadata=MediaItemMetadata(
-                    images=[DEFAULT_THUMB]
+                    images=UniqueList([DEFAULT_THUMB])
                     if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
-                    else [DEFAULT_THUMB, DEFAULT_FANART],
+                    else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]),
                     cache_checksum=str(int(time.time())),
                 ),
             )
@@ -258,14 +265,16 @@ class BuiltinProvider(MusicProvider):
         )
         playlist.metadata.cache_checksum = f"{DB_SCHEMA_VERSION}.{stored_item.get('last_updated')}"
         if image_url := stored_item.get("image_url"):
-            playlist.metadata.images = [
-                MediaItemImage(
-                    type=ImageType.THUMB,
-                    path=image_url,
-                    provider=self.instance_id,
-                    remotely_accessible=image_url.startswith("http"),
-                )
-            ]
+            playlist.metadata.images = UniqueList(
+                [
+                    MediaItemImage(
+                        type=ImageType.THUMB,
+                        path=image_url,
+                        provider=self.instance_id,
+                        remotely_accessible=image_url.startswith("http"),
+                    )
+                ]
+            )
         return playlist
 
     async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
@@ -373,6 +382,7 @@ class BuiltinProvider(MusicProvider):
                     track = await media_controller.get_provider_item(
                         item_id, provider_instance_id_or_domain
                     )
+                assert isinstance(track, Track)
                 track.position = offset + index
                 result.append(track)
             except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
@@ -392,8 +402,9 @@ class BuiltinProvider(MusicProvider):
         # mark last_updated on playlist object
         stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
         stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
-        stored_item["last_updated"] = int(time.time())
-        self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+        if stored_item:
+            stored_item["last_updated"] = int(time.time())
+            self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
 
     async def remove_playlist_tracks(
         self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
@@ -408,10 +419,11 @@ class BuiltinProvider(MusicProvider):
         # mark last_updated on playlist object
         stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
         stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
-        stored_item["last_updated"] = int(time.time())
-        self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+        if stored_item:
+            stored_item["last_updated"] = int(time.time())
+            self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
 
-    async def create_playlist(self, name: str) -> Playlist:  # type: ignore[return]
+    async def create_playlist(self, name: str) -> Playlist:
         """Create a new playlist on provider with given name."""
         item_id = shortuuid.random(8)
         stored_item = StoredItem(item_id=item_id, name=name)
@@ -442,6 +454,7 @@ class BuiltinProvider(MusicProvider):
                 ),
             )
         }
+        media_item: Track | Radio
         if is_radio or force_radio:
             # treat as radio
             media_item = Radio(
@@ -459,19 +472,23 @@ class BuiltinProvider(MusicProvider):
                 provider=self.domain,
                 name=media_info.title or url,
                 duration=int(media_info.duration or 0),
-                artists=[await self.get_artist(artist) for artist in media_info.artists],
+                artists=UniqueList(
+                    [await self.get_artist(artist) for artist in media_info.artists]
+                ),
                 provider_mappings=provider_mappings,
             )
 
         if media_info.has_cover_image:
-            media_item.metadata.images = [
-                MediaItemImage(
-                    type=ImageType.THUMB,
-                    path=url,
-                    provider=self.instance_id,
-                    remotely_accessible=False,
-                )
-            ]
+            media_item.metadata.images = UniqueList(
+                [
+                    MediaItemImage(
+                        type=ImageType.THUMB,
+                        path=url,
+                        provider=self.instance_id,
+                        remotely_accessible=False,
+                    )
+                ]
+            )
         return media_item
 
     async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
@@ -507,59 +524,72 @@ class BuiltinProvider(MusicProvider):
             can_seek=not is_radio,
         )
 
-    async def _get_builtin_playlist_tracks(
-        self, builtin_playlist_id: str
-    ) -> AsyncGenerator[Track, None]:
-        """Get all playlist tracks for given builtin playlist id."""
+    async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
         result: list[Track] = []
-        if builtin_playlist_id == ALL_FAVORITE_TRACKS:
-            res = await self.mass.music.tracks.library_items(
-                favorite=True, limit=250000, order_by="random"
+        res = await self.mass.music.tracks.library_items(
+            favorite=True, limit=250000, order_by="random"
+        )
+        for idx, item in enumerate(res, 1):
+            item.position = idx
+            result.append(item)
+        return result
+
+    async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
+        result: list[Track] = []
+        res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
+        for idx, item in enumerate(res, 1):
+            item.position = idx
+            result.append(item)
+        return result
+
+    async def _get_builtin_playlist_random_album(self) -> list[Track]:
+        result: list[Track] = []
+        for random_album in await self.mass.music.albums.library_items(
+            limit=1, order_by="random_fast"
+        ):
+            tracks = await self.mass.music.albums.tracks(
+                random_album.item_id, random_album.provider
+            )
+            for idx, track in enumerate(tracks, 1):
+                track.position = idx
+                result.append(track)
+        return result
+
+    async def _get_builtin_playlist_random_artist(self) -> list[Track]:
+        result: list[Track] = []
+        for random_artist in await self.mass.music.artists.library_items(
+            limit=1, order_by="random_fast"
+        ):
+            tracks = await self.mass.music.artists.tracks(
+                random_artist.item_id, random_artist.provider
             )
-            for idx, item in enumerate(res, 1):
-                item.position = idx
-                result.append(item)
-            return result
-        if builtin_playlist_id == RANDOM_TRACKS:
-            res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
-            for idx, item in enumerate(res, 1):
-                item.position = idx
-                result.append(item)
-            return result
-        if builtin_playlist_id == RANDOM_ALBUM:
-            for random_album in await self.mass.music.albums.library_items(
-                limit=1, order_by="random_fast"
-            ):
-                # use the function specified in the queue controller as that
-                # already handles unwrapping an album by user preference
-                tracks = await self.mass.music.albums.tracks(
-                    random_album.item_id, random_album.provider
-                )
-                for idx, track in enumerate(tracks, 1):
-                    track.position = idx
-                    result.append(track)
-                return result
-        if builtin_playlist_id == RANDOM_ARTIST:
-            for random_artist in await self.mass.music.artists.library_items(
-                limit=1, order_by="random_fast"
-            ):
-                # use the function specified in the queue controller as that
-                # already handles unwrapping an artist by user preference
-                tracks = await self.mass.music.artists.tracks(
-                    random_artist.item_id, random_artist.provider
-                )
-                for idx, track in enumerate(tracks, 1):
-                    track.position = idx
-                    result.append(track)
-                return result
-        if builtin_playlist_id == RECENTLY_PLAYED:
-            tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
             for idx, track in enumerate(tracks, 1):
                 track.position = idx
                 result.append(track)
-            return result
         return result
 
+    async def _get_builtin_playlist_recently_played(self) -> list[Track]:
+        result: list[Track] = []
+        recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
+        for idx, track in enumerate(recent_tracks, 1):
+            assert isinstance(track, Track)
+            track.position = idx
+            result.append(track)
+        return result
+
+    async def _get_builtin_playlist_tracks(self, builtin_playlist_id: str) -> list[Track]:
+        """Get all playlist tracks for given builtin playlist id."""
+        try:
+            return await {
+                ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks,
+                RANDOM_TRACKS: self._get_builtin_playlist_random_tracks,
+                RANDOM_ALBUM: self._get_builtin_playlist_random_album,
+                RANDOM_ARTIST: self._get_builtin_playlist_random_artist,
+                RECENTLY_PLAYED: self._get_builtin_playlist_recently_played,
+            }[builtin_playlist_id]()
+        except KeyError:
+            raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")
+
     async def _read_playlist_file_items(
         self, playlist_id: str, offset: int = 0, limit: int = 100000
     ) -> list[str]:
index 63d2e50898b517e3212baa8fe3824bb5bf5a7a7e..2b11bae037be12108649328426093ca5a776c386 100644 (file)
--- a/mypy.ini
+++ b/mypy.ini
@@ -21,4 +21,4 @@ disallow_untyped_decorators = true
 disallow_untyped_defs = true
 warn_return_any = true
 warn_unreachable = true
-packages=tests,music_assistant.client,music_assistant.common,music_assistant.server.providers.jellyfin,music_assistant.server.providers.radiobrowser
+packages=tests,music_assistant.client,music_assistant.common,music_assistant.server.providers.builtin,music_assistant.server.providers.jellyfin,music_assistant.server.providers.radiobrowser