from __future__ import annotations
import asyncio
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
from libopensonic.connection import Connection as SonicConnection
from libopensonic.errors import (
StreamType,
)
from music_assistant_models.errors import (
+ ActionUnavailable,
LoginFailed,
MediaNotFoundError,
ProviderPermissionDenied,
Track,
)
from music_assistant_models.streamdetails import StreamDetails
+from music_assistant_models.unique_list import UniqueList
from music_assistant.constants import (
CONF_PASSWORD,
EP_CHAN_SEP = "$!$"
+Param = ParamSpec("Param")
+RetType = TypeVar("RetType")
+
+
class OpenSonicProvider(MusicProvider):
"""Provider for Open Subsonic servers."""
", check your settings."
)
raise LoginFailed(msg) from e
- self._enable_podcasts = self.config.get_value(CONF_ENABLE_PODCASTS)
- self._ignore_offset = self.config.get_value(CONF_OVERRIDE_OFFSET)
+ self._enable_podcasts = bool(self.config.get_value(CONF_ENABLE_PODCASTS))
+ self._ignore_offset = bool(self.config.get_value(CONF_OVERRIDE_OFFSET))
try:
ret = await self._run_async(self._conn.getOpenSubsonicExtensions)
extensions = ret["openSubsonicExtensions"]
},
)
+ artist.metadata.images = UniqueList()
if sonic_artist.cover_id:
- artist.metadata.images = [
+ artist.metadata.images.append(
MediaItemImage(
type=ImageType.THUMB,
path=sonic_artist.cover_id,
provider=self.instance_id,
remotely_accessible=False,
)
- ]
- else:
- artist.metadata.images = []
+ )
if sonic_info:
if sonic_info.biography:
year=sonic_album.year,
)
+ album.metadata.images = UniqueList()
if sonic_album.cover_id:
- album.metadata.images = [
+ album.metadata.images.append(
MediaItemImage(
type=ImageType.THUMB,
path=sonic_album.cover_id,
provider=self.instance_id,
remotely_accessible=False,
),
- ]
- else:
- album.metadata.images = []
+ )
if sonic_album.artist_id:
album.artists.append(
return album
- def _parse_track(self, sonic_song: SonicSong, album: Album = None) -> Track:
+ def _parse_track(
+ self, sonic_song: SonicSong, album: Album | ItemMapping | None = None
+ ) -> Track:
# Unfortunately, the Song response type is not defined in the open subsonic spec so we have
# implementations which disagree about where the album id for this song should be stored.
# We accept either song.ablum_id or song.parent but prefer album_id.
)
},
)
+
if sonic_playlist.cover_id:
- playlist.metadata.images = [
+ playlist.metadata.images = UniqueList()
+ playlist.metadata.images.append(
MediaItemImage(
type=ImageType.THUMB,
path=sonic_playlist.cover_id,
provider=self.instance_id,
remotely_accessible=False,
)
- ]
+ )
return playlist
def _parse_podcast(self, sonic_podcast: SonicPodcast) -> Podcast:
)
podcast.metadata.description = sonic_podcast.description
- podcast.metadata.images = []
+ podcast.metadata.images = UniqueList()
if sonic_podcast.cover_id:
podcast.metadata.images.append(
msg = f"Can't find episode {ep_id} in podcast {chan_id}"
raise MediaNotFoundError(msg)
- async def _run_async(self, call: Callable, *args, **kwargs):
- return await self.mass.create_task(call, *args, **kwargs)
+ async def _run_async(
+ self, call: Callable[Param, RetType], *args: Param.args, **kwargs: Param.kwargs
+ ) -> RetType:
+ return await asyncio.to_thread(call, *args, **kwargs)
- async def resolve_image(self, path: str) -> bytes:
+ async def resolve_image(self, path: str) -> bytes | Any:
"""Return the image."""
- def _get_cover_art() -> bytes:
+ def _get_cover_art() -> bytes | Any:
with self._conn.getCoverArt(path) as art:
return art.content
songCount=count,
)
while results["songs"]:
- album = None
+ album: SonicAlbum | None = None
for entry in results["songs"]:
if album is None or album.item_id != entry.parent:
album = await self._run_async(self.get_album, prov_album_id=entry.parent)
except (ParameterError, DataNotFoundError) as e:
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg) from e
- album = await self._run_async(self.get_album, prov_album_id=sonic_song.parent)
+ album: SonicAlbum = await self._run_async(self.get_album, prov_album_id=sonic_song.parent)
return self._parse_track(sonic_song, album=album)
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
albums.append(self._parse_album(entry))
return albums
- async def get_playlist(self, prov_playlist_id) -> Playlist:
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Return the specified Playlist."""
try:
sonic_playlist: SonicPlaylist = await self._run_async(
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
"""Get full Podcast details by id."""
if not self._enable_podcasts:
- return None
+ msg = "Podcasts are currently disabled in the provider configuration"
+ raise ActionUnavailable(msg)
channels = await self._run_async(
self._conn.getPodcasts, incEpisodes=True, pid=prov_podcast_id
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from e
- album = None
+ album: SonicAlbum | None = None
for index, sonic_song in enumerate(sonic_playlist.songs, 1):
if not album or album.item_id != sonic_song.parent:
album = await self._run_async(self.get_album, prov_album_id=sonic_song.parent)
self, item_id: str, media_type: MediaType = MediaType.TRACK
) -> StreamDetails:
"""Get the details needed to process a specified track."""
+ item: SonicSong | SonicEpisode
if media_type == MediaType.TRACK:
try:
- item: SonicSong = await self._run_async(self._conn.getSong, item_id)
+ item = await self._run_async(self._conn.getSong, item_id)
except (ParameterError, DataNotFoundError) as e:
msg = f"Item {item_id} not found"
raise MediaNotFoundError(msg) from e
self.mass.create_task(self._report_playback_started(item_id))
elif media_type == MediaType.PODCAST_EPISODE:
- item: SonicEpisode = await self._get_podcast_episode(item_id)
+ item = await self._get_podcast_episode(item_id)
self.logger.debug(
"Fetching stream details for podcast episode '%s' with format '%s'",
) -> None:
"""Handle callback when an item completed streaming."""
self.logger.debug("on_streamed called for %s", streamdetails.item_id)
- if seconds_streamed >= streamdetails.duration / 2:
+ if streamdetails.duration and seconds_streamed >= streamdetails.duration / 2:
self.logger.debug("scrobble for listen count called for %s", streamdetails.item_id)
await self._run_async(self._conn.scrobble, sid=streamdetails.item_id, submission=True)
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Provide a generator for the stream data."""
- audio_buffer = asyncio.Queue(1)
+ audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(1)
# ignore seek position if the server does not support it
# in that case we let the core handle seeking
if not self._seek_support: