--- /dev/null
- msg = (
- f"Failed to connect to {self.config.get_value(CONF_BASE_URL)}, "
- "check your settings."
- )
- raise LoginFailed(msg)
+"""The provider class for Open Subsonic."""
+
+from __future__ import annotations
+
+import asyncio
+from typing import TYPE_CHECKING
+
+from libopensonic.connection import Connection as SonicConnection
+from libopensonic.errors import (
+ AuthError,
+ CredentialError,
+ DataNotFoundError,
+ ParameterError,
+ SonicError,
+)
+from music_assistant_models.enums import (
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import LoginFailed, MediaNotFoundError, ProviderPermissionDenied
+from music_assistant_models.media_items import (
+ Album,
+ AlbumType,
+ Artist,
+ AudioFormat,
+ ItemMapping,
+ MediaItemImage,
+ Playlist,
+ ProviderMapping,
+ SearchResults,
+ Track,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.constants import (
+ CONF_PASSWORD,
+ CONF_PATH,
+ CONF_PORT,
+ CONF_USERNAME,
+ UNKNOWN_ARTIST,
+)
+from music_assistant.models.music_provider import MusicProvider
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator, Callable
+
+ from libopensonic.media import Album as SonicAlbum
+ from libopensonic.media import AlbumInfo as SonicAlbumInfo
+ from libopensonic.media import Artist as SonicArtist
+ from libopensonic.media import ArtistInfo as SonicArtistInfo
+ from libopensonic.media import Playlist as SonicPlaylist
+ from libopensonic.media import PodcastChannel as SonicPodcastChannel
+ from libopensonic.media import PodcastEpisode as SonicPodcastEpisode
+ from libopensonic.media import Song as SonicSong
+
+CONF_BASE_URL = "baseURL"
+CONF_ENABLE_PODCASTS = "enable_podcasts"
+CONF_ENABLE_LEGACY_AUTH = "enable_legacy_auth"
+
+UNKNOWN_ARTIST_ID = "fake_artist_unknown"
+
+# We need the following prefix because of the way that Navidrome reports artists for individual
+# tracks on Various Artists albums, see the note in the _parse_track() method and the handling
+# in get_artist()
+NAVI_VARIOUS_PREFIX = "MA-NAVIDROME-"
+
+
+class OpenSonicProvider(MusicProvider):
+ """Provider for Open Subsonic servers."""
+
+ _conn: SonicConnection = None
+ _enable_podcasts: bool = True
+ _seek_support: bool = False
+
+ async def handle_async_init(self) -> None:
+ """Set up the music provider and test the connection."""
+ port = self.config.get_value(CONF_PORT)
+ if port is None:
+ port = 443
+ path = self.config.get_value(CONF_PATH)
+ if path is None:
+ path = ""
+ self._conn = SonicConnection(
+ self.config.get_value(CONF_BASE_URL),
+ username=self.config.get_value(CONF_USERNAME),
+ password=self.config.get_value(CONF_PASSWORD),
+ legacyAuth=self.config.get_value(CONF_ENABLE_LEGACY_AUTH),
+ port=port,
+ serverPath=path,
+ appName="Music Assistant",
+ )
+ try:
+ success = await self._run_async(self._conn.ping)
+ if not success:
- f"Failed to connect to {self.config.get_value(CONF_BASE_URL)}, check your settings."
++ raise CredentialError
+ except (AuthError, CredentialError) as e:
+ msg = (
- f"Unable to find an artist ID for album '{sonic_album.name}' with "
- f"ID '{sonic_album.id}'."
++ "Failed to connect to "
++ f"{self.config.get_value(CONF_BASE_URL)}"
++ ", check your settings."
+ )
+ raise LoginFailed(msg) from e
+ self._enable_podcasts = self.config.get_value(CONF_ENABLE_PODCASTS)
+ try:
+ ret = await self._run_async(self._conn.getOpenSubsonicExtensions)
+ extensions = ret["openSubsonicExtensions"]
+ for entry in extensions:
+ if entry["name"] == "transcodeOffset":
+ self._seek_support = True
+ break
+ except OSError:
+ self.logger.info("Server does not support transcodeOffset, seeking in player provider")
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return a list of supported features."""
+ return (
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_ALBUMS,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
+ ProviderFeature.BROWSE,
+ ProviderFeature.SEARCH,
+ ProviderFeature.ARTIST_ALBUMS,
+ ProviderFeature.ARTIST_TOPTRACKS,
+ ProviderFeature.SIMILAR_TRACKS,
+ ProviderFeature.PLAYLIST_TRACKS_EDIT,
+ ProviderFeature.PLAYLIST_CREATE,
+ )
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """
+ Return True if the provider is a streaming provider.
+
+ This literally means that the catalog is not the same as the library contents.
+ For local based providers (files, plex), the catalog is the same as the library content.
+ It also means that data is if this provider is NOT a streaming provider,
+ data cross instances is unique, the catalog and library differs per instance.
+
+ Setting this to True will only query one instance of the provider for search and lookups.
+ Setting this to False will query all instances of this provider for search and lookups.
+ """
+ return False
+
+ def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
+ return ItemMapping(
+ media_type=media_type,
+ item_id=key,
+ provider=self.instance_id,
+ name=name,
+ )
+
+ def _parse_podcast_artist(self, sonic_channel: SonicPodcastChannel) -> Artist:
+ artist = Artist(
+ item_id=sonic_channel.id,
+ name=sonic_channel.title,
+ provider=self.instance_id,
+ favorite=bool(sonic_channel.starred),
+ provider_mappings={
+ ProviderMapping(
+ item_id=sonic_channel.id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ if sonic_channel.description is not None:
+ artist.metadata.description = sonic_channel.description
+ if sonic_channel.original_image_url:
+ artist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_channel.original_image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ return artist
+
+ def _parse_podcast_album(self, sonic_channel: SonicPodcastChannel) -> Album:
+ return Album(
+ item_id=sonic_channel.id,
+ provider=self.instance_id,
+ name=sonic_channel.title,
+ provider_mappings={
+ ProviderMapping(
+ item_id=sonic_channel.id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=True,
+ )
+ },
+ album_type=AlbumType.PODCAST,
+ )
+
+ def _parse_podcast_episode(
+ self, sonic_episode: SonicPodcastEpisode, sonic_channel: SonicPodcastChannel
+ ) -> Track:
+ return Track(
+ item_id=sonic_episode.id,
+ provider=self.instance_id,
+ name=sonic_episode.title,
+ album=self._parse_podcast_album(sonic_channel=sonic_channel),
+ artists=[self._parse_podcast_artist(sonic_channel=sonic_channel)],
+ duration=sonic_episode.duration if sonic_episode.duration is not None else 0,
+ favorite=bool(sonic_episode.starred),
+ provider_mappings={
+ ProviderMapping(
+ item_id=sonic_episode.id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=True,
+ )
+ },
+ )
+
+ async def _get_podcast_artists(self) -> list[Artist]:
+ if not self._enable_podcasts:
+ return []
+
+ sonic_channels = await self._run_async(self._conn.getPodcasts, incEpisodes=False)
+ artists = []
+ for channel in sonic_channels:
+ artists.append(self._parse_podcast_artist(channel))
+ return artists
+
+ async def _get_podcasts(self) -> list[SonicPodcastChannel]:
+ if not self._enable_podcasts:
+ return []
+ return await self._run_async(self._conn.getPodcasts, incEpisodes=True)
+
+ def _parse_artist(
+ self, sonic_artist: SonicArtist, sonic_info: SonicArtistInfo = None
+ ) -> Artist:
+ artist = Artist(
+ item_id=sonic_artist.id,
+ name=sonic_artist.name,
+ provider=self.domain,
+ favorite=bool(sonic_artist.starred),
+ provider_mappings={
+ ProviderMapping(
+ item_id=sonic_artist.id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+
+ if sonic_artist.cover_id:
+ artist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_artist.cover_id,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ else:
+ artist.metadata.images = []
+
+ if sonic_info:
+ if sonic_info.biography:
+ artist.metadata.description = sonic_info.biography
+ if sonic_info.small_url:
+ artist.metadata.images.append(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_info.small_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ return artist
+
+ def _parse_album(self, sonic_album: SonicAlbum, sonic_info: SonicAlbumInfo = None) -> Album:
+ album_id = sonic_album.id
+ album = Album(
+ item_id=album_id,
+ provider=self.domain,
+ name=sonic_album.name,
+ favorite=bool(sonic_album.starred),
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ year=sonic_album.year,
+ )
+
+ if sonic_album.cover_id:
+ album.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_album.cover_id,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ ),
+ ]
+ else:
+ album.metadata.images = []
+
+ if sonic_album.artist_id:
+ album.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ sonic_album.artist_id,
+ sonic_album.artist if sonic_album.artist else UNKNOWN_ARTIST,
+ )
+ )
+ else:
+ self.logger.info(
- disc_number=0,
++ "Unable to find an artist ID for album '%s' with ID '%s'.",
++ sonic_album.name,
++ sonic_album.id,
+ )
+ album.artists.append(
+ Artist(
+ item_id=UNKNOWN_ARTIST_ID,
+ name=UNKNOWN_ARTIST,
+ provider=self.instance_id,
+ provider_mappings={
+ ProviderMapping(
+ item_id=UNKNOWN_ARTIST_ID,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ )
+
+ if sonic_info:
+ if sonic_info.small_url:
+ album.metadata.images.append(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_info.small_url,
+ remotely_accessible=False,
+ provider=self.instance_id,
+ )
+ )
+ if sonic_info.notes:
+ album.metadata.description = sonic_info.notes
+
+ return album
+
+ def _parse_track(self, sonic_song: SonicSong) -> Track:
+ mapping = None
+ if sonic_song.album_id is not None and sonic_song.album is not None:
+ mapping = self._get_item_mapping(MediaType.ALBUM, sonic_song.album_id, sonic_song.album)
+
+ track = Track(
+ item_id=sonic_song.id,
+ provider=self.instance_id,
+ name=sonic_song.title,
+ album=mapping,
+ duration=sonic_song.duration if sonic_song.duration is not None else 0,
+ # We are setting disc number to 0 because the standard for what is part of
+ # a Open Subsonic Song is not yet set and the implementations I have checked
+ # do not contain this field. We should revisit this when the spec is finished
- item_id=f"{NAVI_VARIOUS_PREFIX}{sonic_song.artist}",
++ disc_number=sonic_song.disc_number or 0,
+ favorite=bool(sonic_song.starred),
+ provider_mappings={
+ ProviderMapping(
+ item_id=sonic_song.id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=True,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(sonic_song.content_type)
+ ),
+ )
+ },
+ track_number=getattr(sonic_song, "track", 0),
+ )
+
+ # We need to find an artist for this track but various implementations seem to disagree
+ # about where the artist with the valid ID needs to be found. We will add any artist with
+ # an ID and only use UNKNOWN if none are found.
+
+ if sonic_song.artist_id:
+ track.artists.append(
+ self._get_item_mapping(
+ MediaType.ARTIST,
+ sonic_song.artist_id,
+ sonic_song.artist if sonic_song.artist else UNKNOWN_ARTIST,
+ )
+ )
+
+ for entry in sonic_song.artists:
+ if entry.id == sonic_song.artist_id:
+ continue
+ if entry.id is not None and entry.name is not None:
+ track.artists.append(self._get_item_mapping(MediaType.ARTIST, entry.id, entry.name))
+
+ if not track.artists:
+ if sonic_song.artist and not sonic_song.artist_id:
+ # This is how Navidrome handles tracks from albums which are marked
+ # 'Various Artists'. Unfortunately, we cannot lookup this artist independently
+ # because it will not have an entry in the artists table so the best we can do it
+ # add a 'fake' id with the proper artist name and have get_artist() check for this
+ # id and handle it locally.
++ fake_id = f"{NAVI_VARIOUS_PREFIX}{sonic_song.artist}"
+ artist = Artist(
- item_id=UNKNOWN_ARTIST_ID,
++ item_id=fake_id,
+ provider=self.domain,
+ name=sonic_song.artist,
+ provider_mappings={
+ ProviderMapping(
- f"Unable to find artist ID for track '{sonic_song.title}' with "
- f"ID '{sonic_song.id}'."
++ item_id=fake_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ else:
+ self.logger.info(
++ "Unable to find artist ID for track '%s' with ID '%s'.",
++ sonic_song.title,
++ sonic_song.id,
+ )
+ artist = Artist(
+ item_id=UNKNOWN_ARTIST_ID,
+ name=UNKNOWN_ARTIST,
+ provider=self.instance_id,
+ provider_mappings={
+ ProviderMapping(
+ item_id=UNKNOWN_ARTIST_ID,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+
+ track.artists.append(artist)
+ return track
+
+ def _parse_playlist(self, sonic_playlist: SonicPlaylist) -> Playlist:
+ playlist = Playlist(
+ item_id=sonic_playlist.id,
+ provider=self.domain,
+ name=sonic_playlist.name,
+ is_editable=True,
+ favorite=bool(sonic_playlist.starred),
+ provider_mappings={
+ ProviderMapping(
+ item_id=sonic_playlist.id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ if sonic_playlist.cover_id:
+ playlist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_playlist.cover_id,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ return playlist
+
+ async def _run_async(self, call: Callable, *args, **kwargs):
+ return await self.mass.create_task(call, *args, **kwargs)
+
+ async def resolve_image(self, path: str) -> bytes:
+ """Return the image."""
+
+ def _get_cover_art() -> bytes:
+ with self._conn.getCoverArt(path) as art:
+ return art.content
+
+ return await asyncio.to_thread(_get_cover_art)
+
+ async def search(
+ self, search_query: str, media_types: list[MediaType], limit: int = 20
+ ) -> SearchResults:
+ """Search the sonic library."""
+ artists = limit if MediaType.ARTIST in media_types else 0
+ albums = limit if MediaType.ALBUM in media_types else 0
+ songs = limit if MediaType.TRACK in media_types else 0
+ if not (artists or albums or songs):
+ return SearchResults()
+ answer = await self._run_async(
+ self._conn.search3,
+ query=search_query,
+ artistCount=artists,
+ artistOffset=0,
+ albumCount=albums,
+ albumOffset=0,
+ songCount=songs,
+ songOffset=0,
+ musicFolderId=None,
+ )
+ return SearchResults(
+ artists=[self._parse_artist(entry) for entry in answer["artists"]],
+ albums=[self._parse_album(entry) for entry in answer["albums"]],
+ tracks=[self._parse_track(entry) for entry in answer["songs"]],
+ )
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Provide a generator for reading all artists."""
+ indices = await self._run_async(self._conn.getArtists)
+ for index in indices:
+ for artist in index.artists:
+ yield self._parse_artist(artist)
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """
+ Provide a generator for reading all artists.
+
+ Note the pagination, the open subsonic docs say that this method is limited to
+ returning 500 items per invocation.
+ """
+ offset = 0
+ size = 500
+ albums = await self._run_async(
+ self._conn.getAlbumList2, ltype="alphabeticalByArtist", size=size, offset=offset
+ )
+ while albums:
+ for album in albums:
+ yield self._parse_album(album)
+ offset += size
+ albums = await self._run_async(
+ self._conn.getAlbumList2, ltype="alphabeticalByArtist", size=size, offset=offset
+ )
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Provide a generator for library playlists."""
+ results = await self._run_async(self._conn.getPlaylists)
+ for entry in results:
+ yield self._parse_playlist(entry)
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """
+ Provide a generator for library tracks.
+
+ Note the lack of item count on this method.
+ """
+ query = ""
+ offset = 0
+ count = 500
+ try:
+ results = await self._run_async(
+ self._conn.search3,
+ query=query,
+ artistCount=0,
+ albumCount=0,
+ songOffset=offset,
+ songCount=count,
+ )
+ except ParameterError:
+ # Older Navidrome does not accept an empty string and requires the empty quotes
+ query = '""'
+ results = await self._run_async(
+ self._conn.search3,
+ query=query,
+ artistCount=0,
+ albumCount=0,
+ songOffset=offset,
+ songCount=count,
+ )
+ while results["songs"]:
+ for entry in results["songs"]:
+ yield self._parse_track(entry)
+ offset += count
+ results = await self._run_async(
+ self._conn.search3,
+ query=query,
+ artistCount=0,
+ albumCount=0,
+ songOffset=offset,
+ songCount=count,
+ )
+
+ async def get_album(self, prov_album_id: str) -> Album:
+ """Return the requested Album."""
+ try:
+ sonic_album: SonicAlbum = await self._run_async(self._conn.getAlbum, prov_album_id)
+ sonic_info = await self._run_async(self._conn.getAlbumInfo2, aid=prov_album_id)
+ except (ParameterError, DataNotFoundError) as e:
+ if self._enable_podcasts:
+ # This might actually be a 'faked' album from podcasts, try that before giving up
+ try:
+ sonic_channel = await self._run_async(
+ self._conn.getPodcasts, incEpisodes=False, pid=prov_album_id
+ )
+ return self._parse_podcast_album(sonic_channel=sonic_channel)
+ except SonicError:
+ pass
+ msg = f"Album {prov_album_id} not found"
+ raise MediaNotFoundError(msg) from e
+
+ return self._parse_album(sonic_album, sonic_info)
+
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
+ """Return a list of tracks on the specified Album."""
+ try:
+ sonic_album: SonicAlbum = await self._run_async(self._conn.getAlbum, prov_album_id)
+ except (ParameterError, DataNotFoundError) as e:
+ msg = f"Album {prov_album_id} not found"
+ raise MediaNotFoundError(msg) from e
+ tracks = []
+ for sonic_song in sonic_album.songs:
+ tracks.append(self._parse_track(sonic_song))
+ return tracks
+
+ async def get_artist(self, prov_artist_id: str) -> Artist:
+ """Return the requested Artist."""
+ if prov_artist_id == UNKNOWN_ARTIST_ID:
+ return Artist(
+ item_id=UNKNOWN_ARTIST_ID,
+ name=UNKNOWN_ARTIST,
+ provider=self.instance_id,
+ provider_mappings={
+ ProviderMapping(
+ item_id=UNKNOWN_ARTIST_ID,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ elif prov_artist_id.startswith(NAVI_VARIOUS_PREFIX):
+ # Special case for handling track artists on various artists album for Navidrome.
+ return Artist(
+ item_id=prov_artist_id,
+ name=prov_artist_id.removeprefix(NAVI_VARIOUS_PREFIX),
+ provider=self.instance_id,
+ provider_mappings={
+ ProviderMapping(
+ item_id=prov_artist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+
+ try:
+ sonic_artist: SonicArtist = await self._run_async(
+ self._conn.getArtist, artist_id=prov_artist_id
+ )
+ sonic_info = await self._run_async(self._conn.getArtistInfo2, aid=prov_artist_id)
+ except (ParameterError, DataNotFoundError) as e:
+ if self._enable_podcasts:
+ # This might actually be a 'faked' artist from podcasts, try that before giving up
+ try:
+ sonic_channel = await self._run_async(
+ self._conn.getPodcasts, incEpisodes=False, pid=prov_artist_id
+ )
+ return self._parse_podcast_artist(sonic_channel=sonic_channel[0])
+ except SonicError:
+ pass
+ msg = f"Artist {prov_artist_id} not found"
+ raise MediaNotFoundError(msg) from e
+ return self._parse_artist(sonic_artist, sonic_info)
+
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Return the specified track."""
+ try:
+ sonic_song: SonicSong = await self._run_async(self._conn.getSong, prov_track_id)
+ except (ParameterError, DataNotFoundError) as e:
+ msg = f"Item {prov_track_id} not found"
+ raise MediaNotFoundError(msg) from e
+ return self._parse_track(sonic_song)
+
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
+ """Return a list of all Albums by specified Artist."""
+ if prov_artist_id == UNKNOWN_ARTIST_ID or prov_artist_id.startswith(NAVI_VARIOUS_PREFIX):
+ return []
+
+ try:
+ sonic_artist: SonicArtist = await self._run_async(self._conn.getArtist, prov_artist_id)
+ except (ParameterError, DataNotFoundError) as e:
+ msg = f"Album {prov_artist_id} not found"
+ raise MediaNotFoundError(msg) from e
+ albums = []
+ for entry in sonic_artist.albums:
+ albums.append(self._parse_album(entry))
+ return albums
+
+ async def get_playlist(self, prov_playlist_id) -> Playlist:
+ """Return the specified Playlist."""
+ try:
+ sonic_playlist: SonicPlaylist = await self._run_async(
+ self._conn.getPlaylist, prov_playlist_id
+ )
+ except (ParameterError, DataNotFoundError) as e:
+ msg = f"Playlist {prov_playlist_id} not found"
+ raise MediaNotFoundError(msg) from e
+ return self._parse_playlist(sonic_playlist)
+
+ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
+ """Get playlist tracks."""
+ result: list[Track] = []
+ if page > 0:
+ # paging not supported, we always return the whole list at once
+ return result
+ try:
+ sonic_playlist: SonicPlaylist = await self._run_async(
+ self._conn.getPlaylist, prov_playlist_id
+ )
+ except (ParameterError, DataNotFoundError) as e:
+ msg = f"Playlist {prov_playlist_id} not found"
+ raise MediaNotFoundError(msg) from e
+
+ # TODO: figure out if subsonic supports paging here
+ for index, sonic_song in enumerate(sonic_playlist.songs, 1):
+ track = self._parse_track(sonic_song)
+ track.position = index
+ result.append(track)
+ return result
+
+ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
+ """Get the top listed tracks for a specified artist."""
+ # We have seen top tracks requested for the UNKNOWN_ARTIST ID, protect against that
+ if prov_artist_id == UNKNOWN_ARTIST_ID or prov_artist_id.startswith(NAVI_VARIOUS_PREFIX):
+ return []
+
+ try:
+ sonic_artist: SonicArtist = await self._run_async(self._conn.getArtist, prov_artist_id)
+ except DataNotFoundError as e:
+ msg = f"Artist {prov_artist_id} not found"
+ raise MediaNotFoundError(msg) from e
+ songs: list[SonicSong] = await self._run_async(self._conn.getTopSongs, sonic_artist.name)
+ return [self._parse_track(entry) for entry in songs]
+
+ async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
+ """Get tracks similar to selected track."""
+ songs: list[SonicSong] = await self._run_async(
+ self._conn.getSimilarSongs2, iid=prov_track_id, count=limit
+ )
+ return [self._parse_track(entry) for entry in songs]
+
+ async def create_playlist(self, name: str) -> Playlist:
+ """Create a new empty playlist on the server."""
+ playlist: SonicPlaylist = await self._run_async(self._conn.createPlaylist, name=name)
+ return self._parse_playlist(playlist)
+
+ async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
+ """Append the listed tracks to the selected playlist.
+
+ Note that the configured user must own the playlist to edit this way.
+ """
+ try:
+ await self._run_async(
+ self._conn.updatePlaylist, lid=prov_playlist_id, songIdsToAdd=prov_track_ids
+ )
+ except SonicError:
+ msg = f"Failed to add songs to {prov_playlist_id}, check your permissions."
+ raise ProviderPermissionDenied(msg)
+
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
+ ) -> None:
+ """Remove selected positions from the playlist."""
+ idx_to_remove = [pos - 1 for pos in positions_to_remove]
+ try:
+ await self._run_async(
+ self._conn.updatePlaylist,
+ lid=prov_playlist_id,
+ songIndexesToRemove=idx_to_remove,
+ )
+ except SonicError:
+ msg = f"Failed to remove songs from {prov_playlist_id}, check your permissions."
+ raise ProviderPermissionDenied(msg)
+
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
+ """Get the details needed to process a specified track."""
+ try:
+ sonic_song: SonicSong = await self._run_async(self._conn.getSong, item_id)
+ except (ParameterError, DataNotFoundError) as e:
+ msg = f"Item {item_id} not found"
+ raise MediaNotFoundError(msg) from e
+
+ self.mass.create_task(self._report_playback_started(item_id))
+
+ mime_type = sonic_song.content_type
+ if mime_type.endswith("mpeg"):
+ mime_type = sonic_song.suffix
+
+ self.logger.debug(
+ "Fetching stream details for id %s '%s' with format '%s'",
+ sonic_song.id,
+ sonic_song.title,
+ mime_type,
+ )
+
+ return StreamDetails(
+ item_id=sonic_song.id,
+ provider=self.instance_id,
+ can_seek=self._seek_support,
+ audio_format=AudioFormat(content_type=ContentType.try_parse(mime_type)),
+ stream_type=StreamType.CUSTOM,
+ duration=sonic_song.duration if sonic_song.duration is not None else 0,
+ )
+
+ async def _report_playback_started(self, item_id: str) -> None:
+ await self._run_async(self._conn.scrobble, sid=item_id, submission=False)
+
+ async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
+ """Handle callback when an item completed streaming."""
+ if seconds_streamed >= streamdetails.duration / 2:
+ await self._run_async(self._conn.scrobble, sid=streamdetails.item_id, submission=True)
+
+ async def get_audio_stream(
+ self, streamdetails: StreamDetails, seek_position: int = 0
+ ) -> AsyncGenerator[bytes, None]:
+ """Provide a generator for the stream data."""
+ audio_buffer = asyncio.Queue(1)
+
+ self.logger.debug("Streaming %s", streamdetails.item_id)
+
+ def _streamer() -> None:
+ with self._conn.stream(
+ streamdetails.item_id, timeOffset=seek_position, estimateContentLength=True
+ ) as stream:
+ for chunk in stream.iter_content(chunk_size=40960):
+ asyncio.run_coroutine_threadsafe(
+ audio_buffer.put(chunk), self.mass.loop
+ ).result()
+ # send empty chunk when we're done
+ asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result()
+
+ # fire up an executor thread to put the audio chunks (threadsafe) on the audio buffer
+ streamer_task = self.mass.loop.run_in_executor(None, _streamer)
+ try:
+ while True:
+ # keep reading from the audio buffer until there is no more data
+ chunk = await audio_buffer.get()
+ if chunk == b"EOF":
+ break
+ yield chunk
+ finally:
+ if not streamer_task.done():
+ streamer_task.cancel()
+
+ self.logger.debug("Done streaming %s", streamdetails.item_id)
--- /dev/null
+"""Snapcast Player provider for Music Assistant."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import pathlib
+import random
+import re
+import socket
+import time
+from contextlib import suppress
+from typing import TYPE_CHECKING, Final, cast
+
+from bidict import bidict
+from music_assistant_models.config_entries import (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+ create_sample_rates_config_entry,
+)
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ MediaType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+ ProviderFeature,
+)
+from music_assistant_models.errors import SetupFailedError
+from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
+from snapcast.control import create_server
+from snapcast.control.client import Snapclient
+from zeroconf import NonUniqueNameException
+from zeroconf.asyncio import AsyncServiceInfo
+
+from music_assistant.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
+from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.helpers.util import get_ip_pton
+from music_assistant.models.player_provider import PlayerProvider
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+ from snapcast.control.group import Snapgroup
+ from snapcast.control.server import Snapserver
+ from snapcast.control.stream import Snapstream
+
+ from music_assistant import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+ from music_assistant.providers.player_group import PlayerGroupProvider
+
+CONF_SERVER_HOST = "snapcast_server_host"
+CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port"
+CONF_USE_EXTERNAL_SERVER = "snapcast_use_external_server"
+CONF_SERVER_BUFFER_SIZE = "snapcast_server_built_in_buffer_size"
+CONF_SERVER_CHUNK_MS = "snapcast_server_built_in_chunk_ms"
+CONF_SERVER_INITIAL_VOLUME = "snapcast_server_built_in_initial_volume"
+CONF_SERVER_TRANSPORT_CODEC = "snapcast_server_built_in_codec"
+CONF_SERVER_SEND_AUDIO_TO_MUTED = "snapcast_server_built_in_send_muted"
+CONF_STREAM_IDLE_THRESHOLD = "snapcast_stream_idle_threshold"
+
+
+CONF_CATEGORY_GENERIC = "generic"
+CONF_CATEGORY_ADVANCED = "advanced"
+CONF_CATEGORY_BUILT_IN = "Built-in Snapserver Settings"
+
+CONF_HELP_LINK = (
+ "https://raw.githubusercontent.com/badaix/snapcast/refs/heads/master/server/etc/snapserver.conf"
+)
+
+# airplay has fixed sample rate/bit depth so make this config entry static and hidden
+CONF_ENTRY_SAMPLE_RATES_SNAPCAST = create_sample_rates_config_entry(48000, 16, 48000, 16, True)
+
+DEFAULT_SNAPSERVER_IP = "127.0.0.1"
+DEFAULT_SNAPSERVER_PORT = 1705
+DEFAULT_SNAPSTREAM_IDLE_THRESHOLD = 60000
+
+SNAPWEB_DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.resolve().joinpath("snapweb")
+
+
+DEFAULT_SNAPCAST_FORMAT = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=48000,
+ # TODO: can we handle 24 bits bit depth ?
+ bit_depth=16,
+ channels=2,
+)
+
+DEFAULT_SNAPCAST_PCM_FORMAT = AudioFormat(
+ # the format that is used as intermediate pcm stream,
+ # we prefer F32 here to account for volume normalization
+ content_type=ContentType.PCM_F32LE,
+ sample_rate=48000,
+ bit_depth=32,
+ channels=2,
+)
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return SnapCastProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant, # noqa: ARG001
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None, # noqa: ARG001
+ values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ returncode, output = await check_output("snapserver", "-v")
+ snapserver_version = int(output.decode().split(".")[1]) if returncode == 0 else -1
+ local_snapserver_present = snapserver_version >= 27
+ if returncode == 0 and not local_snapserver_present:
+ raise SetupFailedError("Invalid snapserver version")
+
+ return (
+ ConfigEntry(
+ key=CONF_SERVER_BUFFER_SIZE,
+ type=ConfigEntryType.INTEGER,
+ range=(200, 6000),
+ default_value=1000,
+ label="Snapserver buffer size",
+ required=False,
+ category=CONF_CATEGORY_BUILT_IN,
+ hidden=not local_snapserver_present,
+ help_link=CONF_HELP_LINK,
+ ),
+ ConfigEntry(
+ key=CONF_SERVER_CHUNK_MS,
+ type=ConfigEntryType.INTEGER,
+ range=(10, 100),
+ default_value=26,
+ label="Snapserver chunk size",
+ required=False,
+ category=CONF_CATEGORY_BUILT_IN,
+ hidden=not local_snapserver_present,
+ help_link=CONF_HELP_LINK,
+ ),
+ ConfigEntry(
+ key=CONF_SERVER_INITIAL_VOLUME,
+ type=ConfigEntryType.INTEGER,
+ range=(0, 100),
+ default_value=25,
+ label="Snapserver initial volume",
+ required=False,
+ category=CONF_CATEGORY_BUILT_IN,
+ hidden=not local_snapserver_present,
+ help_link=CONF_HELP_LINK,
+ ),
+ ConfigEntry(
+ key=CONF_SERVER_SEND_AUDIO_TO_MUTED,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ label="Send audio to muted clients",
+ required=False,
+ category=CONF_CATEGORY_BUILT_IN,
+ hidden=not local_snapserver_present,
+ help_link=CONF_HELP_LINK,
+ ),
+ ConfigEntry(
+ key=CONF_SERVER_TRANSPORT_CODEC,
+ type=ConfigEntryType.STRING,
+ options=(
+ ConfigValueOption(
+ title="FLAC",
+ value="flac",
+ ),
+ ConfigValueOption(
+ title="OGG",
+ value="ogg",
+ ),
+ ConfigValueOption(
+ title="OPUS",
+ value="opus",
+ ),
+ ConfigValueOption(
+ title="PCM",
+ value="pcm",
+ ),
+ ),
+ default_value="flac",
+ label="Snapserver default transport codec",
+ required=False,
+ category=CONF_CATEGORY_BUILT_IN,
+ hidden=not local_snapserver_present,
+ help_link=CONF_HELP_LINK,
+ ),
+ ConfigEntry(
+ key=CONF_USE_EXTERNAL_SERVER,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=not local_snapserver_present,
+ label="Use existing Snapserver",
+ required=False,
+ category=(
+ CONF_CATEGORY_ADVANCED if local_snapserver_present else CONF_CATEGORY_GENERIC
+ ),
+ ),
+ ConfigEntry(
+ key=CONF_SERVER_HOST,
+ type=ConfigEntryType.STRING,
+ default_value=DEFAULT_SNAPSERVER_IP,
+ label="Snapcast server ip",
+ required=False,
+ depends_on=CONF_USE_EXTERNAL_SERVER,
+ category=(
+ CONF_CATEGORY_ADVANCED if local_snapserver_present else CONF_CATEGORY_GENERIC
+ ),
+ ),
+ ConfigEntry(
+ key=CONF_SERVER_CONTROL_PORT,
+ type=ConfigEntryType.INTEGER,
+ default_value=DEFAULT_SNAPSERVER_PORT,
+ label="Snapcast control port",
+ required=False,
+ depends_on=CONF_USE_EXTERNAL_SERVER,
+ category=(
+ CONF_CATEGORY_ADVANCED if local_snapserver_present else CONF_CATEGORY_GENERIC
+ ),
+ ),
+ ConfigEntry(
+ key=CONF_STREAM_IDLE_THRESHOLD,
+ type=ConfigEntryType.INTEGER,
+ default_value=DEFAULT_SNAPSTREAM_IDLE_THRESHOLD,
+ label="Snapcast idle threshold stream parameter",
+ required=True,
+ category=CONF_CATEGORY_ADVANCED,
+ ),
+ )
+
+
+class SnapCastProvider(PlayerProvider):
+ """Player provider for Snapcast based players."""
+
+ _snapserver: Snapserver
+ _snapcast_server_host: str
+ _snapcast_server_control_port: int
+ _stream_tasks: dict[str, asyncio.Task]
+ _use_builtin_server: bool
+ _snapserver_runner: asyncio.Task | None
+ _snapserver_started: asyncio.Event | None
+ _ids_map: bidict # ma_id / snapclient_id
+ _stop_called: bool
+
+ def _get_snapclient_id(self, player_id: str) -> str:
+ search_dict = self._ids_map
+ return search_dict.get(player_id)
+
+ def _get_ma_id(self, snap_client_id: str) -> str:
+ search_dict = self._ids_map.inverse
+ return search_dict.get(snap_client_id)
+
+ def _generate_and_register_id(self, snap_client_id) -> str:
+ search_dict = self._ids_map.inverse
+ if snap_client_id not in search_dict:
+ new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
+ self._ids_map[new_id] = snap_client_id
+ return new_id
+ else:
+ return self._get_ma_id(snap_client_id)
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return (ProviderFeature.SYNC_PLAYERS,)
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ # set snapcast logging
+ logging.getLogger("snapcast").setLevel(self.logger.level)
+ self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
+ self._stop_called = False
+ if self._use_builtin_server:
+ self._snapcast_server_host = "127.0.0.1"
+ self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT
+ self._snapcast_server_buffer_size = self.config.get_value(CONF_SERVER_BUFFER_SIZE)
+ self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS)
+ self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME)
+ self._snapcast_server_send_to_muted = self.config.get_value(
+ CONF_SERVER_SEND_AUDIO_TO_MUTED
+ )
+ self._snapcast_server_transport_codec = self.config.get_value(
+ CONF_SERVER_TRANSPORT_CODEC
+ )
+
+ else:
+ self._snapcast_server_host = self.config.get_value(CONF_SERVER_HOST)
+ self._snapcast_server_control_port = self.config.get_value(CONF_SERVER_CONTROL_PORT)
+ self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD)
+ self._stream_tasks = {}
+ self._ids_map = bidict({})
+
+ if self._use_builtin_server:
+ await self._start_builtin_server()
+ else:
+ self._snapserver_runner = None
+ self._snapserver_started = None
+ try:
+ self._snapserver = await create_server(
+ self.mass.loop,
+ self._snapcast_server_host,
+ port=self._snapcast_server_control_port,
+ reconnect=True,
+ )
+ self._snapserver.set_on_update_callback(self._handle_update)
+ self.logger.info(
+ "Started connection to Snapserver %s",
+ f"{self._snapcast_server_host}:{self._snapcast_server_control_port}",
+ )
+ # register callback for when the connection gets lost to the snapserver
+ self._snapserver.set_on_disconnect_callback(self._handle_disconnect)
+ await self._create_default_stream()
+ except OSError as err:
+ msg = "Unable to start the Snapserver connection ?"
+ raise SetupFailedError(msg) from err
+
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ await super().loaded_in_mass()
+ # initial load of players
+ self._handle_update()
+
+ async def unload(self) -> None:
+ """Handle close/cleanup of the provider."""
+ self._stop_called = True
+ for snap_client_id in self._snapserver.clients:
+ player_id = self._get_ma_id(snap_client_id)
+ await self.cmd_stop(player_id)
+ self._snapserver.stop()
+ await self._stop_builtin_server()
+
+ def _handle_update(self) -> None:
+ """Process Snapcast init Player/Group and set callback ."""
+ for snap_client in self._snapserver.clients:
+ self._handle_player_init(snap_client)
+ snap_client.set_callback(self._handle_player_update)
+ for snap_client in self._snapserver.clients:
+ self._handle_player_update(snap_client)
+ for snap_group in self._snapserver.groups:
+ snap_group.set_callback(self._handle_group_update)
+
+ def _handle_group_update(self, snap_group: Snapgroup) -> None:
+ """Process Snapcast group callback."""
+ for snap_client in self._snapserver.clients:
+ self._handle_player_update(snap_client)
+
+ def _handle_player_init(self, snap_client: Snapclient) -> None:
+ """Process Snapcast add to Player controller."""
+ player_id = self._generate_and_register_id(snap_client.identifier)
+ player = self.mass.players.get(player_id, raise_unavailable=False)
+ if not player:
+ snap_client = cast(
+ Snapclient, self._snapserver.client(self._get_snapclient_id(player_id))
+ )
+ player = Player(
+ player_id=player_id,
+ provider=self.instance_id,
+ type=PlayerType.PLAYER,
+ name=snap_client.friendly_name,
+ available=True,
+ powered=snap_client.connected,
+ device_info=DeviceInfo(
+ model=snap_client._client.get("host").get("os"),
+ address=snap_client._client.get("host").get("ip"),
+ manufacturer=snap_client._client.get("host").get("arch"),
+ ),
+ supported_features=(
+ PlayerFeature.SYNC,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.VOLUME_MUTE,
+ ),
+ group_childs=set(),
+ synced_to=self._synced_to(player_id),
+ )
+ asyncio.run_coroutine_threadsafe(
+ self.mass.players.register_or_update(player), loop=self.mass.loop
+ )
+
+ def _handle_player_update(self, snap_client: Snapclient) -> None:
+ """Process Snapcast update to Player controller."""
+ player_id = self._get_ma_id(snap_client.identifier)
+ player = self.mass.players.get(player_id)
+ if not player:
+ return
+ player.name = snap_client.friendly_name
+ player.volume_level = snap_client.volume
+ player.volume_muted = snap_client.muted
+ player.available = snap_client.connected
+ player.synced_to = self._synced_to(player_id)
+ if player.active_group is None:
+ if stream := self._get_snapstream(player_id):
+ if stream.name.startswith(("MusicAssistant", "default")):
+ player.active_source = player_id
+ else:
+ player.active_source = stream.name
+ else:
+ player.active_source = player_id
+ self._group_childs(player_id)
+ self.mass.players.update(player_id)
+
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ base_entries = await super().get_player_config_entries(player_id)
+ return (
+ *base_entries,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
+ )
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ snap_client_id = self._get_snapclient_id(player_id)
+ await self._snapserver.client(snap_client_id).set_volume(volume_level)
+ self.mass.players.update(snap_client_id)
+
+ async def cmd_stop(self, player_id: str) -> None:
+ """Send STOP command to given player."""
+ player = self.mass.players.get(player_id, raise_unavailable=False)
+ if stream_task := self._stream_tasks.pop(player_id, None):
+ if not stream_task.done():
+ stream_task.cancel()
+ player.state = PlayerState.IDLE
+ self._set_childs_state(player_id)
+ self.mass.players.update(player_id)
+ # assign default/empty stream to the player
+ await self._get_snapgroup(player_id).set_stream("default")
+
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send MUTE command to given player."""
+ ma_player = self.mass.players.get(player_id, raise_unavailable=False)
+ snap_client_id = self._get_snapclient_id(player_id)
+ snapclient = self._snapserver.client(snap_client_id)
+ # Using optimistic value because the library does not return the response from the api
+ await snapclient.set_muted(muted)
+ ma_player.volume_muted = snapclient.muted
+ self.mass.players.update(player_id)
+
+ async def cmd_sync(self, player_id: str, target_player: str) -> None:
+ """Sync Snapcast player."""
+ group = self._get_snapgroup(target_player)
+ mass_target_player = self.mass.players.get(target_player)
+ if self._get_snapclient_id(player_id) not in group.clients:
+ await group.add_client(self._get_snapclient_id(player_id))
+ mass_player = self.mass.players.get(player_id)
+ mass_player.synced_to = target_player
+ mass_target_player.group_childs.add(player_id)
+ self.mass.players.update(player_id)
+ self.mass.players.update(target_player)
+
+ async def cmd_unsync(self, player_id: str) -> None:
+ """Unsync Snapcast player."""
+ mass_player = self.mass.players.get(player_id)
+ if mass_player.synced_to is None:
+ for mass_child_id in list(mass_player.group_childs):
+ if mass_child_id != player_id:
+ await self.cmd_unsync(mass_child_id)
+ return
+ mass_sync_master_player = self.mass.players.get(mass_player.synced_to)
+ mass_sync_master_player.group_childs.remove(player_id)
+ mass_player.synced_to = None
+ snap_client_id = self._get_snapclient_id(player_id)
+ group = self._get_snapgroup(player_id)
+ await group.remove_client(snap_client_id)
+ # assign default/empty stream to the player
+ await self._get_snapgroup(player_id).set_stream("default")
+ await self.cmd_stop(player_id=player_id)
+ # make sure that the player manager gets an update
+ self.mass.players.update(player_id, skip_forward=True)
+ self.mass.players.update(mass_player.synced_to, skip_forward=True)
+
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle PLAY MEDIA on given player."""
+ player = self.mass.players.get(player_id)
+ if player.synced_to:
+ msg = "A synced player cannot receive play commands directly"
+ raise RuntimeError(msg)
+ # stop any existing streams first
+ if stream_task := self._stream_tasks.pop(player_id, None):
+ if not stream_task.done():
+ stream_task.cancel()
+ # initialize a new stream and attach it to the group
+ stream, port = await self._create_stream()
+ snap_group = self._get_snapgroup(player_id)
+ await snap_group.set_stream(stream.identifier)
+
+ # select audio source
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+ ugp_stream = ugp_provider.ugp_streams[media.queue_id]
+ input_format = ugp_stream.output_format
+ audio_source = ugp_stream.subscribe()
+ elif media.queue_id and media.queue_item_id:
+ # regular queue (flow) stream request
+ input_format = DEFAULT_SNAPCAST_PCM_FORMAT
+ audio_source = self.mass.streams.get_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=input_format,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ )
+
+ async def _streamer() -> None:
+ host = self._snapcast_server_host
+ stream_path = f"tcp://{host}:{port}"
+ self.logger.debug("Start streaming to %s", stream_path)
+ try:
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=input_format,
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ filter_params=get_player_filter_params(self.mass, player_id),
+ audio_output=stream_path,
+ ) as ffmpeg_proc:
+ player.state = PlayerState.PLAYING
+ player.current_media = media
+ player.elapsed_time = 0
+ player.elapsed_time_last_updated = time.time()
+ self.mass.players.update(player_id)
+ self._set_childs_state(player_id)
+ await ffmpeg_proc.wait()
+ self.logger.debug("Finished streaming to %s", stream_path)
+ # we need to wait a bit for the stream status to become idle
+ # to ensure that all snapclients have consumed the audio
+ while stream.status != "idle":
+ await asyncio.sleep(0.25)
+ player.state = PlayerState.IDLE
++ player.elapsed_time = time.time() - player.elapsed_time_last_updated
+ self.mass.players.update(player_id)
+ self._set_childs_state(player_id)
+ finally:
+ await self._delete_current_snapstream(stream, media)
+
+ # start streaming the queue (pcm) audio in a background task
+ self._stream_tasks[player_id] = asyncio.create_task(_streamer())
+
+ async def _delete_current_snapstream(self, stream: Snapstream, media: PlayerMedia) -> None:
+ with suppress(TypeError, KeyError, AttributeError):
+ if media.duration < 5:
+ await asyncio.sleep(5)
+ await self._snapserver.stream_remove_stream(stream.identifier)
+
+ def _get_snapgroup(self, player_id: str) -> Snapgroup:
+ """Get snapcast group for given player_id."""
+ snap_client_id = self._get_snapclient_id(player_id)
+ client: Snapclient = self._snapserver.client(snap_client_id)
+ return client.group
+
+ def _get_snapstream(self, player_id: str) -> Snapstream | None:
+ """Get snapcast stream for given player_id."""
+ if group := self._get_snapgroup(player_id):
+ with suppress(KeyError):
+ return self._snapserver.stream(group.stream)
+ return None
+
+ def _synced_to(self, player_id: str) -> str | None:
+ """Return player_id of the player this player is synced to."""
+ snap_group: Snapgroup = self._get_snapgroup(player_id)
+ master_id: str = self._get_ma_id(snap_group.clients[0])
+
+ if len(snap_group.clients) < 2 or player_id == master_id:
+ return None
+ return master_id
+
+ def _group_childs(self, player_id: str) -> set[str]:
+ """Return player_ids of the players synced to this player."""
+ mass_player = self.mass.players.get(player_id, raise_unavailable=False)
+ snap_group = self._get_snapgroup(player_id)
+ mass_player.group_childs.clear()
+ if mass_player.synced_to is not None:
+ return
+ mass_player.group_childs.add(player_id)
+ {
+ mass_player.group_childs.add(self._get_ma_id(snap_client_id))
+ for snap_client_id in snap_group.clients
+ if self._get_ma_id(snap_client_id) != player_id
+ and self._snapserver.client(snap_client_id).connected
+ }
+
+ async def _create_stream(self) -> tuple[Snapstream, int]:
+ """Create new stream on snapcast server."""
+ attempts = 50
+ while attempts:
+ attempts -= 1
+ # pick a random port
+ port = random.randint(4953, 4953 + 200)
+ name = f"MusicAssistant--{port}"
+ result = await self._snapserver.stream_add_stream(
+ # NOTE: setting the sampleformat to something else
+ # (like 24 bits bit depth) does not seem to work at all!
+ f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2&idle_threshold={self._snapcast_stream_idle_threshold}",
+ )
+ if "id" not in result:
+ # if the port is already taken, the result will be an error
+ self.logger.warning(result)
+ continue
+ stream = self._snapserver.stream(result["id"])
+ return (stream, port)
+ msg = "Unable to create stream - No free port found?"
+ raise RuntimeError(msg)
+
+ async def _create_default_stream(self) -> None:
+ """Create new stream on snapcast server named default case not exist."""
+ all_streams = {stream.name for stream in self._snapserver.streams}
+ if "default" not in all_streams:
+ await self._snapserver.stream_add_stream(
+ "pipe:///tmp/snapfifo?name=default&sampleformat=48000:16:2"
+ )
+
+ def _set_childs_state(self, player_id: str) -> None:
+ """Set the state of the child`s of the player."""
+ mass_player = self.mass.players.get(player_id)
+ for child_player_id in mass_player.group_childs:
+ if child_player_id == player_id:
+ continue
+ mass_child_player = self.mass.players.get(child_player_id)
+ mass_child_player.state = mass_player.state
+ self.mass.players.update(child_player_id)
+
+ async def _builtin_server_runner(self) -> None:
+ """Start running the builtin snapserver."""
+ if self._snapserver_started.is_set():
+ raise RuntimeError("Snapserver is already started!")
+ logger = self.logger.getChild("snapserver")
+ logger.info("Starting builtin Snapserver...")
+ # register the snapcast mdns services
+ for name, port in (
+ ("-http", 1780),
+ ("-jsonrpc", 1705),
+ ("-stream", 1704),
+ ("-tcp", 1705),
+ ("", 1704),
+ ):
+ zeroconf_type = f"_snapcast{name}._tcp.local."
+ try:
+ info = AsyncServiceInfo(
+ zeroconf_type,
+ name=f"Snapcast.{zeroconf_type}",
+ properties={"is_mass": "true"},
+ addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
+ port=port,
+ server=f"{socket.gethostname()}.local",
+ )
+ attr_name = f"zc_service_set{name}"
+ if getattr(self, attr_name, None):
+ await self.mass.aiozc.async_update_service(info)
+ else:
+ await self.mass.aiozc.async_register_service(info, strict=False)
+ setattr(self, attr_name, True)
+ except NonUniqueNameException:
+ self.logger.debug(
+ "Could not register mdns record for %s as its already in use",
+ zeroconf_type,
+ )
+ except Exception as err:
+ self.logger.exception(
+ "Could not register mdns record for %s: %s", zeroconf_type, str(err)
+ )
+ args = [
+ "snapserver",
+ # config settings taken from
+ # https://raw.githubusercontent.com/badaix/snapcast/86cd4b2b63e750a72e0dfe6a46d47caf01426c8d/server/etc/snapserver.conf
+ f"--server.datadir={self.mass.storage_path}",
+ "--http.enabled=true",
+ "--http.port=1780",
+ f"--http.doc_root={SNAPWEB_DIR}",
+ "--tcp.enabled=true",
+ f"--tcp.port={self._snapcast_server_control_port}",
+ f"--stream.buffer={self._snapcast_server_buffer_size}",
+ f"--stream.chunk_ms={self._snapcast_server_chunk_ms}",
+ f"--stream.codec={self._snapcast_server_transport_codec}",
+ f"--stream.send_to_muted={str(self._snapcast_server_send_to_muted).lower()}",
+ f"--streaming_client.initial_volume={self._snapcast_server_initial_volume}",
+ ]
+ async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
+ # keep reading from stdout until exit
+ async for data in snapserver_proc.iter_any():
+ data = data.decode().strip() # noqa: PLW2901
+ for line in data.split("\n"):
+ logger.debug(line)
+ if "(Snapserver) Version 0." in line:
+ # delay init a small bit to prevent race conditions
+ # where we try to connect too soon
+ self.mass.loop.call_later(2, self._snapserver_started.set)
+
+ async def _stop_builtin_server(self) -> None:
+ """Stop the built-in Snapserver."""
+ self.logger.info("Stopping, built-in Snapserver")
+ if self._snapserver_runner and not self._snapserver_runner.done():
+ self._snapserver_runner.cancel()
+ self._snapserver_started.clear()
+
+ async def _start_builtin_server(self) -> None:
+ """Start the built-in Snapserver."""
+ if self._use_builtin_server:
+ self._snapserver_started = asyncio.Event()
+ self._snapserver_runner = asyncio.create_task(self._builtin_server_runner())
+ await asyncio.wait_for(self._snapserver_started.wait(), 10)
+
+ def _handle_disconnect(self, exc: Exception) -> None:
+ """Handle disconnect callback from snapserver."""
+ if self._stop_called:
+ # we're instructed to stop/exit, so no need to restart the connection
+ return
+ self.logger.info(
+ "Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.",
+ str(exc),
+ )
+ # schedule a reload of the provider
+ self.mass.call_later(5, self.mass.load_provider, self.instance_id, allow_retry=True)