From ea6d8a1d6ccfd1c692c01da935f336d74f40ab63 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 16 Feb 2025 18:24:24 +0100 Subject: [PATCH] Feat: Handle playback of plugin source as player source Handle playback of plugin source as player source and implement this in Spotify Connect --- music_assistant/controllers/player_queues.py | 4 +- music_assistant/controllers/players.py | 83 +++++++ music_assistant/controllers/streams.py | 138 ++++++++-- music_assistant/helpers/audio.py | 2 +- music_assistant/helpers/ffmpeg.py | 21 +- music_assistant/mass.py | 6 +- music_assistant/models/plugin.py | 103 +++++--- .../_template_player_provider/__init__.py | 2 +- .../_template_plugin_provider/__init__.py | 2 +- music_assistant/providers/airplay/provider.py | 26 +- music_assistant/providers/airplay/raop.py | 9 +- music_assistant/providers/hass/__init__.py | 2 + .../providers/player_group/__init__.py | 15 +- .../providers/siriusxm/__init__.py | 8 +- .../providers/slimproto/__init__.py | 15 +- .../providers/snapcast/__init__.py | 27 +- music_assistant/providers/sonos/const.py | 4 +- music_assistant/providers/sonos/provider.py | 6 +- .../providers/spotify_connect/__init__.py | 235 +++++++----------- 19 files changed, 470 insertions(+), 238 deletions(-) diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 0c7b4f10..7a570ec9 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -785,8 +785,8 @@ class PlayerQueuesController(CoreController): queue.index_in_buffer = index queue.flow_mode_stream_log = [] queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE) - # no point in enabled flow mode for radio or plugin sources - if queue_item.media_type in (MediaType.RADIO, MediaType.PLUGIN_SOURCE): + # no point in enabling flow mode for radio sources + if queue_item.media_type == MediaType.RADIO: queue.flow_mode = False queue.current_item = queue_item diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index d233903d..d96ccd5a 100644 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -25,6 +25,7 @@ from music_assistant_models.enums import ( PlayerFeature, PlayerState, PlayerType, + ProviderFeature, ProviderType, ) from music_assistant_models.errors import ( @@ -57,6 +58,7 @@ from music_assistant.helpers.throttle_retry import Throttler from music_assistant.helpers.util import TaskManager, get_changed_values from music_assistant.models.core_controller import CoreController from music_assistant.models.player_provider import PlayerProvider +from music_assistant.models.plugin import PluginProvider, PluginSource if TYPE_CHECKING: from collections.abc import Awaitable, Callable, Coroutine, Iterator @@ -715,6 +717,25 @@ class PlayerController(CoreController): - source: The ID of the source that needs to be activated/selected. """ player = self.get(player_id, True) + if player.synced_to or player.active_group: + raise PlayerCommandFailed(f"Player {player.display_name} is currently grouped") + # check if player is already playing and source is different + # in that case we to stop the player first + if source != player.active_source and player.state != PlayerState.IDLE: + await self.cmd_stop(player_id) + await asyncio.sleep(0.5) # small delay to allow stop to process + # check if source is a pluginsource + # in that case the source id is the lookup_key of the plugin provider + if plugin_prov := self.mass.get_provider(source): + await self._handle_select_plugin_source(player, plugin_prov) + return + # check if source is a mass queue + # this can be used to restore the queue after a source switch + if mass_queue := self.mass.player_queues.get(source): + player.active_source = mass_queue.queue_id + if mass_queue.items: + await self.mass.player_queues.play(mass_queue.queue_id) + return # basic check if player supports source selection if PlayerFeature.SELECT_SOURCE not in player.supported_features: raise UnsupportedFeaturedException( @@ -982,6 +1003,8 @@ class PlayerController(CoreController): player = self._players[player_id] prev_state = self._prev_states.get(player_id, {}) player.active_source = self._get_active_source(player) + # set player sources + self._set_player_sources(player) # prefer any overridden name from config player.display_name = ( self.mass.config.get_raw_player_config_value(player.player_id, "name") @@ -1376,6 +1399,23 @@ class PlayerController(CoreController): # if player has group active, return those details if player.active_group and (group_player := self.get(player.active_group)): return self._get_active_source(group_player) + # if player has plugin source active return that + for plugin_source in self._get_plugin_sources(): + if ( + plugin_source.in_use_by == player.player_id + ) or player.active_source == plugin_source.id: + # copy/set current media if available + if plugin_source.metadata: + player.set_current_media( + uri=plugin_source.metadata.uri, + media_type=plugin_source.metadata.media_type, + title=plugin_source.metadata.title, + artist=plugin_source.metadata.artist, + album=plugin_source.metadata.album, + image_url=plugin_source.metadata.image_url, + duration=plugin_source.metadata.duration, + ) + return plugin_source.id # defaults to the player's own player id if no active source set return player.active_source or player.player_id @@ -1584,3 +1624,46 @@ class PlayerController(CoreController): # always update player state self.mass.loop.call_soon(self.update, player_id) await asyncio.sleep(1) + + async def _handle_select_plugin_source( + self, player: Player, plugin_prov: PluginProvider + ) -> None: + """Handle playback/select of given plugin source on player.""" + plugin_source = plugin_prov.get_source() + if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id: + raise PlayerCommandFailed( + f"Source {plugin_source.name} is already in use by another player" + ) + plugin_source.in_use_by = player.player_id + player.active_source = plugin_source.id + stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id) + await self.play_media( + player_id=player.player_id, + media=PlayerMedia( + uri=stream_url, + media_type=MediaType.PLUGIN_SOURCE, + title=plugin_source.name, + custom_data={ + "provider": plugin_source.id, + "audio_format": plugin_source.audio_format, + }, + ), + ) + + def _get_plugin_sources(self) -> list[PluginSource]: + """Return all available plugin sources.""" + return [ + plugin_prov.get_source() + for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN) + if ProviderFeature.AUDIO_SOURCE in plugin_prov.supported_features + ] + + def _set_player_sources(self, player: Player) -> None: + """Set all available player sources.""" + player_source_ids = [x.id for x in player.source_list] + for plugin_source in self._get_plugin_sources(): + if plugin_source.id in player_source_ids: + continue + if plugin_source.passive and plugin_source.in_use_by != player.player_id: + continue + player.source_list.append(plugin_source) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index dfe6934e..6aae97e8 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -65,13 +65,13 @@ from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stre from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool from music_assistant.helpers.webserver import Webserver from music_assistant.models.core_controller import CoreController +from music_assistant.models.plugin import PluginProvider if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig from music_assistant_models.player import Player from music_assistant_models.player_queue import PlayerQueue from music_assistant_models.queue_item import QueueItem - from music_assistant_models.streamdetails import StreamDetails isfile = wrap(os.path.isfile) @@ -233,6 +233,11 @@ class StreamsController(CoreController): "/announcement/{player_id}.{fmt}", self.serve_announcement_stream, ), + ( + "*", + "/pluginsource/{plugin_source}/{player_id}.{fmt}", + self.serve_plugin_source_stream, + ), ], ) @@ -295,6 +300,8 @@ class StreamsController(CoreController): headers = { **DEFAULT_STREAM_HEADERS, "icy-name": queue_item.name, + "Accept-Ranges": "none", + "Content-Type": f"audio/{output_format.output_format_str}", } resp = web.StreamResponse( status=200, @@ -339,8 +346,8 @@ class StreamsController(CoreController): self.mass.player_queues.track_loaded_in_buffer(queue_id, queue_item_id) async for chunk in get_ffmpeg_stream( - audio_input=self.get_media_stream( - streamdetails=queue_item.streamdetails, + audio_input=self.get_queue_item_stream( + queue_item=queue_item, pcm_format=pcm_format, ), input_format=pcm_format, @@ -434,7 +441,7 @@ class StreamsController(CoreController): self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name) async for chunk in get_ffmpeg_stream( - audio_input=self.get_flow_stream( + audio_input=self.get_queue_flow_stream( queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format, @@ -562,6 +569,73 @@ class StreamsController(CoreController): return resp + async def serve_plugin_source_stream(self, request: web.Request) -> web.Response: + """Stream PluginSource audio to a player.""" + self._log_request(request) + plugin_source_id = request.match_info["plugin_source"] + provider: PluginProvider | None + if not (provider := self.mass.get_provider(plugin_source_id)): + raise web.HTTPNotFound(reason=f"Unknown PluginSource: {plugin_source_id}") + # work out output format/details + player_id = request.match_info["player_id"] + player = self.mass.players.get(player_id) + if not player: + raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}") + plugin_source = provider.get_source() + output_format = await self.get_output_format( + output_format_str=request.match_info["fmt"], + player=player, + default_sample_rate=plugin_source.audio_format.sample_rate, + default_bit_depth=plugin_source.audio_format.bit_depth, + ) + headers = { + **DEFAULT_STREAM_HEADERS, + "icy-name": plugin_source.name, + "Accept-Ranges": "none", + "Content-Type": f"audio/{output_format.output_format_str}", + } + resp = web.StreamResponse( + status=200, + reason="OK", + headers=headers, + ) + resp.content_type = f"audio/{output_format.output_format_str}" + http_profile: str = await self.mass.config.get_player_config_value( + player_id, CONF_HTTP_PROFILE + ) + if http_profile == "forced_content_length": + # guess content length based on duration + resp.content_length = get_chunksize(output_format, 12 * 3600) + elif http_profile == "chunked": + resp.enable_chunked_encoding() + + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving audio stream for PluginSource %s (%s) to %s", + plugin_source.name, + plugin_source.id, + player.display_name, + ) + async for chunk in self.get_plugin_source_stream( + plugin_source_id=plugin_source_id, + output_format=output_format, + player_id=player_id, + player_filter_params=get_player_filter_params( + self.mass, player_id, plugin_source.audio_format, output_format + ), + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError, ConnectionError): + break + return resp + def get_command_url(self, player_or_queue_id: str, command: str) -> str: """Get the url for the special command stream.""" return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3" @@ -582,8 +656,7 @@ class StreamsController(CoreController): def get_plugin_source_url( self, - provider: str, - source_id: str, + plugin_source: str, player_id: str, output_codec: ContentType = ContentType.FLAC, ) -> str: @@ -592,9 +665,9 @@ class StreamsController(CoreController): # handle raw pcm without exact format specifiers if output_codec.is_pcm() and ";" not in fmt: fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}" - return f"{self._server.base_url}/pluginsource/{provider}/{source_id}/{player_id}.{fmt}" + return f"{self._server.base_url}/pluginsource/{plugin_source}/{player_id}.{fmt}" - async def get_flow_stream( + async def get_queue_flow_stream( self, queue: PlayerQueue, start_queue_item: QueueItem, @@ -664,8 +737,8 @@ class StreamsController(CoreController): bytes_written = 0 buffer = b"" # handle incoming audio chunks - async for chunk in self.get_media_stream( - queue_track.streamdetails, + async for chunk in self.get_queue_item_stream( + queue_track, pcm_format=pcm_format, ): # buffer size needs to be big enough to include the crossfade part @@ -785,13 +858,50 @@ class StreamsController(CoreController): ): yield chunk - async def get_media_stream( + async def get_plugin_source_stream( + self, + plugin_source_id: str, + output_format: AudioFormat, + player_id: str, + player_filter_params: list[str] | None = None, + ) -> AsyncGenerator[bytes, None]: + """Get the special plugin source stream.""" + provider: PluginProvider = self.mass.get_provider(plugin_source_id) + plugin_source = provider.get_source() + if plugin_source.in_use_by and plugin_source.in_use_by != player_id: + raise RuntimeError( + f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}" + ) + + audio_input = ( + provider.get_audio_stream(player_id) + if plugin_source.stream_type == StreamType.CUSTOM + else plugin_source.path + ) + chunk_size = int(get_chunksize(output_format, 1) / 10) + try: + plugin_source.in_use_by = player_id + async for chunk in get_ffmpeg_stream( + audio_input=audio_input, + input_format=plugin_source.audio_format, + output_format=output_format, + chunk_size=chunk_size, + filter_params=player_filter_params, + extra_input_args=["-re"], + ): + yield chunk + finally: + plugin_source.in_use_by = None + + async def get_queue_item_stream( self, - streamdetails: StreamDetails, + queue_item: QueueItem, pcm_format: AudioFormat, - ) -> AsyncGenerator[tuple[bool, bytes], None]: - """Get the audio stream for the given streamdetails as raw pcm chunks.""" + ) -> AsyncGenerator[bytes, None]: + """Get the audio stream for a single queue item as raw PCM audio.""" # collect all arguments for ffmpeg + streamdetails = queue_item.streamdetails + assert streamdetails filter_params = [] extra_input_args = streamdetails.extra_input_args or [] # handle volume normalization diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 165b67a0..c0d40711 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -398,6 +398,7 @@ async def get_media_stream( collect_log_history=True, loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info", ) + try: await ffmpeg_proc.start() logger.debug( @@ -543,7 +544,6 @@ async def get_media_stream( VolumeNormalizationMode.FIXED_GAIN, ) and (finished or (seconds_streamed >= 30)) - and streamdetails.media_type != MediaType.PLUGIN_SOURCE ): # dynamic mode not allowed and no measurement known, we need to analyze the audio # add background task to start analyzing the audio diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 7c68baca..c4722826 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -17,7 +17,7 @@ from music_assistant_models.helpers import get_global_cache_value, set_global_ca from music_assistant.constants import VERBOSE_LOG_LEVEL from .process import AsyncProcess, check_output -from .util import TimedAsyncGenerator, close_async_generator +from .util import close_async_generator if TYPE_CHECKING: from music_assistant_models.media_items import AudioFormat @@ -147,7 +147,11 @@ class FFMpeg(AsyncProcess): # for data to arrive (e.g. when there is X amount of seconds in the buffer) # so this timeout is just to catch if the source is stuck and rpeort it and not # to recover from it. - async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300): + # async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300): + # if self.closed: + # return + # await self.write(chunk) + async for chunk in self.audio_input: if self.closed: return await self.write(chunk) @@ -291,6 +295,8 @@ def get_ffmpeg_args( "-ac", str(output_format.channels), ] + if not output_format.content_type.is_pcm() and output_format.content_type.is_lossless(): + output_args += ["-sample_fmt", f"s{output_format.bit_depth}"] if output_format.output_format_str == "flac": # use level 0 compression for fastest encoding output_args += ["-compression_level", "0"] @@ -346,11 +352,12 @@ async def check_ffmpeg_version() -> None: "Please install ffmpeg on your OS to enable playback." ) if returncode != 0: - raise AudioError( - "Error determining FFmpeg version on your system." - "Your CPU may be too old to run this version of FFmpeg." - f"Additional info: {returncode} {output.decode().strip()}" - ) + err_msg = "Error determining FFmpeg version on your system." + if returncode < 0: + # error below 0 is often illegal instruction + err_msg += " - Your CPU may be too old to run this version of FFmpeg." + err_msg += f" - Additional info: {returncode} {output.decode().strip()}" + raise AudioError(err_msg) # parse version number from output try: version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0] diff --git a/music_assistant/mass.py b/music_assistant/mass.py index 23c128cf..2bc9b135 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -66,7 +66,7 @@ rmfile = wrap(os.remove) listdir = wrap(os.listdir) rename = wrap(os.rename) -EventCallBackType = Callable[[MassEvent], None] | Coroutine[MassEvent, Any, None] +EventCallBackType = Callable[[MassEvent], None] | Callable[[MassEvent], Coroutine[Any, Any, None]] EventSubscriptionType = tuple[ EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None ] @@ -307,7 +307,7 @@ class MusicAssistant: continue if asyncio.iscoroutinefunction(cb_func): if TYPE_CHECKING: - cb_func = cast(Coroutine[Any, Any, None], cb_func) + cb_func = cast(Callable[[MassEvent], Coroutine[Any, Any, None]], cb_func) self.create_task(cb_func, event_obj) else: if TYPE_CHECKING: @@ -341,7 +341,7 @@ class MusicAssistant: def create_task( self, - target: Coroutine[Any, Any, _R] | Awaitable[_R], + target: Callable[[MassEvent], Coroutine[Any, Any, None]] | Awaitable[_R], *args: Any, task_id: str | None = None, abort_existing: bool = False, diff --git a/music_assistant/models/plugin.py b/music_assistant/models/plugin.py index d7009bd6..9bb01aef 100644 --- a/music_assistant/models/plugin.py +++ b/music_assistant/models/plugin.py @@ -3,18 +3,69 @@ from __future__ import annotations from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING +from dataclasses import dataclass, field -from .provider import Provider +from mashumaro import field_options, pass_through +from music_assistant_models.enums import StreamType +from music_assistant_models.player import PlayerMedia, PlayerSource +from music_assistant_models.streamdetails import AudioFormat # noqa: TC002 -if TYPE_CHECKING: - from music_assistant_models.enums import MediaType - from music_assistant_models.media_items import PluginSource - from music_assistant_models.streamdetails import StreamDetails +from .provider import Provider # ruff: noqa: ARG001, ARG002 +@dataclass() +class PluginSource(PlayerSource): + """ + Model for a PluginSource, which is a player (audio)source provided by a plugin. + + This (intermediate) model is not exposed on the api, + but is used internally by the plugin provider. + """ + + # The output format that is sent to the player + # (or to the library/application that is used to send audio to the player) + audio_format: AudioFormat | None = field( + default=None, + compare=False, + metadata=field_options(serialize="omit", deserialize=pass_through), + repr=False, + ) + + # use this if the plugin can only provide a source to a single player at a time + in_use_by: str | None = field( + default=None, + compare=False, + metadata=field_options(serialize="omit", deserialize=pass_through), + repr=False, + ) + + # metadata of the current playing media (if known) + metadata: PlayerMedia | None = field( + default=None, + compare=False, + metadata=field_options(serialize="omit", deserialize=pass_through), + repr=False, + ) + + # The type of stream that is provided by this source + stream_type: StreamType | None = field( + default=StreamType.CUSTOM, + compare=False, + metadata=field_options(serialize="omit", deserialize=pass_through), + repr=False, + ) + + # The path to the source/audio (if streamtype is not custom) + path: str | None = field( + default=None, + compare=False, + metadata=field_options(serialize="omit", deserialize=pass_through), + repr=False, + ) + + class PluginProvider(Provider): """ Base representation of a Plugin for Music Assistant. @@ -22,44 +73,20 @@ class PluginProvider(Provider): Plugin Provider implementations should inherit from this base model. """ - async def get_sources(self) -> list[PluginSource]: # type: ignore[return] - """Get all audio sources provided by this provider.""" - # Will only be called if ProviderFeature.AUDIO_SOURCE is declared - raise NotImplementedError - - async def get_source(self, prov_source_id: str) -> PluginSource: # type: ignore[return] - """Get AudioSource details by id.""" + def get_source(self) -> PluginSource: # type: ignore[return] + """Get (audio)source details for this plugin.""" # Will only be called if ProviderFeature.AUDIO_SOURCE is declared raise NotImplementedError - async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: - """Return the streamdetails to stream an (audio)source provided by this plugin.""" - # Will only be called if ProviderFeature.AUDIO_SOURCE is declared - raise NotImplementedError - - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: + async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]: """ - Return the (custom) audio stream for the provider item. + Return the (custom) audio stream for the audio source provided by this plugin. + + Will only be called if this plugin is a PluginSource, meaning that + the ProviderFeature.AUDIO_SOURCE is declared AND if the streamtype is StreamType.CUSTOM. - Will only be called when the stream_type is set to CUSTOM. + The player_id is the id of the player that is requesting the stream. """ if False: yield b"" raise NotImplementedError - - async def on_streamed( - self, - streamdetails: StreamDetails, - ) -> None: - """ - Handle callback when given streamdetails completed streaming. - - To get the number of seconds streamed, see streamdetails.seconds_streamed. - To get the number of seconds seeked/skipped, see streamdetails.seek_position. - Note that seconds_streamed is the total streamed seconds, so without seeked time. - - NOTE: Due to internal and player buffering, - this may be called in advance of the actual completion. - """ diff --git a/music_assistant/providers/_template_player_provider/__init__.py b/music_assistant/providers/_template_player_provider/__init__.py index 9e5aa66a..33ff6dea 100644 --- a/music_assistant/providers/_template_player_provider/__init__.py +++ b/music_assistant/providers/_template_player_provider/__init__.py @@ -109,7 +109,7 @@ class MyDemoPlayerprovider(PlayerProvider): """Return the features supported by this Provider.""" # MANDATORY # you should return a set of provider-level features - # here that your player provider supports or an empty tuple if none. + # here that your player provider supports or an empty set if none. # for example 'ProviderFeature.SYNC_PLAYERS' if you can sync players. return {ProviderFeature.SYNC_PLAYERS} diff --git a/music_assistant/providers/_template_plugin_provider/__init__.py b/music_assistant/providers/_template_plugin_provider/__init__.py index 5e5fd807..fbed2400 100644 --- a/music_assistant/providers/_template_plugin_provider/__init__.py +++ b/music_assistant/providers/_template_plugin_provider/__init__.py @@ -38,7 +38,7 @@ from collections.abc import AsyncGenerator from typing import TYPE_CHECKING from music_assistant_models.enums import ContentType, EventType, ProviderFeature -from music_assistant_models.streamdetails import AudioFormat +from music_assistant_models.media_items.audio_format import AudioFormat from music_assistant.models.plugin import PluginProvider, PluginSource diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index f952dabf..6d3c850c 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -300,12 +300,30 @@ class AirplayProvider(PlayerProvider): output_format=AIRPLAY_PCM_FORMAT, use_pre_announce=media.custom_data["use_pre_announce"], ) + elif media.media_type == MediaType.PLUGIN_SOURCE: + # special case: plugin source stream + input_format = AIRPLAY_PCM_FORMAT + assert media.custom_data is not None # for type checking + audio_source = self.mass.streams.get_plugin_source_stream( + plugin_source_id=media.custom_data["provider"], + output_format=AIRPLAY_PCM_FORMAT, + player_id=player_id, + ) elif media.queue_id and media.queue_id.startswith("ugp_"): # special case: UGP stream ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group")) ugp_stream = ugp_provider.ugp_streams[media.queue_id] input_format = ugp_stream.base_pcm_format audio_source = ugp_stream.subscribe_raw() + elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id: + # use single item stream request for radio streams + input_format = AIRPLAY_PCM_FORMAT + queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id) + assert queue_item is not None # for type checking + audio_source = self.mass.streams.get_queue_item_stream( + queue_item=queue_item, + pcm_format=AIRPLAY_PCM_FORMAT, + ) elif media.queue_id and media.queue_item_id: # regular queue (flow) stream request input_format = AIRPLAY_FLOW_PCM_FORMAT @@ -313,7 +331,7 @@ class AirplayProvider(PlayerProvider): assert queue start_queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id) assert start_queue_item - audio_source = self.mass.streams.get_flow_stream( + audio_source = self.mass.streams.get_queue_flow_stream( queue=queue, start_queue_item=start_queue_item, pcm_format=input_format, @@ -610,8 +628,10 @@ class AirplayProvider(PlayerProvider): # device switched to another source (or is powered off) if raop_stream := airplay_player.raop_stream: # ignore this if we just started playing to prevent false positives - assert mass_player.elapsed_time - if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING: + elapsed_time = ( + 10 if mass_player.elapsed_time is None else mass_player.elapsed_time + ) + if elapsed_time > 10 and mass_player.state == PlayerState.PLAYING: raop_stream.prevent_playback = True self.mass.create_task(self.monitor_prevent_playback(player_id)) elif "device-prevent-playback=0" in path: diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py index 844566da..7c67a8c6 100644 --- a/music_assistant/providers/airplay/raop.py +++ b/music_assistant/providers/airplay/raop.py @@ -421,10 +421,13 @@ class RaopStream: title = queue.current_item.name artist = "" album = "" - if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_metadata: + if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title: # stream title/metadata from radio/live stream - title = queue.current_item.streamdetails.stream_metadata.title or "" - artist = queue.current_item.streamdetails.stream_metadata.artist or "" + if " - " in queue.current_item.streamdetails.stream_title: + artist, title = queue.current_item.streamdetails.stream_title.split(" - ", 1) + else: + title = queue.current_item.streamdetails.stream_title + artist = "" # set album to radio station name album = queue.current_item.name elif media_item := queue.current_item.media_item: diff --git a/music_assistant/providers/hass/__init__.py b/music_assistant/providers/hass/__init__.py index 96af4f1d..eb34f5ec 100644 --- a/music_assistant/providers/hass/__init__.py +++ b/music_assistant/providers/hass/__init__.py @@ -213,6 +213,8 @@ async def _get_player_control_config_entries(hass: HomeAssistantClient) -> tuple all_mute_entities: list[ConfigValueOption] = [] all_volume_entities: list[ConfigValueOption] = [] # collect all entities that are usable for player controls + if not hass.connected: + return () for state in await hass.get_states(): if "friendly_name" not in state["attributes"]: # filter out invalid/unavailable players diff --git a/music_assistant/providers/player_group/__init__.py b/music_assistant/providers/player_group/__init__.py index c022b7c3..c1b89b90 100644 --- a/music_assistant/providers/player_group/__init__.py +++ b/music_assistant/providers/player_group/__init__.py @@ -481,9 +481,22 @@ class PlayerGroupProvider(PlayerProvider): output_format=UGP_FORMAT, use_pre_announce=media.custom_data["use_pre_announce"], ) + elif media.media_type == MediaType.PLUGIN_SOURCE: + # special case: plugin source stream + audio_source = self.mass.streams.get_plugin_source_stream( + plugin_source_id=media.custom_data["provider"], + output_format=UGP_FORMAT, + player_id=player_id, + ) + elif media.media_type == MediaType.RADIO: + # use single item stream request for radio streams + audio_source = self.mass.streams.get_queue_item_stream( + queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id), + pcm_format=UGP_FORMAT, + ) elif media.queue_id and media.queue_item_id: # regular queue stream request - audio_source = self.mass.streams.get_flow_stream( + audio_source = self.mass.streams.get_queue_flow_stream( queue=self.mass.player_queues.get(media.queue_id), start_queue_item=self.mass.player_queues.get_item( media.queue_id, media.queue_item_id diff --git a/music_assistant/providers/siriusxm/__init__.py b/music_assistant/providers/siriusxm/__init__.py index 466eef25..45ddc7ae 100644 --- a/music_assistant/providers/siriusxm/__init__.py +++ b/music_assistant/providers/siriusxm/__init__.py @@ -24,7 +24,7 @@ from music_assistant_models.media_items import ( ProviderMapping, Radio, ) -from music_assistant_models.streamdetails import LivestreamMetadata, StreamDetails +from music_assistant_models.streamdetails import StreamDetails from tenacity import RetryError from music_assistant.helpers.util import select_free_port @@ -268,10 +268,8 @@ class SiriusXMProvider(MusicProvider): if latest_cut_marker: latest_cut = latest_cut_marker.cut title = latest_cut.title - self._current_stream_details.stream_metadata = LivestreamMetadata( - title=title, - artist=", ".join([a.name for a in latest_cut.artists]), - ) + artist = ", ".join([a.name for a in latest_cut.artists]) + self._current_stream_details.stream_title = f"{artist} - {title}" async def _refresh_channels(self) -> bool: self._channels = await self._client.channels diff --git a/music_assistant/providers/slimproto/__init__.py b/music_assistant/providers/slimproto/__init__.py index 64c5e248..d8a75624 100644 --- a/music_assistant/providers/slimproto/__init__.py +++ b/music_assistant/providers/slimproto/__init__.py @@ -377,15 +377,28 @@ class SlimprotoProvider(PlayerProvider): output_format=master_audio_format, use_pre_announce=media.custom_data["use_pre_announce"], ) + elif media.media_type == MediaType.PLUGIN_SOURCE: + # special case: plugin source stream + audio_source = self.mass.streams.get_plugin_source_stream( + plugin_source_id=media.custom_data["provider"], + output_format=master_audio_format, + player_id=player_id, + ) elif media.queue_id.startswith("ugp_"): # special case: UGP stream ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") ugp_stream = ugp_provider.ugp_streams[media.queue_id] # Filter is later applied in MultiClientStream audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None) + elif media.media_type == MediaType.RADIO: + # use single item stream request for radio streams + audio_source = self.mass.streams.get_queue_item_stream( + queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id), + pcm_format=master_audio_format, + ) elif media.queue_id and media.queue_item_id: # regular queue stream request - audio_source = self.mass.streams.get_flow_stream( + audio_source = self.mass.streams.get_queue_flow_stream( queue=self.mass.player_queues.get(media.queue_id), start_queue_item=self.mass.player_queues.get_item( media.queue_id, media.queue_item_id diff --git a/music_assistant/providers/snapcast/__init__.py b/music_assistant/providers/snapcast/__init__.py index 580ff530..2191b0e7 100644 --- a/music_assistant/providers/snapcast/__init__.py +++ b/music_assistant/providers/snapcast/__init__.py @@ -482,7 +482,7 @@ class SnapCastProvider(PlayerProvider): self.mass.players.update(player_id, skip_forward=True) self.mass.players.update(mass_player.synced_to, skip_forward=True) - async def play_media(self, player_id: str, media: PlayerMedia) -> None: + async def play_media(self, player_id: str, media: PlayerMedia) -> None: # noqa: PLR0915 """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) if player.synced_to: @@ -506,16 +506,39 @@ class SnapCastProvider(PlayerProvider): output_format=DEFAULT_SNAPCAST_FORMAT, use_pre_announce=media.custom_data["use_pre_announce"], ) + elif media.media_type == MediaType.PLUGIN_SOURCE: + # special case: plugin source stream + # consume the stream directly, so we can skip one step in between + # provider: PluginProvider = self.mass.get_provider(media.custom_data["provider"]) + # plugin_source = provider.get_source() + # audio_source = ( + # provider.get_audio_stream(player_id) + # if plugin_source.stream_type == StreamType.CUSTOM + # else plugin_source.path + # ) + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = self.mass.streams.get_plugin_source_stream( + plugin_source_id=media.custom_data["provider"], + output_format=DEFAULT_SNAPCAST_FORMAT, + player_id=player_id, + ) elif media.queue_id.startswith("ugp_"): # special case: UGP stream ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") ugp_stream = ugp_provider.ugp_streams[media.queue_id] input_format = ugp_stream.base_pcm_format audio_source = ugp_stream.subscribe_raw() + elif media.media_type == MediaType.RADIO: + # use single item stream request for radio streams + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = self.mass.streams.get_queue_item_stream( + queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id), + pcm_format=DEFAULT_SNAPCAST_FORMAT, + ) elif media.queue_id and media.queue_item_id: # regular queue (flow) stream request input_format = DEFAULT_SNAPCAST_PCM_FORMAT - audio_source = self.mass.streams.get_flow_stream( + audio_source = self.mass.streams.get_queue_flow_stream( queue=self.mass.player_queues.get(media.queue_id), start_queue_item=self.mass.player_queues.get_item( media.queue_id, media.queue_item_id diff --git a/music_assistant/providers/sonos/const.py b/music_assistant/providers/sonos/const.py index 98618b4b..f5892264 100644 --- a/music_assistant/providers/sonos/const.py +++ b/music_assistant/providers/sonos/const.py @@ -50,7 +50,7 @@ PLAYER_SOURCE_MAP = { ), SOURCE_AIRPLAY: PlayerSource( id=SOURCE_AIRPLAY, - name="Spotify", + name="Airplay", passive=True, can_play_pause=True, can_next_previous=True, @@ -66,7 +66,7 @@ PLAYER_SOURCE_MAP = { ), SOURCE_RADIO: PlayerSource( id=SOURCE_RADIO, - name="Spotify", + name="Radio", passive=True, can_play_pause=True, can_next_previous=True, diff --git a/music_assistant/providers/sonos/provider.py b/music_assistant/providers/sonos/provider.py index 5e9c9f43..0054e583 100644 --- a/music_assistant/providers/sonos/provider.py +++ b/music_assistant/providers/sonos/provider.py @@ -340,10 +340,8 @@ class SonosPlayerProvider(PlayerProvider): # play a single uri/url # note that this most probably will only work for (long running) radio streams - if self.mass.config.get_raw_player_config_value( - player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value - ): - media.uri = media.uri.replace(".flac", ".mp3") + # enforce mp3 here because Sonos really does not support FLAC streams without duration + media.uri = media.uri.replace(".flac", ".mp3") await sonos_player.client.player.group.play_stream_url( media.uri, {"name": media.title, "type": "track"} ) diff --git a/music_assistant/providers/spotify_connect/__init__.py b/music_assistant/providers/spotify_connect/__init__.py index 91b917ff..2e654dfa 100644 --- a/music_assistant/providers/spotify_connect/__init__.py +++ b/music_assistant/providers/spotify_connect/__init__.py @@ -22,23 +22,17 @@ from music_assistant_models.enums import ( EventType, MediaType, ProviderFeature, - QueueOption, StreamType, ) -from music_assistant_models.errors import MediaNotFoundError -from music_assistant_models.media_items import AudioFormat, PluginSource, ProviderMapping -from music_assistant_models.streamdetails import LivestreamMetadata, StreamDetails +from music_assistant_models.media_items import AudioFormat +from music_assistant_models.player import PlayerMedia from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW -from music_assistant.helpers.audio import get_chunksize -from music_assistant.helpers.ffmpeg import get_ffmpeg_stream -from music_assistant.helpers.process import AsyncProcess -from music_assistant.models.music_provider import MusicProvider +from music_assistant.helpers.process import AsyncProcess, check_output +from music_assistant.models.plugin import PluginProvider, PluginSource from music_assistant.providers.spotify.helpers import get_librespot_binary if TYPE_CHECKING: - from collections.abc import AsyncGenerator - from aiohttp.web import Request from music_assistant_models.config_entries import ConfigValueType, ProviderConfig from music_assistant_models.event import MassEvent @@ -48,7 +42,6 @@ if TYPE_CHECKING: from music_assistant.models import ProviderInstanceType CONF_MASS_PLAYER_ID = "mass_player_id" -CONF_CUSTOM_NAME = "custom_name" CONF_HANDOFF_MODE = "handoff_mode" CONNECT_ITEM_ID = "spotify_connect" @@ -89,15 +82,6 @@ async def get_config_entries( ), required=True, ), - ConfigEntry( - key=CONF_CUSTOM_NAME, - type=ConfigEntryType.STRING, - label="Name for the Spotify Connect Player", - default_value="", - description="Select what name should be shown in the Spotify app as speaker name. " - "Leave blank to use the Music Assistant player's name", - required=False, - ), # ConfigEntry( # key=CONF_HANDOFF_MODE, # type=ConfigEntryType.BOOLEAN, @@ -113,14 +97,14 @@ async def get_config_entries( # "When enabling handoff mode, the Spotify Connect plugin will instead " # "forward the Spotify playback request to the Music Assistant Queue, so basically " # "the spotify app can be used to initiate playback, but then MA will take over " - # "the playback and manage the queue, the normal operating mode of MA. \n\n" + # "the playback and manage the queue, which is the normal operating mode of MA. \n\n" # "This mode however means that the Spotify app will not report the actual playback ", # required=False, # ), ) -class SpotifyConnectProvider(MusicProvider): +class SpotifyConnectProvider(PluginProvider): """Implementation of a Spotify Connect Plugin.""" def __init__( @@ -135,9 +119,31 @@ class SpotifyConnectProvider(MusicProvider): self._runner_task: asyncio.Task | None = None # type: ignore[type-arg] self._librespot_proc: AsyncProcess | None = None self._librespot_started = asyncio.Event() - self._player_connected: bool = False - self._current_streamdetails: StreamDetails | None = None - self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(60) + self.named_pipe = f"/tmp/{self.instance_id}" # noqa: S108 + self._source_details = PluginSource( + id=self.lookup_key, + name=self.manifest.name, + # we set passive to true because we + # dont allow this source to be selected directly + passive=True, + # TODO: implement controlling spotify from MA itself + can_play_pause=False, + can_seek=False, + can_next_previous=False, + audio_format=AudioFormat( + content_type=ContentType.PCM_S16LE, + codec_type=ContentType.PCM_S16LE, + sample_rate=44100, + bit_depth=16, + channels=2, + ), + metadata=PlayerMedia( + "Spotify Connect", + ), + stream_type=StreamType.NAMED_PIPE, + path=self.named_pipe, + ) + self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10) self._on_unload_callbacks: list[Callable[..., None]] = [ self.mass.subscribe( self._on_mass_player_event, @@ -155,15 +161,6 @@ class SpotifyConnectProvider(MusicProvider): """Return the features supported by this Provider.""" return {ProviderFeature.AUDIO_SOURCE} - @property - def name(self) -> str: - """Return (custom) friendly name for this provider instance.""" - if custom_name := cast(str, self.config.get_value(CONF_CUSTOM_NAME)): - return f"{self.manifest.name}: {custom_name}" - if player := self.mass.players.get(self.mass_player_id): - return f"{self.manifest.name}: {player.display_name}" - return super().name - async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" self._librespot_bin = await get_librespot_binary() @@ -178,75 +175,24 @@ class SpotifyConnectProvider(MusicProvider): for callback in self._on_unload_callbacks: callback() - async def get_sources(self) -> list[PluginSource]: - """Get all audio sources provided by this provider.""" - # we only have passive/hidden sources so no need to supply this listing - return [] - - async def get_source(self, prov_source_id: str) -> PluginSource: - """Get AudioSource details by id.""" - if prov_source_id != CONNECT_ITEM_ID: - raise MediaNotFoundError(f"Invalid source id: {prov_source_id}") - return PluginSource( - item_id=CONNECT_ITEM_ID, - provider=self.instance_id, - name="Spotify Connect", - provider_mappings={ - ProviderMapping( - item_id=CONNECT_ITEM_ID, - provider_domain=self.domain, - provider_instance=self.instance_id, - audio_format=AudioFormat(content_type=ContentType.OGG), - ) - }, - ) - - async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: - """Return the streamdetails to stream an audiosource provided by this plugin.""" - self._current_streamdetails = streamdetails = StreamDetails( - item_id=CONNECT_ITEM_ID, - provider=self.instance_id, - audio_format=AudioFormat( - content_type=ContentType.OGG, - ), - media_type=MediaType.PLUGIN_SOURCE, - allow_seek=False, - can_seek=False, - stream_type=StreamType.CUSTOM, - ) - return streamdetails - - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """Return the audio stream for the provider item.""" - if not self._librespot_proc or self._librespot_proc.closed: - raise MediaNotFoundError(f"Librespot not ready for: {streamdetails.item_id}") - self._player_connected = True - try: - while True: - yield await self._audio_buffer.get() - finally: - self._player_connected = False - await asyncio.sleep(2) - if not self._player_connected: - # handle situation where the stream is disconnected from the MA player - # easiest way to unmark this librespot instance as active player is to close it - await self._librespot_proc.close(True) + def get_source(self) -> PluginSource: + """Get (audio)source details for this plugin.""" + return self._source_details async def _librespot_runner(self) -> None: """Run the spotify connect daemon in a background task.""" assert self._librespot_bin - if not (player := self.mass.players.get(self.mass_player_id)): - raise MediaNotFoundError(f"Player not found: {self.mass_player_id}") - name = cast(str, self.config.get_value(CONF_CUSTOM_NAME) or player.display_name) - self.logger.info("Starting Spotify Connect background daemon %s", name) + self.logger.info("Starting Spotify Connect background daemon") os.environ["MASS_CALLBACK"] = f"{self.mass.streams.base_url}/{self.instance_id}" + await check_output("rm", "-f", self.named_pipe) + await asyncio.sleep(0.5) + await check_output("mkfifo", self.named_pipe) + await asyncio.sleep(0.5) try: args: list[str] = [ self._librespot_bin, "--name", - name, + self.name, "--cache", self.cache_dir, "--disable-audio-cache", @@ -254,9 +200,10 @@ class SpotifyConnectProvider(MusicProvider): "320", "--backend", "pipe", + "--device", + self.named_pipe, "--dither", "none", - "--passthrough", # disable volume control "--mixer", "softvol", @@ -264,51 +211,37 @@ class SpotifyConnectProvider(MusicProvider): "fixed", "--initial-volume", "100", + "--enable-volume-normalisation", # forward events to the events script "--onevent", str(EVENTS_SCRIPT), "--emit-sink-events", ] self._librespot_proc = librespot = AsyncProcess( - args, stdout=True, stderr=True, name=f"librespot[{name}]" + args, stdout=False, stderr=True, name=f"librespot[{self.name}]" ) await librespot.start() # keep reading logging from stderr until exit - async def log_reader() -> None: - async for line in librespot.iter_stderr(): - if ( - not self._librespot_started.is_set() - and "Using StdoutSink (pipe) with format: S16" in line - ): - self._librespot_started.set() - if "error sending packet Os" in line: - continue - if "dropping truncated packet" in line: - continue - if "couldn't parse packet from " in line: - continue - self.logger.debug(line) - - async def audio_reader() -> None: - chunksize = get_chunksize(AudioFormat(content_type=ContentType.OGG)) - async for chunk in get_ffmpeg_stream( - librespot.iter_chunked(chunksize), - input_format=AudioFormat(content_type=ContentType.OGG), - output_format=AudioFormat(content_type=ContentType.OGG), - extra_input_args=["-readrate", "1.0"], + async for line in librespot.iter_stderr(): + if ( + not self._librespot_started.is_set() + and "Using StdoutSink (pipe) with format: S16" in line ): - if librespot.closed or self._stop_called: - break - if not self._player_connected: - continue - await self._audio_buffer.put(chunk) + self._librespot_started.set() + if "error sending packet Os" in line: + continue + if "dropping truncated packet" in line: + continue + if "couldn't parse packet from " in line: + continue + self.logger.debug(line) - await asyncio.gather(log_reader(), audio_reader()) except asyncio.CancelledError: await librespot.close(True) finally: - self.logger.info("Spotify Connect background daemon stopped for %s", name) + self.logger.info("Spotify Connect background daemon stopped for %s", self.name) + await check_output("rm", "-f", self.named_pipe) # auto restart if not stopped manually if not self._stop_called and self._librespot_started.is_set(): self._setup_player_daemon() @@ -338,34 +271,36 @@ class SpotifyConnectProvider(MusicProvider): # handle session connected event # this player has become the active spotify connect player # we need to start the playback - if not self._player_connected and json_data.get("event") in ( - "session_connected", - "play_request_id_changed", + self.logger.error("%s - %s", self.name, json_data.get("event")) + if not self._source_details.in_use_by and json_data.get("event") in ( + # "session_connected", + # "loading", + "sink", ): - # initiate playback by selecting the pluginsource mediaitem on the player - pluginsource_item = await self.get_source(CONNECT_ITEM_ID) + # initiate playback by selecting this source on the default player self.mass.create_task( - self.mass.player_queues.play_media( - queue_id=self.mass_player_id, - media=pluginsource_item, - option=QueueOption.REPLACE, - ) + self.mass.players.select_source(self.mass_player_id, self.lookup_key) ) - if self._current_streamdetails: - # parse metadata fields - if "common_metadata_fields" in json_data: - title = json_data["common_metadata_fields"].get("name", "Unknown") - if artists := json_data.get("track_metadata_fields", {}).get("artists"): - artist = artists[0] - else: - artist = "Unknown" - if images := json_data["common_metadata_fields"].get("covers"): - image_url = images[0] - else: - image_url = None - self._current_streamdetails.stream_metadata = LivestreamMetadata( - title=title, artist=artist, image_url=image_url - ) + # parse metadata fields + if "common_metadata_fields" in json_data: + uri = json_data["common_metadata_fields"].get("uri", "Unknown") + title = json_data["common_metadata_fields"].get("name", "Unknown") + if artists := json_data.get("track_metadata_fields", {}).get("artists"): + artist = artists[0] + else: + artist = "Unknown" + album = json_data["common_metadata_fields"].get("album", "Unknown") + if images := json_data["common_metadata_fields"].get("covers"): + image_url = images[0] + else: + image_url = None + if self._source_details.metadata is None: + self._source_details.metadata = PlayerMedia(uri, media_type=MediaType.TRACK) + self._source_details.metadata.uri = uri + self._source_details.metadata.title = title + self._source_details.metadata.artist = artist + self._source_details.metadata.album = album + self._source_details.metadata.image_url = image_url return Response() -- 2.34.1