From: Marcel van der Veldt Date: Sat, 2 Nov 2024 16:20:21 +0000 (+0100) Subject: Merge branch 'dev' of https://github.com/music-assistant/server into dev X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=03530f92992093f2cd3a0bd027ef0b96f53bd7ce;p=music-assistant-server.git Merge branch 'dev' of https://github.com/music-assistant/server into dev --- 03530f92992093f2cd3a0bd027ef0b96f53bd7ce diff --cc music_assistant/providers/chromecast/manifest.json index 6bcee60c,00000000..fdbf817f mode 100644,000000..100644 --- a/music_assistant/providers/chromecast/manifest.json +++ b/music_assistant/providers/chromecast/manifest.json @@@ -1,19 -1,0 +1,19 @@@ +{ + "type": "player", + "domain": "chromecast", + "name": "Chromecast", + "description": "Support for Chromecast based players.", + "codeowners": [ + "@music-assistant" + ], + "requirements": [ - "PyChromecast==14.0.4" ++ "PyChromecast==14.0.5" + ], + "documentation": "https://music-assistant.io/player-support/google-cast/", + "multi_instance": false, + "builtin": false, + "icon": "cast", + "mdns_discovery": [ + "_googlecast._tcp.local." + ] +} diff --cc music_assistant/providers/opensubsonic/manifest.json index 002cb5be,00000000..fda0aa2a mode 100644,000000..100644 --- a/music_assistant/providers/opensubsonic/manifest.json +++ b/music_assistant/providers/opensubsonic/manifest.json @@@ -1,14 -1,0 +1,14 @@@ +{ + "type": "music", + "domain": "opensubsonic", + "name": "Open Subsonic Media Server Library", + "description": "Support for Open Subsonic based streaming providers in Music Assistant.", + "codeowners": [ + "@khers" + ], + "requirements": [ - "py-opensonic==5.1.1" ++ "py-opensonic==5.2.1" + ], + "documentation": "https://music-assistant.io/music-providers/subsonic/", + "multi_instance": true +} diff --cc music_assistant/providers/opensubsonic/sonic_provider.py index ed1c07a4,00000000..09fb1eaf mode 100644,000000..100644 --- a/music_assistant/providers/opensubsonic/sonic_provider.py +++ b/music_assistant/providers/opensubsonic/sonic_provider.py @@@ -1,843 -1,0 +1,844 @@@ +"""The provider class for Open Subsonic.""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +from libopensonic.connection import Connection as SonicConnection +from libopensonic.errors import ( + AuthError, + CredentialError, + DataNotFoundError, + ParameterError, + SonicError, +) +from music_assistant_models.enums import ( + ContentType, + ImageType, + MediaType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import LoginFailed, MediaNotFoundError, ProviderPermissionDenied +from music_assistant_models.media_items import ( + Album, + AlbumType, + Artist, + AudioFormat, + ItemMapping, + MediaItemImage, + Playlist, + ProviderMapping, + SearchResults, + Track, +) +from music_assistant_models.streamdetails import StreamDetails + +from music_assistant.constants import ( + CONF_PASSWORD, + CONF_PATH, + CONF_PORT, + CONF_USERNAME, + UNKNOWN_ARTIST, +) +from music_assistant.models.music_provider import MusicProvider + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Callable + + from libopensonic.media import Album as SonicAlbum + from libopensonic.media import AlbumInfo as SonicAlbumInfo + from libopensonic.media import Artist as SonicArtist + from libopensonic.media import ArtistInfo as SonicArtistInfo + from libopensonic.media import Playlist as SonicPlaylist + from libopensonic.media import PodcastChannel as SonicPodcastChannel + from libopensonic.media import PodcastEpisode as SonicPodcastEpisode + from libopensonic.media import Song as SonicSong + +CONF_BASE_URL = "baseURL" +CONF_ENABLE_PODCASTS = "enable_podcasts" +CONF_ENABLE_LEGACY_AUTH = "enable_legacy_auth" + +UNKNOWN_ARTIST_ID = "fake_artist_unknown" + +# We need the following prefix because of the way that Navidrome reports artists for individual +# tracks on Various Artists albums, see the note in the _parse_track() method and the handling +# in get_artist() +NAVI_VARIOUS_PREFIX = "MA-NAVIDROME-" + + +class OpenSonicProvider(MusicProvider): + """Provider for Open Subsonic servers.""" + + _conn: SonicConnection = None + _enable_podcasts: bool = True + _seek_support: bool = False + + async def handle_async_init(self) -> None: + """Set up the music provider and test the connection.""" + port = self.config.get_value(CONF_PORT) + if port is None: + port = 443 + path = self.config.get_value(CONF_PATH) + if path is None: + path = "" + self._conn = SonicConnection( + self.config.get_value(CONF_BASE_URL), + username=self.config.get_value(CONF_USERNAME), + password=self.config.get_value(CONF_PASSWORD), + legacyAuth=self.config.get_value(CONF_ENABLE_LEGACY_AUTH), + port=port, + serverPath=path, + appName="Music Assistant", + ) + try: + success = await self._run_async(self._conn.ping) + if not success: - msg = ( - f"Failed to connect to {self.config.get_value(CONF_BASE_URL)}, " - "check your settings." - ) - raise LoginFailed(msg) ++ raise CredentialError + except (AuthError, CredentialError) as e: + msg = ( - f"Failed to connect to {self.config.get_value(CONF_BASE_URL)}, check your settings." ++ "Failed to connect to " ++ f"{self.config.get_value(CONF_BASE_URL)}" ++ ", check your settings." + ) + raise LoginFailed(msg) from e + self._enable_podcasts = self.config.get_value(CONF_ENABLE_PODCASTS) + try: + ret = await self._run_async(self._conn.getOpenSubsonicExtensions) + extensions = ret["openSubsonicExtensions"] + for entry in extensions: + if entry["name"] == "transcodeOffset": + self._seek_support = True + break + except OSError: + self.logger.info("Server does not support transcodeOffset, seeking in player provider") + + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return a list of supported features.""" + return ( + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_ALBUMS, + ProviderFeature.LIBRARY_TRACKS, + ProviderFeature.LIBRARY_PLAYLISTS, + ProviderFeature.LIBRARY_PLAYLISTS_EDIT, + ProviderFeature.BROWSE, + ProviderFeature.SEARCH, + ProviderFeature.ARTIST_ALBUMS, + ProviderFeature.ARTIST_TOPTRACKS, + ProviderFeature.SIMILAR_TRACKS, + ProviderFeature.PLAYLIST_TRACKS_EDIT, + ProviderFeature.PLAYLIST_CREATE, + ) + + @property + def is_streaming_provider(self) -> bool: + """ + Return True if the provider is a streaming provider. + + This literally means that the catalog is not the same as the library contents. + For local based providers (files, plex), the catalog is the same as the library content. + It also means that data is if this provider is NOT a streaming provider, + data cross instances is unique, the catalog and library differs per instance. + + Setting this to True will only query one instance of the provider for search and lookups. + Setting this to False will query all instances of this provider for search and lookups. + """ + return False + + def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=self.instance_id, + name=name, + ) + + def _parse_podcast_artist(self, sonic_channel: SonicPodcastChannel) -> Artist: + artist = Artist( + item_id=sonic_channel.id, + name=sonic_channel.title, + provider=self.instance_id, + favorite=bool(sonic_channel.starred), + provider_mappings={ + ProviderMapping( + item_id=sonic_channel.id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + if sonic_channel.description is not None: + artist.metadata.description = sonic_channel.description + if sonic_channel.original_image_url: + artist.metadata.images = [ + MediaItemImage( + type=ImageType.THUMB, + path=sonic_channel.original_image_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ] + return artist + + def _parse_podcast_album(self, sonic_channel: SonicPodcastChannel) -> Album: + return Album( + item_id=sonic_channel.id, + provider=self.instance_id, + name=sonic_channel.title, + provider_mappings={ + ProviderMapping( + item_id=sonic_channel.id, + provider_domain=self.domain, + provider_instance=self.instance_id, + available=True, + ) + }, + album_type=AlbumType.PODCAST, + ) + + def _parse_podcast_episode( + self, sonic_episode: SonicPodcastEpisode, sonic_channel: SonicPodcastChannel + ) -> Track: + return Track( + item_id=sonic_episode.id, + provider=self.instance_id, + name=sonic_episode.title, + album=self._parse_podcast_album(sonic_channel=sonic_channel), + artists=[self._parse_podcast_artist(sonic_channel=sonic_channel)], + duration=sonic_episode.duration if sonic_episode.duration is not None else 0, + favorite=bool(sonic_episode.starred), + provider_mappings={ + ProviderMapping( + item_id=sonic_episode.id, + provider_domain=self.domain, + provider_instance=self.instance_id, + available=True, + ) + }, + ) + + async def _get_podcast_artists(self) -> list[Artist]: + if not self._enable_podcasts: + return [] + + sonic_channels = await self._run_async(self._conn.getPodcasts, incEpisodes=False) + artists = [] + for channel in sonic_channels: + artists.append(self._parse_podcast_artist(channel)) + return artists + + async def _get_podcasts(self) -> list[SonicPodcastChannel]: + if not self._enable_podcasts: + return [] + return await self._run_async(self._conn.getPodcasts, incEpisodes=True) + + def _parse_artist( + self, sonic_artist: SonicArtist, sonic_info: SonicArtistInfo = None + ) -> Artist: + artist = Artist( + item_id=sonic_artist.id, + name=sonic_artist.name, + provider=self.domain, + favorite=bool(sonic_artist.starred), + provider_mappings={ + ProviderMapping( + item_id=sonic_artist.id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + + if sonic_artist.cover_id: + artist.metadata.images = [ + MediaItemImage( + type=ImageType.THUMB, + path=sonic_artist.cover_id, + provider=self.instance_id, + remotely_accessible=False, + ) + ] + else: + artist.metadata.images = [] + + if sonic_info: + if sonic_info.biography: + artist.metadata.description = sonic_info.biography + if sonic_info.small_url: + artist.metadata.images.append( + MediaItemImage( + type=ImageType.THUMB, + path=sonic_info.small_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + return artist + + def _parse_album(self, sonic_album: SonicAlbum, sonic_info: SonicAlbumInfo = None) -> Album: + album_id = sonic_album.id + album = Album( + item_id=album_id, + provider=self.domain, + name=sonic_album.name, + favorite=bool(sonic_album.starred), + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + year=sonic_album.year, + ) + + if sonic_album.cover_id: + album.metadata.images = [ + MediaItemImage( + type=ImageType.THUMB, + path=sonic_album.cover_id, + provider=self.instance_id, + remotely_accessible=False, + ), + ] + else: + album.metadata.images = [] + + if sonic_album.artist_id: + album.artists.append( + self._get_item_mapping( + MediaType.ARTIST, + sonic_album.artist_id, + sonic_album.artist if sonic_album.artist else UNKNOWN_ARTIST, + ) + ) + else: + self.logger.info( - f"Unable to find an artist ID for album '{sonic_album.name}' with " - f"ID '{sonic_album.id}'." ++ "Unable to find an artist ID for album '%s' with ID '%s'.", ++ sonic_album.name, ++ sonic_album.id, + ) + album.artists.append( + Artist( + item_id=UNKNOWN_ARTIST_ID, + name=UNKNOWN_ARTIST, + provider=self.instance_id, + provider_mappings={ + ProviderMapping( + item_id=UNKNOWN_ARTIST_ID, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + ) + + if sonic_info: + if sonic_info.small_url: + album.metadata.images.append( + MediaItemImage( + type=ImageType.THUMB, + path=sonic_info.small_url, + remotely_accessible=False, + provider=self.instance_id, + ) + ) + if sonic_info.notes: + album.metadata.description = sonic_info.notes + + return album + + def _parse_track(self, sonic_song: SonicSong) -> Track: + mapping = None + if sonic_song.album_id is not None and sonic_song.album is not None: + mapping = self._get_item_mapping(MediaType.ALBUM, sonic_song.album_id, sonic_song.album) + + track = Track( + item_id=sonic_song.id, + provider=self.instance_id, + name=sonic_song.title, + album=mapping, + duration=sonic_song.duration if sonic_song.duration is not None else 0, + # We are setting disc number to 0 because the standard for what is part of + # a Open Subsonic Song is not yet set and the implementations I have checked + # do not contain this field. We should revisit this when the spec is finished - disc_number=0, ++ disc_number=sonic_song.disc_number or 0, + favorite=bool(sonic_song.starred), + provider_mappings={ + ProviderMapping( + item_id=sonic_song.id, + provider_domain=self.domain, + provider_instance=self.instance_id, + available=True, + audio_format=AudioFormat( + content_type=ContentType.try_parse(sonic_song.content_type) + ), + ) + }, + track_number=getattr(sonic_song, "track", 0), + ) + + # We need to find an artist for this track but various implementations seem to disagree + # about where the artist with the valid ID needs to be found. We will add any artist with + # an ID and only use UNKNOWN if none are found. + + if sonic_song.artist_id: + track.artists.append( + self._get_item_mapping( + MediaType.ARTIST, + sonic_song.artist_id, + sonic_song.artist if sonic_song.artist else UNKNOWN_ARTIST, + ) + ) + + for entry in sonic_song.artists: + if entry.id == sonic_song.artist_id: + continue + if entry.id is not None and entry.name is not None: + track.artists.append(self._get_item_mapping(MediaType.ARTIST, entry.id, entry.name)) + + if not track.artists: + if sonic_song.artist and not sonic_song.artist_id: + # This is how Navidrome handles tracks from albums which are marked + # 'Various Artists'. Unfortunately, we cannot lookup this artist independently + # because it will not have an entry in the artists table so the best we can do it + # add a 'fake' id with the proper artist name and have get_artist() check for this + # id and handle it locally. ++ fake_id = f"{NAVI_VARIOUS_PREFIX}{sonic_song.artist}" + artist = Artist( - item_id=f"{NAVI_VARIOUS_PREFIX}{sonic_song.artist}", ++ item_id=fake_id, + provider=self.domain, + name=sonic_song.artist, + provider_mappings={ + ProviderMapping( - item_id=UNKNOWN_ARTIST_ID, ++ item_id=fake_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + else: + self.logger.info( - f"Unable to find artist ID for track '{sonic_song.title}' with " - f"ID '{sonic_song.id}'." ++ "Unable to find artist ID for track '%s' with ID '%s'.", ++ sonic_song.title, ++ sonic_song.id, + ) + artist = Artist( + item_id=UNKNOWN_ARTIST_ID, + name=UNKNOWN_ARTIST, + provider=self.instance_id, + provider_mappings={ + ProviderMapping( + item_id=UNKNOWN_ARTIST_ID, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + + track.artists.append(artist) + return track + + def _parse_playlist(self, sonic_playlist: SonicPlaylist) -> Playlist: + playlist = Playlist( + item_id=sonic_playlist.id, + provider=self.domain, + name=sonic_playlist.name, + is_editable=True, + favorite=bool(sonic_playlist.starred), + provider_mappings={ + ProviderMapping( + item_id=sonic_playlist.id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + if sonic_playlist.cover_id: + playlist.metadata.images = [ + MediaItemImage( + type=ImageType.THUMB, + path=sonic_playlist.cover_id, + provider=self.instance_id, + remotely_accessible=False, + ) + ] + return playlist + + async def _run_async(self, call: Callable, *args, **kwargs): + return await self.mass.create_task(call, *args, **kwargs) + + async def resolve_image(self, path: str) -> bytes: + """Return the image.""" + + def _get_cover_art() -> bytes: + with self._conn.getCoverArt(path) as art: + return art.content + + return await asyncio.to_thread(_get_cover_art) + + async def search( + self, search_query: str, media_types: list[MediaType], limit: int = 20 + ) -> SearchResults: + """Search the sonic library.""" + artists = limit if MediaType.ARTIST in media_types else 0 + albums = limit if MediaType.ALBUM in media_types else 0 + songs = limit if MediaType.TRACK in media_types else 0 + if not (artists or albums or songs): + return SearchResults() + answer = await self._run_async( + self._conn.search3, + query=search_query, + artistCount=artists, + artistOffset=0, + albumCount=albums, + albumOffset=0, + songCount=songs, + songOffset=0, + musicFolderId=None, + ) + return SearchResults( + artists=[self._parse_artist(entry) for entry in answer["artists"]], + albums=[self._parse_album(entry) for entry in answer["albums"]], + tracks=[self._parse_track(entry) for entry in answer["songs"]], + ) + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Provide a generator for reading all artists.""" + indices = await self._run_async(self._conn.getArtists) + for index in indices: + for artist in index.artists: + yield self._parse_artist(artist) + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """ + Provide a generator for reading all artists. + + Note the pagination, the open subsonic docs say that this method is limited to + returning 500 items per invocation. + """ + offset = 0 + size = 500 + albums = await self._run_async( + self._conn.getAlbumList2, ltype="alphabeticalByArtist", size=size, offset=offset + ) + while albums: + for album in albums: + yield self._parse_album(album) + offset += size + albums = await self._run_async( + self._conn.getAlbumList2, ltype="alphabeticalByArtist", size=size, offset=offset + ) + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Provide a generator for library playlists.""" + results = await self._run_async(self._conn.getPlaylists) + for entry in results: + yield self._parse_playlist(entry) + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """ + Provide a generator for library tracks. + + Note the lack of item count on this method. + """ + query = "" + offset = 0 + count = 500 + try: + results = await self._run_async( + self._conn.search3, + query=query, + artistCount=0, + albumCount=0, + songOffset=offset, + songCount=count, + ) + except ParameterError: + # Older Navidrome does not accept an empty string and requires the empty quotes + query = '""' + results = await self._run_async( + self._conn.search3, + query=query, + artistCount=0, + albumCount=0, + songOffset=offset, + songCount=count, + ) + while results["songs"]: + for entry in results["songs"]: + yield self._parse_track(entry) + offset += count + results = await self._run_async( + self._conn.search3, + query=query, + artistCount=0, + albumCount=0, + songOffset=offset, + songCount=count, + ) + + async def get_album(self, prov_album_id: str) -> Album: + """Return the requested Album.""" + try: + sonic_album: SonicAlbum = await self._run_async(self._conn.getAlbum, prov_album_id) + sonic_info = await self._run_async(self._conn.getAlbumInfo2, aid=prov_album_id) + except (ParameterError, DataNotFoundError) as e: + if self._enable_podcasts: + # This might actually be a 'faked' album from podcasts, try that before giving up + try: + sonic_channel = await self._run_async( + self._conn.getPodcasts, incEpisodes=False, pid=prov_album_id + ) + return self._parse_podcast_album(sonic_channel=sonic_channel) + except SonicError: + pass + msg = f"Album {prov_album_id} not found" + raise MediaNotFoundError(msg) from e + + return self._parse_album(sonic_album, sonic_info) + + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Return a list of tracks on the specified Album.""" + try: + sonic_album: SonicAlbum = await self._run_async(self._conn.getAlbum, prov_album_id) + except (ParameterError, DataNotFoundError) as e: + msg = f"Album {prov_album_id} not found" + raise MediaNotFoundError(msg) from e + tracks = [] + for sonic_song in sonic_album.songs: + tracks.append(self._parse_track(sonic_song)) + return tracks + + async def get_artist(self, prov_artist_id: str) -> Artist: + """Return the requested Artist.""" + if prov_artist_id == UNKNOWN_ARTIST_ID: + return Artist( + item_id=UNKNOWN_ARTIST_ID, + name=UNKNOWN_ARTIST, + provider=self.instance_id, + provider_mappings={ + ProviderMapping( + item_id=UNKNOWN_ARTIST_ID, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + elif prov_artist_id.startswith(NAVI_VARIOUS_PREFIX): + # Special case for handling track artists on various artists album for Navidrome. + return Artist( + item_id=prov_artist_id, + name=prov_artist_id.removeprefix(NAVI_VARIOUS_PREFIX), + provider=self.instance_id, + provider_mappings={ + ProviderMapping( + item_id=prov_artist_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + + try: + sonic_artist: SonicArtist = await self._run_async( + self._conn.getArtist, artist_id=prov_artist_id + ) + sonic_info = await self._run_async(self._conn.getArtistInfo2, aid=prov_artist_id) + except (ParameterError, DataNotFoundError) as e: + if self._enable_podcasts: + # This might actually be a 'faked' artist from podcasts, try that before giving up + try: + sonic_channel = await self._run_async( + self._conn.getPodcasts, incEpisodes=False, pid=prov_artist_id + ) + return self._parse_podcast_artist(sonic_channel=sonic_channel[0]) + except SonicError: + pass + msg = f"Artist {prov_artist_id} not found" + raise MediaNotFoundError(msg) from e + return self._parse_artist(sonic_artist, sonic_info) + + async def get_track(self, prov_track_id: str) -> Track: + """Return the specified track.""" + try: + sonic_song: SonicSong = await self._run_async(self._conn.getSong, prov_track_id) + except (ParameterError, DataNotFoundError) as e: + msg = f"Item {prov_track_id} not found" + raise MediaNotFoundError(msg) from e + return self._parse_track(sonic_song) + + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Return a list of all Albums by specified Artist.""" + if prov_artist_id == UNKNOWN_ARTIST_ID or prov_artist_id.startswith(NAVI_VARIOUS_PREFIX): + return [] + + try: + sonic_artist: SonicArtist = await self._run_async(self._conn.getArtist, prov_artist_id) + except (ParameterError, DataNotFoundError) as e: + msg = f"Album {prov_artist_id} not found" + raise MediaNotFoundError(msg) from e + albums = [] + for entry in sonic_artist.albums: + albums.append(self._parse_album(entry)) + return albums + + async def get_playlist(self, prov_playlist_id) -> Playlist: + """Return the specified Playlist.""" + try: + sonic_playlist: SonicPlaylist = await self._run_async( + self._conn.getPlaylist, prov_playlist_id + ) + except (ParameterError, DataNotFoundError) as e: + msg = f"Playlist {prov_playlist_id} not found" + raise MediaNotFoundError(msg) from e + return self._parse_playlist(sonic_playlist) + + async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: + """Get playlist tracks.""" + result: list[Track] = [] + if page > 0: + # paging not supported, we always return the whole list at once + return result + try: + sonic_playlist: SonicPlaylist = await self._run_async( + self._conn.getPlaylist, prov_playlist_id + ) + except (ParameterError, DataNotFoundError) as e: + msg = f"Playlist {prov_playlist_id} not found" + raise MediaNotFoundError(msg) from e + + # TODO: figure out if subsonic supports paging here + for index, sonic_song in enumerate(sonic_playlist.songs, 1): + track = self._parse_track(sonic_song) + track.position = index + result.append(track) + return result + + async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: + """Get the top listed tracks for a specified artist.""" + # We have seen top tracks requested for the UNKNOWN_ARTIST ID, protect against that + if prov_artist_id == UNKNOWN_ARTIST_ID or prov_artist_id.startswith(NAVI_VARIOUS_PREFIX): + return [] + + try: + sonic_artist: SonicArtist = await self._run_async(self._conn.getArtist, prov_artist_id) + except DataNotFoundError as e: + msg = f"Artist {prov_artist_id} not found" + raise MediaNotFoundError(msg) from e + songs: list[SonicSong] = await self._run_async(self._conn.getTopSongs, sonic_artist.name) + return [self._parse_track(entry) for entry in songs] + + async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: + """Get tracks similar to selected track.""" + songs: list[SonicSong] = await self._run_async( + self._conn.getSimilarSongs2, iid=prov_track_id, count=limit + ) + return [self._parse_track(entry) for entry in songs] + + async def create_playlist(self, name: str) -> Playlist: + """Create a new empty playlist on the server.""" + playlist: SonicPlaylist = await self._run_async(self._conn.createPlaylist, name=name) + return self._parse_playlist(playlist) + + async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: + """Append the listed tracks to the selected playlist. + + Note that the configured user must own the playlist to edit this way. + """ + try: + await self._run_async( + self._conn.updatePlaylist, lid=prov_playlist_id, songIdsToAdd=prov_track_ids + ) + except SonicError: + msg = f"Failed to add songs to {prov_playlist_id}, check your permissions." + raise ProviderPermissionDenied(msg) + + async def remove_playlist_tracks( + self, prov_playlist_id: str, positions_to_remove: tuple[int, ...] + ) -> None: + """Remove selected positions from the playlist.""" + idx_to_remove = [pos - 1 for pos in positions_to_remove] + try: + await self._run_async( + self._conn.updatePlaylist, + lid=prov_playlist_id, + songIndexesToRemove=idx_to_remove, + ) + except SonicError: + msg = f"Failed to remove songs from {prov_playlist_id}, check your permissions." + raise ProviderPermissionDenied(msg) + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Get the details needed to process a specified track.""" + try: + sonic_song: SonicSong = await self._run_async(self._conn.getSong, item_id) + except (ParameterError, DataNotFoundError) as e: + msg = f"Item {item_id} not found" + raise MediaNotFoundError(msg) from e + + self.mass.create_task(self._report_playback_started(item_id)) + + mime_type = sonic_song.content_type + if mime_type.endswith("mpeg"): + mime_type = sonic_song.suffix + + self.logger.debug( + "Fetching stream details for id %s '%s' with format '%s'", + sonic_song.id, + sonic_song.title, + mime_type, + ) + + return StreamDetails( + item_id=sonic_song.id, + provider=self.instance_id, + can_seek=self._seek_support, + audio_format=AudioFormat(content_type=ContentType.try_parse(mime_type)), + stream_type=StreamType.CUSTOM, + duration=sonic_song.duration if sonic_song.duration is not None else 0, + ) + + async def _report_playback_started(self, item_id: str) -> None: + await self._run_async(self._conn.scrobble, sid=item_id, submission=False) + + async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None: + """Handle callback when an item completed streaming.""" + if seconds_streamed >= streamdetails.duration / 2: + await self._run_async(self._conn.scrobble, sid=streamdetails.item_id, submission=True) + + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Provide a generator for the stream data.""" + audio_buffer = asyncio.Queue(1) + + self.logger.debug("Streaming %s", streamdetails.item_id) + + def _streamer() -> None: + with self._conn.stream( + streamdetails.item_id, timeOffset=seek_position, estimateContentLength=True + ) as stream: + for chunk in stream.iter_content(chunk_size=40960): + asyncio.run_coroutine_threadsafe( + audio_buffer.put(chunk), self.mass.loop + ).result() + # send empty chunk when we're done + asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result() + + # fire up an executor thread to put the audio chunks (threadsafe) on the audio buffer + streamer_task = self.mass.loop.run_in_executor(None, _streamer) + try: + while True: + # keep reading from the audio buffer until there is no more data + chunk = await audio_buffer.get() + if chunk == b"EOF": + break + yield chunk + finally: + if not streamer_task.done(): + streamer_task.cancel() + + self.logger.debug("Done streaming %s", streamdetails.item_id) diff --cc music_assistant/providers/snapcast/__init__.py index f2151905,00000000..3a4df08c mode 100644,000000..100644 --- a/music_assistant/providers/snapcast/__init__.py +++ b/music_assistant/providers/snapcast/__init__.py @@@ -1,743 -1,0 +1,744 @@@ +"""Snapcast Player provider for Music Assistant.""" + +from __future__ import annotations + +import asyncio +import logging +import pathlib +import random +import re +import socket +import time +from contextlib import suppress +from typing import TYPE_CHECKING, Final, cast + +from bidict import bidict +from music_assistant_models.config_entries import ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_FLOW_MODE_ENFORCED, + ConfigEntry, + ConfigValueOption, + ConfigValueType, + create_sample_rates_config_entry, +) +from music_assistant_models.enums import ( + ConfigEntryType, + ContentType, + MediaType, + PlayerFeature, + PlayerState, + PlayerType, + ProviderFeature, +) +from music_assistant_models.errors import SetupFailedError +from music_assistant_models.media_items import AudioFormat +from music_assistant_models.player import DeviceInfo, Player, PlayerMedia +from snapcast.control import create_server +from snapcast.control.client import Snapclient +from zeroconf import NonUniqueNameException +from zeroconf.asyncio import AsyncServiceInfo + +from music_assistant.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params +from music_assistant.helpers.process import AsyncProcess, check_output +from music_assistant.helpers.util import get_ip_pton +from music_assistant.models.player_provider import PlayerProvider + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + from snapcast.control.group import Snapgroup + from snapcast.control.server import Snapserver + from snapcast.control.stream import Snapstream + + from music_assistant import MusicAssistant + from music_assistant.models import ProviderInstanceType + from music_assistant.providers.player_group import PlayerGroupProvider + +CONF_SERVER_HOST = "snapcast_server_host" +CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port" +CONF_USE_EXTERNAL_SERVER = "snapcast_use_external_server" +CONF_SERVER_BUFFER_SIZE = "snapcast_server_built_in_buffer_size" +CONF_SERVER_CHUNK_MS = "snapcast_server_built_in_chunk_ms" +CONF_SERVER_INITIAL_VOLUME = "snapcast_server_built_in_initial_volume" +CONF_SERVER_TRANSPORT_CODEC = "snapcast_server_built_in_codec" +CONF_SERVER_SEND_AUDIO_TO_MUTED = "snapcast_server_built_in_send_muted" +CONF_STREAM_IDLE_THRESHOLD = "snapcast_stream_idle_threshold" + + +CONF_CATEGORY_GENERIC = "generic" +CONF_CATEGORY_ADVANCED = "advanced" +CONF_CATEGORY_BUILT_IN = "Built-in Snapserver Settings" + +CONF_HELP_LINK = ( + "https://raw.githubusercontent.com/badaix/snapcast/refs/heads/master/server/etc/snapserver.conf" +) + +# airplay has fixed sample rate/bit depth so make this config entry static and hidden +CONF_ENTRY_SAMPLE_RATES_SNAPCAST = create_sample_rates_config_entry(48000, 16, 48000, 16, True) + +DEFAULT_SNAPSERVER_IP = "127.0.0.1" +DEFAULT_SNAPSERVER_PORT = 1705 +DEFAULT_SNAPSTREAM_IDLE_THRESHOLD = 60000 + +SNAPWEB_DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.resolve().joinpath("snapweb") + + +DEFAULT_SNAPCAST_FORMAT = AudioFormat( + content_type=ContentType.PCM_S16LE, + sample_rate=48000, + # TODO: can we handle 24 bits bit depth ? + bit_depth=16, + channels=2, +) + +DEFAULT_SNAPCAST_PCM_FORMAT = AudioFormat( + # the format that is used as intermediate pcm stream, + # we prefer F32 here to account for volume normalization + content_type=ContentType.PCM_F32LE, + sample_rate=48000, + bit_depth=32, + channels=2, +) + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return SnapCastProvider(mass, manifest, config) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + returncode, output = await check_output("snapserver", "-v") + snapserver_version = int(output.decode().split(".")[1]) if returncode == 0 else -1 + local_snapserver_present = snapserver_version >= 27 + if returncode == 0 and not local_snapserver_present: + raise SetupFailedError("Invalid snapserver version") + + return ( + ConfigEntry( + key=CONF_SERVER_BUFFER_SIZE, + type=ConfigEntryType.INTEGER, + range=(200, 6000), + default_value=1000, + label="Snapserver buffer size", + required=False, + category=CONF_CATEGORY_BUILT_IN, + hidden=not local_snapserver_present, + help_link=CONF_HELP_LINK, + ), + ConfigEntry( + key=CONF_SERVER_CHUNK_MS, + type=ConfigEntryType.INTEGER, + range=(10, 100), + default_value=26, + label="Snapserver chunk size", + required=False, + category=CONF_CATEGORY_BUILT_IN, + hidden=not local_snapserver_present, + help_link=CONF_HELP_LINK, + ), + ConfigEntry( + key=CONF_SERVER_INITIAL_VOLUME, + type=ConfigEntryType.INTEGER, + range=(0, 100), + default_value=25, + label="Snapserver initial volume", + required=False, + category=CONF_CATEGORY_BUILT_IN, + hidden=not local_snapserver_present, + help_link=CONF_HELP_LINK, + ), + ConfigEntry( + key=CONF_SERVER_SEND_AUDIO_TO_MUTED, + type=ConfigEntryType.BOOLEAN, + default_value=False, + label="Send audio to muted clients", + required=False, + category=CONF_CATEGORY_BUILT_IN, + hidden=not local_snapserver_present, + help_link=CONF_HELP_LINK, + ), + ConfigEntry( + key=CONF_SERVER_TRANSPORT_CODEC, + type=ConfigEntryType.STRING, + options=( + ConfigValueOption( + title="FLAC", + value="flac", + ), + ConfigValueOption( + title="OGG", + value="ogg", + ), + ConfigValueOption( + title="OPUS", + value="opus", + ), + ConfigValueOption( + title="PCM", + value="pcm", + ), + ), + default_value="flac", + label="Snapserver default transport codec", + required=False, + category=CONF_CATEGORY_BUILT_IN, + hidden=not local_snapserver_present, + help_link=CONF_HELP_LINK, + ), + ConfigEntry( + key=CONF_USE_EXTERNAL_SERVER, + type=ConfigEntryType.BOOLEAN, + default_value=not local_snapserver_present, + label="Use existing Snapserver", + required=False, + category=( + CONF_CATEGORY_ADVANCED if local_snapserver_present else CONF_CATEGORY_GENERIC + ), + ), + ConfigEntry( + key=CONF_SERVER_HOST, + type=ConfigEntryType.STRING, + default_value=DEFAULT_SNAPSERVER_IP, + label="Snapcast server ip", + required=False, + depends_on=CONF_USE_EXTERNAL_SERVER, + category=( + CONF_CATEGORY_ADVANCED if local_snapserver_present else CONF_CATEGORY_GENERIC + ), + ), + ConfigEntry( + key=CONF_SERVER_CONTROL_PORT, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_SNAPSERVER_PORT, + label="Snapcast control port", + required=False, + depends_on=CONF_USE_EXTERNAL_SERVER, + category=( + CONF_CATEGORY_ADVANCED if local_snapserver_present else CONF_CATEGORY_GENERIC + ), + ), + ConfigEntry( + key=CONF_STREAM_IDLE_THRESHOLD, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_SNAPSTREAM_IDLE_THRESHOLD, + label="Snapcast idle threshold stream parameter", + required=True, + category=CONF_CATEGORY_ADVANCED, + ), + ) + + +class SnapCastProvider(PlayerProvider): + """Player provider for Snapcast based players.""" + + _snapserver: Snapserver + _snapcast_server_host: str + _snapcast_server_control_port: int + _stream_tasks: dict[str, asyncio.Task] + _use_builtin_server: bool + _snapserver_runner: asyncio.Task | None + _snapserver_started: asyncio.Event | None + _ids_map: bidict # ma_id / snapclient_id + _stop_called: bool + + def _get_snapclient_id(self, player_id: str) -> str: + search_dict = self._ids_map + return search_dict.get(player_id) + + def _get_ma_id(self, snap_client_id: str) -> str: + search_dict = self._ids_map.inverse + return search_dict.get(snap_client_id) + + def _generate_and_register_id(self, snap_client_id) -> str: + search_dict = self._ids_map.inverse + if snap_client_id not in search_dict: + new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id)) + self._ids_map[new_id] = snap_client_id + return new_id + else: + return self._get_ma_id(snap_client_id) + + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.SYNC_PLAYERS,) + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + # set snapcast logging + logging.getLogger("snapcast").setLevel(self.logger.level) + self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER) + self._stop_called = False + if self._use_builtin_server: + self._snapcast_server_host = "127.0.0.1" + self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT + self._snapcast_server_buffer_size = self.config.get_value(CONF_SERVER_BUFFER_SIZE) + self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS) + self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME) + self._snapcast_server_send_to_muted = self.config.get_value( + CONF_SERVER_SEND_AUDIO_TO_MUTED + ) + self._snapcast_server_transport_codec = self.config.get_value( + CONF_SERVER_TRANSPORT_CODEC + ) + + else: + self._snapcast_server_host = self.config.get_value(CONF_SERVER_HOST) + self._snapcast_server_control_port = self.config.get_value(CONF_SERVER_CONTROL_PORT) + self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD) + self._stream_tasks = {} + self._ids_map = bidict({}) + + if self._use_builtin_server: + await self._start_builtin_server() + else: + self._snapserver_runner = None + self._snapserver_started = None + try: + self._snapserver = await create_server( + self.mass.loop, + self._snapcast_server_host, + port=self._snapcast_server_control_port, + reconnect=True, + ) + self._snapserver.set_on_update_callback(self._handle_update) + self.logger.info( + "Started connection to Snapserver %s", + f"{self._snapcast_server_host}:{self._snapcast_server_control_port}", + ) + # register callback for when the connection gets lost to the snapserver + self._snapserver.set_on_disconnect_callback(self._handle_disconnect) + await self._create_default_stream() + except OSError as err: + msg = "Unable to start the Snapserver connection ?" + raise SetupFailedError(msg) from err + + async def loaded_in_mass(self) -> None: + """Call after the provider has been loaded.""" + await super().loaded_in_mass() + # initial load of players + self._handle_update() + + async def unload(self) -> None: + """Handle close/cleanup of the provider.""" + self._stop_called = True + for snap_client_id in self._snapserver.clients: + player_id = self._get_ma_id(snap_client_id) + await self.cmd_stop(player_id) + self._snapserver.stop() + await self._stop_builtin_server() + + def _handle_update(self) -> None: + """Process Snapcast init Player/Group and set callback .""" + for snap_client in self._snapserver.clients: + self._handle_player_init(snap_client) + snap_client.set_callback(self._handle_player_update) + for snap_client in self._snapserver.clients: + self._handle_player_update(snap_client) + for snap_group in self._snapserver.groups: + snap_group.set_callback(self._handle_group_update) + + def _handle_group_update(self, snap_group: Snapgroup) -> None: + """Process Snapcast group callback.""" + for snap_client in self._snapserver.clients: + self._handle_player_update(snap_client) + + def _handle_player_init(self, snap_client: Snapclient) -> None: + """Process Snapcast add to Player controller.""" + player_id = self._generate_and_register_id(snap_client.identifier) + player = self.mass.players.get(player_id, raise_unavailable=False) + if not player: + snap_client = cast( + Snapclient, self._snapserver.client(self._get_snapclient_id(player_id)) + ) + player = Player( + player_id=player_id, + provider=self.instance_id, + type=PlayerType.PLAYER, + name=snap_client.friendly_name, + available=True, + powered=snap_client.connected, + device_info=DeviceInfo( + model=snap_client._client.get("host").get("os"), + address=snap_client._client.get("host").get("ip"), + manufacturer=snap_client._client.get("host").get("arch"), + ), + supported_features=( + PlayerFeature.SYNC, + PlayerFeature.VOLUME_SET, + PlayerFeature.VOLUME_MUTE, + ), + group_childs=set(), + synced_to=self._synced_to(player_id), + ) + asyncio.run_coroutine_threadsafe( + self.mass.players.register_or_update(player), loop=self.mass.loop + ) + + def _handle_player_update(self, snap_client: Snapclient) -> None: + """Process Snapcast update to Player controller.""" + player_id = self._get_ma_id(snap_client.identifier) + player = self.mass.players.get(player_id) + if not player: + return + player.name = snap_client.friendly_name + player.volume_level = snap_client.volume + player.volume_muted = snap_client.muted + player.available = snap_client.connected + player.synced_to = self._synced_to(player_id) + if player.active_group is None: + if stream := self._get_snapstream(player_id): + if stream.name.startswith(("MusicAssistant", "default")): + player.active_source = player_id + else: + player.active_source = stream.name + else: + player.active_source = player_id + self._group_childs(player_id) + self.mass.players.update(player_id) + + async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: + """Return all (provider/player specific) Config Entries for the given player (if any).""" + base_entries = await super().get_player_config_entries(player_id) + return ( + *base_entries, + CONF_ENTRY_FLOW_MODE_ENFORCED, + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_SAMPLE_RATES_SNAPCAST, + ) + + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: + """Send VOLUME_SET command to given player.""" + snap_client_id = self._get_snapclient_id(player_id) + await self._snapserver.client(snap_client_id).set_volume(volume_level) + self.mass.players.update(snap_client_id) + + async def cmd_stop(self, player_id: str) -> None: + """Send STOP command to given player.""" + player = self.mass.players.get(player_id, raise_unavailable=False) + if stream_task := self._stream_tasks.pop(player_id, None): + if not stream_task.done(): + stream_task.cancel() + player.state = PlayerState.IDLE + self._set_childs_state(player_id) + self.mass.players.update(player_id) + # assign default/empty stream to the player + await self._get_snapgroup(player_id).set_stream("default") + + async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: + """Send MUTE command to given player.""" + ma_player = self.mass.players.get(player_id, raise_unavailable=False) + snap_client_id = self._get_snapclient_id(player_id) + snapclient = self._snapserver.client(snap_client_id) + # Using optimistic value because the library does not return the response from the api + await snapclient.set_muted(muted) + ma_player.volume_muted = snapclient.muted + self.mass.players.update(player_id) + + async def cmd_sync(self, player_id: str, target_player: str) -> None: + """Sync Snapcast player.""" + group = self._get_snapgroup(target_player) + mass_target_player = self.mass.players.get(target_player) + if self._get_snapclient_id(player_id) not in group.clients: + await group.add_client(self._get_snapclient_id(player_id)) + mass_player = self.mass.players.get(player_id) + mass_player.synced_to = target_player + mass_target_player.group_childs.add(player_id) + self.mass.players.update(player_id) + self.mass.players.update(target_player) + + async def cmd_unsync(self, player_id: str) -> None: + """Unsync Snapcast player.""" + mass_player = self.mass.players.get(player_id) + if mass_player.synced_to is None: + for mass_child_id in list(mass_player.group_childs): + if mass_child_id != player_id: + await self.cmd_unsync(mass_child_id) + return + mass_sync_master_player = self.mass.players.get(mass_player.synced_to) + mass_sync_master_player.group_childs.remove(player_id) + mass_player.synced_to = None + snap_client_id = self._get_snapclient_id(player_id) + group = self._get_snapgroup(player_id) + await group.remove_client(snap_client_id) + # assign default/empty stream to the player + await self._get_snapgroup(player_id).set_stream("default") + await self.cmd_stop(player_id=player_id) + # make sure that the player manager gets an update + self.mass.players.update(player_id, skip_forward=True) + self.mass.players.update(mass_player.synced_to, skip_forward=True) + + async def play_media(self, player_id: str, media: PlayerMedia) -> None: + """Handle PLAY MEDIA on given player.""" + player = self.mass.players.get(player_id) + if player.synced_to: + msg = "A synced player cannot receive play commands directly" + raise RuntimeError(msg) + # stop any existing streams first + if stream_task := self._stream_tasks.pop(player_id, None): + if not stream_task.done(): + stream_task.cancel() + # initialize a new stream and attach it to the group + stream, port = await self._create_stream() + snap_group = self._get_snapgroup(player_id) + await snap_group.set_stream(stream.identifier) + + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=DEFAULT_SNAPCAST_FORMAT, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") + ugp_stream = ugp_provider.ugp_streams[media.queue_id] + input_format = ugp_stream.output_format + audio_source = ugp_stream.subscribe() + elif media.queue_id and media.queue_item_id: + # regular queue (flow) stream request + input_format = DEFAULT_SNAPCAST_PCM_FORMAT + audio_source = self.mass.streams.get_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=input_format, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=DEFAULT_SNAPCAST_FORMAT, + ) + + async def _streamer() -> None: + host = self._snapcast_server_host + stream_path = f"tcp://{host}:{port}" + self.logger.debug("Start streaming to %s", stream_path) + try: + async with FFMpeg( + audio_input=audio_source, + input_format=input_format, + output_format=DEFAULT_SNAPCAST_FORMAT, + filter_params=get_player_filter_params(self.mass, player_id), + audio_output=stream_path, + ) as ffmpeg_proc: + player.state = PlayerState.PLAYING + player.current_media = media + player.elapsed_time = 0 + player.elapsed_time_last_updated = time.time() + self.mass.players.update(player_id) + self._set_childs_state(player_id) + await ffmpeg_proc.wait() + self.logger.debug("Finished streaming to %s", stream_path) + # we need to wait a bit for the stream status to become idle + # to ensure that all snapclients have consumed the audio + while stream.status != "idle": + await asyncio.sleep(0.25) + player.state = PlayerState.IDLE ++ player.elapsed_time = time.time() - player.elapsed_time_last_updated + self.mass.players.update(player_id) + self._set_childs_state(player_id) + finally: + await self._delete_current_snapstream(stream, media) + + # start streaming the queue (pcm) audio in a background task + self._stream_tasks[player_id] = asyncio.create_task(_streamer()) + + async def _delete_current_snapstream(self, stream: Snapstream, media: PlayerMedia) -> None: + with suppress(TypeError, KeyError, AttributeError): + if media.duration < 5: + await asyncio.sleep(5) + await self._snapserver.stream_remove_stream(stream.identifier) + + def _get_snapgroup(self, player_id: str) -> Snapgroup: + """Get snapcast group for given player_id.""" + snap_client_id = self._get_snapclient_id(player_id) + client: Snapclient = self._snapserver.client(snap_client_id) + return client.group + + def _get_snapstream(self, player_id: str) -> Snapstream | None: + """Get snapcast stream for given player_id.""" + if group := self._get_snapgroup(player_id): + with suppress(KeyError): + return self._snapserver.stream(group.stream) + return None + + def _synced_to(self, player_id: str) -> str | None: + """Return player_id of the player this player is synced to.""" + snap_group: Snapgroup = self._get_snapgroup(player_id) + master_id: str = self._get_ma_id(snap_group.clients[0]) + + if len(snap_group.clients) < 2 or player_id == master_id: + return None + return master_id + + def _group_childs(self, player_id: str) -> set[str]: + """Return player_ids of the players synced to this player.""" + mass_player = self.mass.players.get(player_id, raise_unavailable=False) + snap_group = self._get_snapgroup(player_id) + mass_player.group_childs.clear() + if mass_player.synced_to is not None: + return + mass_player.group_childs.add(player_id) + { + mass_player.group_childs.add(self._get_ma_id(snap_client_id)) + for snap_client_id in snap_group.clients + if self._get_ma_id(snap_client_id) != player_id + and self._snapserver.client(snap_client_id).connected + } + + async def _create_stream(self) -> tuple[Snapstream, int]: + """Create new stream on snapcast server.""" + attempts = 50 + while attempts: + attempts -= 1 + # pick a random port + port = random.randint(4953, 4953 + 200) + name = f"MusicAssistant--{port}" + result = await self._snapserver.stream_add_stream( + # NOTE: setting the sampleformat to something else + # (like 24 bits bit depth) does not seem to work at all! + f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2&idle_threshold={self._snapcast_stream_idle_threshold}", + ) + if "id" not in result: + # if the port is already taken, the result will be an error + self.logger.warning(result) + continue + stream = self._snapserver.stream(result["id"]) + return (stream, port) + msg = "Unable to create stream - No free port found?" + raise RuntimeError(msg) + + async def _create_default_stream(self) -> None: + """Create new stream on snapcast server named default case not exist.""" + all_streams = {stream.name for stream in self._snapserver.streams} + if "default" not in all_streams: + await self._snapserver.stream_add_stream( + "pipe:///tmp/snapfifo?name=default&sampleformat=48000:16:2" + ) + + def _set_childs_state(self, player_id: str) -> None: + """Set the state of the child`s of the player.""" + mass_player = self.mass.players.get(player_id) + for child_player_id in mass_player.group_childs: + if child_player_id == player_id: + continue + mass_child_player = self.mass.players.get(child_player_id) + mass_child_player.state = mass_player.state + self.mass.players.update(child_player_id) + + async def _builtin_server_runner(self) -> None: + """Start running the builtin snapserver.""" + if self._snapserver_started.is_set(): + raise RuntimeError("Snapserver is already started!") + logger = self.logger.getChild("snapserver") + logger.info("Starting builtin Snapserver...") + # register the snapcast mdns services + for name, port in ( + ("-http", 1780), + ("-jsonrpc", 1705), + ("-stream", 1704), + ("-tcp", 1705), + ("", 1704), + ): + zeroconf_type = f"_snapcast{name}._tcp.local." + try: + info = AsyncServiceInfo( + zeroconf_type, + name=f"Snapcast.{zeroconf_type}", + properties={"is_mass": "true"}, + addresses=[await get_ip_pton(self.mass.streams.publish_ip)], + port=port, + server=f"{socket.gethostname()}.local", + ) + attr_name = f"zc_service_set{name}" + if getattr(self, attr_name, None): + await self.mass.aiozc.async_update_service(info) + else: + await self.mass.aiozc.async_register_service(info, strict=False) + setattr(self, attr_name, True) + except NonUniqueNameException: + self.logger.debug( + "Could not register mdns record for %s as its already in use", + zeroconf_type, + ) + except Exception as err: + self.logger.exception( + "Could not register mdns record for %s: %s", zeroconf_type, str(err) + ) + args = [ + "snapserver", + # config settings taken from + # https://raw.githubusercontent.com/badaix/snapcast/86cd4b2b63e750a72e0dfe6a46d47caf01426c8d/server/etc/snapserver.conf + f"--server.datadir={self.mass.storage_path}", + "--http.enabled=true", + "--http.port=1780", + f"--http.doc_root={SNAPWEB_DIR}", + "--tcp.enabled=true", + f"--tcp.port={self._snapcast_server_control_port}", + f"--stream.buffer={self._snapcast_server_buffer_size}", + f"--stream.chunk_ms={self._snapcast_server_chunk_ms}", + f"--stream.codec={self._snapcast_server_transport_codec}", + f"--stream.send_to_muted={str(self._snapcast_server_send_to_muted).lower()}", + f"--streaming_client.initial_volume={self._snapcast_server_initial_volume}", + ] + async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc: + # keep reading from stdout until exit + async for data in snapserver_proc.iter_any(): + data = data.decode().strip() # noqa: PLW2901 + for line in data.split("\n"): + logger.debug(line) + if "(Snapserver) Version 0." in line: + # delay init a small bit to prevent race conditions + # where we try to connect too soon + self.mass.loop.call_later(2, self._snapserver_started.set) + + async def _stop_builtin_server(self) -> None: + """Stop the built-in Snapserver.""" + self.logger.info("Stopping, built-in Snapserver") + if self._snapserver_runner and not self._snapserver_runner.done(): + self._snapserver_runner.cancel() + self._snapserver_started.clear() + + async def _start_builtin_server(self) -> None: + """Start the built-in Snapserver.""" + if self._use_builtin_server: + self._snapserver_started = asyncio.Event() + self._snapserver_runner = asyncio.create_task(self._builtin_server_runner()) + await asyncio.wait_for(self._snapserver_started.wait(), 10) + + def _handle_disconnect(self, exc: Exception) -> None: + """Handle disconnect callback from snapserver.""" + if self._stop_called: + # we're instructed to stop/exit, so no need to restart the connection + return + self.logger.info( + "Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.", + str(exc), + ) + # schedule a reload of the provider + self.mass.call_later(5, self.mass.load_provider, self.instance_id, allow_retry=True)