From d72912e1821bf4f14df65baf9d6dbe309a095687 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sat, 18 Jan 2025 01:30:12 +0100 Subject: [PATCH] Some small fixes for plugin source handling --- music_assistant/controllers/player_queues.py | 6 + music_assistant/controllers/streams.py | 119 +----------------- music_assistant/helpers/audio.py | 1 + .../providers/spotify_connect/__init__.py | 57 ++++----- 4 files changed, 38 insertions(+), 145 deletions(-) diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 15d896c7..e4d91a7e 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -771,6 +771,9 @@ class PlayerQueuesController(CoreController): queue.index_in_buffer = index queue.flow_mode_stream_log = [] queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE) + # no point in enabled flow mode for radio or plugin sources + if queue_item.media_type in (MediaType.RADIO, MediaType.PLUGIN_SOURCE): + queue.flow_mode = False queue.current_item = queue_item # handle resume point of audiobook(chapter) or podcast(episode) @@ -1242,6 +1245,9 @@ class PlayerQueuesController(CoreController): # enqueue next track on the player if we're not in flow mode task_id = f"enqueue_next_item_{queue_id}" self.mass.call_later(2, self._enqueue_next_item, queue_id, item_id, task_id=task_id) + # repeat this one time because some players + # don't accept the next track when still buffering one + self.mass.call_later(30, self._enqueue_next_item, queue_id, item_id, task_id=task_id) # Main queue manipulation methods diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index efe075ec..825577cc 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -65,7 +65,6 @@ from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stre from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool from music_assistant.helpers.webserver import Webserver from music_assistant.models.core_controller import CoreController -from music_assistant.models.plugin import PluginProvider if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig @@ -182,6 +181,7 @@ class StreamsController(CoreController): "This is an advanced setting that should normally " "not be adjusted in regular setups.", category="advanced", + required=False, ), ConfigEntry( key=CONF_BIND_IP, @@ -194,6 +194,7 @@ class StreamsController(CoreController): "This is an advanced setting that should normally " "not be adjusted in regular setups.", category="advanced", + required=False, ), ) @@ -232,11 +233,6 @@ class StreamsController(CoreController): "/announcement/{player_id}.{fmt}", self.serve_announcement_stream, ), - ( - "*", - "/pluginsource/{provider_id}/{source_id}/{player_id}.{fmt}", - self.serve_plugin_source_stream, - ), ], ) @@ -568,85 +564,6 @@ class StreamsController(CoreController): return resp - async def serve_plugin_source_stream(self, request: web.Request) -> web.Response: - """Stream PluginSource audio to a player.""" - self._log_request(request) - provider_id = request.match_info["provider_id"] - provider: PluginProvider | None - if not (provider := self.mass.get_provider(provider_id)): - raise web.HTTPNotFound(reason=f"Unknown Provider: {provider_id}") - source_id = request.match_info["source_id"] - if not (source := await provider.get_source(source_id)): - raise web.HTTPNotFound(reason=f"Unknown PluginSource: {source_id}") - try: - streamdetails = await provider.get_stream_details(source_id, "plugin_source") - except Exception: - err_msg = f"No streamdetails for PluginSource: {source_id}" - self.logger.error(err_msg) - raise web.HTTPNotFound(reason=err_msg) - - # work out output format/details - player_id = request.match_info["player_id"] - player = self.mass.players.get(player_id) - if not player: - raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}") - output_format = await self._get_output_format( - output_format_str=request.match_info["fmt"], - player=player, - default_sample_rate=streamdetails.audio_format.sample_rate, - default_bit_depth=streamdetails.audio_format.bit_depth, - ) - - # prepare request, add some DLNA/UPNP compatible headers - headers = { - **DEFAULT_STREAM_HEADERS, - "icy-name": source.name, - } - resp = web.StreamResponse( - status=200, - reason="OK", - headers=headers, - ) - resp.content_type = f"audio/{output_format.output_format_str}" - http_profile: str = await self.mass.config.get_player_config_value( - player_id, CONF_HTTP_PROFILE - ) - if http_profile == "forced_content_length" and streamdetails.duration: - # guess content length based on duration - resp.content_length = get_chunksize(output_format, streamdetails.duration) - elif http_profile == "chunked": - resp.enable_chunked_encoding() - - await resp.prepare(request) - - # return early if this is not a GET request - if request.method != "GET": - return resp - - # all checks passed, start streaming! - self.logger.debug( - "Start serving audio stream for PluginSource %s (%s) to %s", - source.name, - source.id, - player.display_name, - ) - async for chunk in self.get_plugin_source_stream( - streamdetails, - output_format=output_format, - ): - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError, ConnectionError): - break - if streamdetails.stream_error: - self.logger.error( - "Error streaming PluginSource %s (%s) to %s", - source.name, - source.uri, - player.display_name, - ) - return resp - def get_command_url(self, player_or_queue_id: str, command: str) -> str: """Get the url for the special command stream.""" return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3" @@ -960,38 +877,6 @@ class StreamsController(CoreController): ): yield chunk - async def get_plugin_source_stream( - self, - streamdetails: StreamDetails, - output_format: AudioFormat, - ) -> AsyncGenerator[tuple[bool, bytes], None]: - """Get the audio stream for a PluginSource.""" - streamdetails.seek_position = 0 - extra_input_args = ["-re"] - # work out audio source for these streamdetails - if streamdetails.stream_type == StreamType.CUSTOM: - audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( - streamdetails, - seek_position=streamdetails.seek_position, - ) - elif streamdetails.stream_type == StreamType.HLS: - substream = await get_hls_substream(self.mass, streamdetails.path) - audio_source = substream.path - else: - audio_source = streamdetails.path - - # add support for decryption key provided in streamdetails - if streamdetails.decryption_key: - extra_input_args += ["-decryption_key", streamdetails.decryption_key] - - async for chunk in get_ffmpeg_stream( - audio_input=audio_source, - input_format=streamdetails.audio_format, - output_format=output_format, - extra_input_args=extra_input_args, - ): - yield chunk - def _log_request(self, request: web.Request) -> None: """Log request.""" if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index ef3774b0..9a94d642 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -510,6 +510,7 @@ async def get_media_stream( VolumeNormalizationMode.FIXED_GAIN, ) and (finished or (seconds_streamed >= 30)) + and streamdetails.media_type != MediaType.PLUGIN_SOURCE ): # dynamic mode not allowed and no measurement known, we need to analyze the audio # add background task to start analyzing the audio diff --git a/music_assistant/providers/spotify_connect/__init__.py b/music_assistant/providers/spotify_connect/__init__.py index d788af79..fd4088d4 100644 --- a/music_assistant/providers/spotify_connect/__init__.py +++ b/music_assistant/providers/spotify_connect/__init__.py @@ -136,6 +136,7 @@ class SpotifyConnectProvider(MusicProvider): self._librespot_started = asyncio.Event() self._player_connected: bool = False self._current_streamdetails: StreamDetails | None = None + self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(60) self._on_unload_callbacks: list[Callable[..., None]] = [ self.mass.subscribe( self._on_mass_player_event, @@ -207,13 +208,12 @@ class SpotifyConnectProvider(MusicProvider): item_id=CONNECT_ITEM_ID, provider=self.instance_id, audio_format=AudioFormat( - content_type=ContentType.PCM_S16LE, + content_type=ContentType.OGG, ), media_type=MediaType.PLUGIN_SOURCE, allow_seek=False, can_seek=False, stream_type=StreamType.CUSTOM, - extra_input_args=["-re"], ) return streamdetails @@ -224,19 +224,8 @@ class SpotifyConnectProvider(MusicProvider): if not self._librespot_proc or self._librespot_proc.closed: raise MediaNotFoundError(f"Librespot not ready for: {streamdetails.item_id}") self._player_connected = True - chunksize = get_chunksize(streamdetails.audio_format) - try: - async for chunk in self._librespot_proc.iter_chunked(chunksize): - if self._librespot_proc.closed or self._stop_called: - break - yield chunk - finally: - self._player_connected = False - await asyncio.sleep(2) - if not self._player_connected: - # handle situation where the stream is disconnected from the MA player - # easiest way to unmark this librespot instance as active player is to close it - await self._librespot_proc.close(True) + while True: + yield await self._audio_buffer.get() async def _librespot_runner(self) -> None: """Run the spotify connect daemon in a background task.""" @@ -260,6 +249,7 @@ class SpotifyConnectProvider(MusicProvider): "pipe", "--dither", "none", + "--passthrough", # disable volume control "--mixer", "softvol", @@ -276,20 +266,31 @@ class SpotifyConnectProvider(MusicProvider): args, stdout=True, stderr=True, name=f"librespot[{name}]" ) await librespot.start() + # keep reading logging from stderr until exit - async for line in librespot.iter_stderr(): - if ( - not self._librespot_started.is_set() - and "Using StdoutSink (pipe) with format: S16" in line - ): - self._librespot_started.set() - if "error sending packet Os" in line: - continue - if "dropping truncated packet" in line: - continue - if "couldn't parse packet from " in line: - continue - self.logger.debug(line) + async def log_reader() -> None: + async for line in librespot.iter_stderr(): + if ( + not self._librespot_started.is_set() + and "Using StdoutSink (pipe) with format: S16" in line + ): + self._librespot_started.set() + if "error sending packet Os" in line: + continue + if "dropping truncated packet" in line: + continue + if "couldn't parse packet from " in line: + continue + self.logger.debug(line) + + async def audio_reader() -> None: + chunksize = get_chunksize(AudioFormat(content_type=ContentType.OGG)) + async for chunk in librespot.iter_chunked(chunksize): + if librespot.closed or self._stop_called: + break + await self._audio_buffer.put(chunk) + + await asyncio.gather(log_reader(), audio_reader()) except asyncio.CancelledError: await librespot.close(True) finally: -- 2.34.1