Fix: Chore: Subonsic: Fix strict typing errors (#2200)
authorEric Munson <eric@munsonfam.org>
Wed, 21 May 2025 16:08:10 +0000 (12:08 -0400)
committerGitHub <noreply@github.com>
Wed, 21 May 2025 16:08:10 +0000 (18:08 +0200)
Turning on stricter type checking revealed several bugs in the provider.
This commit addreseses the issues brought up by the type checker.

Fixes: https://github.com/music-assistant/support/issues/3997
Signed-off-by: Eric B Munson <eric@munsonfam.org>
music_assistant/providers/opensubsonic/parsers.py
music_assistant/providers/opensubsonic/sonic_provider.py

index ddf0e33222ebf24b24d0ee7147562886020e6514..1f7d260991121c99f3156186ef0f07d22c25dfcb 100644 (file)
@@ -28,7 +28,7 @@ if TYPE_CHECKING:
     from libopensonic.media import AlbumID3 as SonicAlbum
     from libopensonic.media import AlbumInfo as SonicAlbumInfo
     from libopensonic.media import ArtistID3 as SonicArtist
-    from libopensonic.media import ArtistInfo as SonicArtistInfo
+    from libopensonic.media import ArtistInfo2 as SonicArtistInfo
     from libopensonic.media import Child as SonicSong
     from libopensonic.media import Playlist as SonicPlaylist
     from libopensonic.media import PodcastChannel as SonicPodcast
@@ -113,7 +113,7 @@ def parse_track(
         provider=instance_id,
         name=sonic_song.title,
         album=album,
-        duration=sonic_song.duration if sonic_song.duration is not None else 0,
+        duration=sonic_song.duration or 0,
         disc_number=sonic_song.disc_number or 0,
         favorite=bool(sonic_song.starred),
         metadata=metadata,
@@ -124,7 +124,7 @@ def parse_track(
                 provider_instance=instance_id,
                 available=True,
                 audio_format=AudioFormat(
-                    content_type=ContentType.try_parse(sonic_song.content_type),
+                    content_type=ContentType.try_parse(sonic_song.content_type or "?"),
                     sample_rate=sonic_song.sampling_rate if sonic_song.sampling_rate else 44100,
                     bit_depth=sonic_song.bit_depth if sonic_song.bit_depth else 16,
                     channels=sonic_song.channel_count if sonic_song.channel_count else 2,
@@ -207,7 +207,7 @@ def parse_track(
 
 
 def parse_artist(
-    instance_id: str, sonic_artist: SonicArtist, sonic_info: SonicArtistInfo = None
+    instance_id: str, sonic_artist: SonicArtist, sonic_info: SonicArtistInfo | None = None
 ) -> Artist:
     """Parse artist and artistInfo into a Music Assistant Artist."""
     metadata: MediaItemMetadata = MediaItemMetadata()
index 840ab5c0bff3296e813896426f433bc790e05028..54de11b688fe255db5cd5cc0b5c0952d113917e4 100644 (file)
@@ -59,8 +59,8 @@ from .parsers import (
 if TYPE_CHECKING:
     from collections.abc import AsyncGenerator, Callable
 
-    from libopensonic.media import Album as SonicAlbum
-    from libopensonic.media import Artist as SonicArtist
+    from libopensonic.media import AlbumID3 as SonicAlbum
+    from libopensonic.media import ArtistID3 as SonicArtist
     from libopensonic.media import Bookmark as SonicBookmark
     from libopensonic.media import Child as SonicSong
     from libopensonic.media import OpenSubsonicExtension
@@ -81,7 +81,7 @@ RetType = TypeVar("RetType")
 class OpenSonicProvider(MusicProvider):
     """Provider for Open Subsonic servers."""
 
-    conn: SonicConnection = None
+    conn: SonicConnection
     _enable_podcasts: bool = True
     _seek_support: bool = False
     _ignore_offset: bool = False
@@ -210,20 +210,25 @@ class OpenSonicProvider(MusicProvider):
             album_offset=0,
             song_count=songs,
             song_offset=0,
-            music_folder_id=None,
-        )
-        return SearchResults(
-            artists=[parse_artist(self.instance_id, entry) for entry in answer.artist]
-            if answer.artist
-            else [],
-            albums=[parse_album(self.logger, self.instance_id, entry) for entry in answer.album]
-            if answer.album
-            else [],
-            tracks=[parse_track(self.logger, self.instance_id, entry) for entry in answer.song]
-            if answer.song
-            else [],
         )
 
+        if answer.artist:
+            ar = [parse_artist(self.instance_id, entry) for entry in answer.artist]
+        else:
+            ar = []
+
+        if answer.album:
+            al = [parse_album(self.logger, self.instance_id, entry) for entry in answer.album]
+        else:
+            al = []
+
+        if answer.song:
+            tr = [parse_track(self.logger, self.instance_id, entry) for entry in answer.song]
+        else:
+            tr = []
+
+        return SearchResults(artists=ar, albums=al, tracks=tr)
+
     async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
         """Provide a generator for reading all artists."""
         artists = await self._run_async(self.conn.get_artists)
@@ -524,8 +529,15 @@ class OpenSonicProvider(MusicProvider):
 
     async def create_playlist(self, name: str) -> Playlist:
         """Create a new empty playlist on the server."""
-        playlist: SonicPlaylist = await self._run_async(self.conn.create_playlist, name=name)
-        return parse_playlist(self.instance_id, playlist)
+        if not await self._run_async(self.conn.create_playlist, name=name):
+            raise ProviderPermissionDenied(
+                "Please ensure you have permission to create playlists on your server"
+            )
+        pls: list[SonicPlaylist] = await self._run_async(self.conn.get_playlists)
+        for pl in pls:
+            if pl.name == name:
+                return parse_playlist(self.instance_id, pl)
+        raise MediaNotFoundError(f"Failed to create podcast with name '{name}'")
 
     async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
         """Append the listed tracks to the selected playlist.
@@ -593,7 +605,7 @@ class OpenSonicProvider(MusicProvider):
             raise UnsupportedFeaturedException(msg)
 
         # For mp4 or m4a files, better to let ffmpeg detect the codec in use so mark them unknown
-        if mime_type.endswith("mp4"):
+        if mime_type and mime_type.endswith("mp4"):
             self.logger.warning(
                 "Due to the streaming method used by the subsonic API, M4A files "
                 "may fail. See provider documentation for more information."
@@ -654,7 +666,7 @@ class OpenSonicProvider(MusicProvider):
         if fully_played:
             # We completed the episode and should delete our bookmark
             try:
-                await self._run_async(self.conn.delete_bookmark, id=ep_id)
+                await self._run_async(self.conn.delete_bookmark, mid=ep_id)
             except DataNotFoundError:
                 # We probably raced with something else deleting this bookmark, not really a problem
                 return
@@ -663,7 +675,7 @@ class OpenSonicProvider(MusicProvider):
         # MA provides a position in seconds but expects it back in milliseconds, while
         # the Open Subsonic spec expects a position in milliseconds but returns it in
         # seconds, go figure.
-        await self._run_async(self.conn.create_bookmark, id=ep_id, position=(position * 1000))
+        await self._run_async(self.conn.create_bookmark, mid=ep_id, position=position * 1000)
 
     async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
         """