Some small fixes for plugin source handling
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 18 Jan 2025 00:30:12 +0000 (01:30 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 18 Jan 2025 00:30:12 +0000 (01:30 +0100)
music_assistant/controllers/player_queues.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/providers/spotify_connect/__init__.py

index 15d896c70c967b6057e60ab36009b39127968d29..e4d91a7efb089d8624d0fef59125b09027270ea0 100644 (file)
@@ -771,6 +771,9 @@ class PlayerQueuesController(CoreController):
         queue.index_in_buffer = index
         queue.flow_mode_stream_log = []
         queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
+        # no point in enabled flow mode for radio or plugin sources
+        if queue_item.media_type in (MediaType.RADIO, MediaType.PLUGIN_SOURCE):
+            queue.flow_mode = False
         queue.current_item = queue_item
 
         # handle resume point of audiobook(chapter) or podcast(episode)
@@ -1242,6 +1245,9 @@ class PlayerQueuesController(CoreController):
         # enqueue next track on the player if we're not in flow mode
         task_id = f"enqueue_next_item_{queue_id}"
         self.mass.call_later(2, self._enqueue_next_item, queue_id, item_id, task_id=task_id)
+        # repeat this one time because some players
+        # don't accept the next track when still buffering one
+        self.mass.call_later(30, self._enqueue_next_item, queue_id, item_id, task_id=task_id)
 
     # Main queue manipulation methods
 
index efe075ec3e9e1d37b1c896bc64a5542b96f05beb..825577cc7a927a4811db5912dc8fa52e57a6088e 100644 (file)
@@ -65,7 +65,6 @@ from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stre
 from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
 from music_assistant.helpers.webserver import Webserver
 from music_assistant.models.core_controller import CoreController
-from music_assistant.models.plugin import PluginProvider
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig
@@ -182,6 +181,7 @@ class StreamsController(CoreController):
                 "This is an advanced setting that should normally "
                 "not be adjusted in regular setups.",
                 category="advanced",
+                required=False,
             ),
             ConfigEntry(
                 key=CONF_BIND_IP,
@@ -194,6 +194,7 @@ class StreamsController(CoreController):
                 "This is an advanced setting that should normally "
                 "not be adjusted in regular setups.",
                 category="advanced",
+                required=False,
             ),
         )
 
@@ -232,11 +233,6 @@ class StreamsController(CoreController):
                     "/announcement/{player_id}.{fmt}",
                     self.serve_announcement_stream,
                 ),
-                (
-                    "*",
-                    "/pluginsource/{provider_id}/{source_id}/{player_id}.{fmt}",
-                    self.serve_plugin_source_stream,
-                ),
             ],
         )
 
@@ -568,85 +564,6 @@ class StreamsController(CoreController):
 
         return resp
 
-    async def serve_plugin_source_stream(self, request: web.Request) -> web.Response:
-        """Stream PluginSource audio to a player."""
-        self._log_request(request)
-        provider_id = request.match_info["provider_id"]
-        provider: PluginProvider | None
-        if not (provider := self.mass.get_provider(provider_id)):
-            raise web.HTTPNotFound(reason=f"Unknown Provider: {provider_id}")
-        source_id = request.match_info["source_id"]
-        if not (source := await provider.get_source(source_id)):
-            raise web.HTTPNotFound(reason=f"Unknown PluginSource: {source_id}")
-        try:
-            streamdetails = await provider.get_stream_details(source_id, "plugin_source")
-        except Exception:
-            err_msg = f"No streamdetails for PluginSource: {source_id}"
-            self.logger.error(err_msg)
-            raise web.HTTPNotFound(reason=err_msg)
-
-        # work out output format/details
-        player_id = request.match_info["player_id"]
-        player = self.mass.players.get(player_id)
-        if not player:
-            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
-        output_format = await self._get_output_format(
-            output_format_str=request.match_info["fmt"],
-            player=player,
-            default_sample_rate=streamdetails.audio_format.sample_rate,
-            default_bit_depth=streamdetails.audio_format.bit_depth,
-        )
-
-        # prepare request, add some DLNA/UPNP compatible headers
-        headers = {
-            **DEFAULT_STREAM_HEADERS,
-            "icy-name": source.name,
-        }
-        resp = web.StreamResponse(
-            status=200,
-            reason="OK",
-            headers=headers,
-        )
-        resp.content_type = f"audio/{output_format.output_format_str}"
-        http_profile: str = await self.mass.config.get_player_config_value(
-            player_id, CONF_HTTP_PROFILE
-        )
-        if http_profile == "forced_content_length" and streamdetails.duration:
-            # guess content length based on duration
-            resp.content_length = get_chunksize(output_format, streamdetails.duration)
-        elif http_profile == "chunked":
-            resp.enable_chunked_encoding()
-
-        await resp.prepare(request)
-
-        # return early if this is not a GET request
-        if request.method != "GET":
-            return resp
-
-        # all checks passed, start streaming!
-        self.logger.debug(
-            "Start serving audio stream for PluginSource %s (%s) to %s",
-            source.name,
-            source.id,
-            player.display_name,
-        )
-        async for chunk in self.get_plugin_source_stream(
-            streamdetails,
-            output_format=output_format,
-        ):
-            try:
-                await resp.write(chunk)
-            except (BrokenPipeError, ConnectionResetError, ConnectionError):
-                break
-        if streamdetails.stream_error:
-            self.logger.error(
-                "Error streaming PluginSource %s (%s) to %s",
-                source.name,
-                source.uri,
-                player.display_name,
-            )
-        return resp
-
     def get_command_url(self, player_or_queue_id: str, command: str) -> str:
         """Get the url for the special command stream."""
         return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
@@ -960,38 +877,6 @@ class StreamsController(CoreController):
         ):
             yield chunk
 
-    async def get_plugin_source_stream(
-        self,
-        streamdetails: StreamDetails,
-        output_format: AudioFormat,
-    ) -> AsyncGenerator[tuple[bool, bytes], None]:
-        """Get the audio stream for a PluginSource."""
-        streamdetails.seek_position = 0
-        extra_input_args = ["-re"]
-        # work out audio source for these streamdetails
-        if streamdetails.stream_type == StreamType.CUSTOM:
-            audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
-                streamdetails,
-                seek_position=streamdetails.seek_position,
-            )
-        elif streamdetails.stream_type == StreamType.HLS:
-            substream = await get_hls_substream(self.mass, streamdetails.path)
-            audio_source = substream.path
-        else:
-            audio_source = streamdetails.path
-
-        # add support for decryption key provided in streamdetails
-        if streamdetails.decryption_key:
-            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
-
-        async for chunk in get_ffmpeg_stream(
-            audio_input=audio_source,
-            input_format=streamdetails.audio_format,
-            output_format=output_format,
-            extra_input_args=extra_input_args,
-        ):
-            yield chunk
-
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
         if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
index ef3774b08e738875daf413e1f6d325e782053b3c..9a94d6427c6dee5d02751e8024e5ffe82fadd72a 100644 (file)
@@ -510,6 +510,7 @@ async def get_media_stream(
                 VolumeNormalizationMode.FIXED_GAIN,
             )
             and (finished or (seconds_streamed >= 30))
+            and streamdetails.media_type != MediaType.PLUGIN_SOURCE
         ):
             # dynamic mode not allowed and no measurement known, we need to analyze the audio
             # add background task to start analyzing the audio
index d788af791313acb180357130931493dd14290399..fd4088d402687d78277460921cb1adc5f86eee84 100644 (file)
@@ -136,6 +136,7 @@ class SpotifyConnectProvider(MusicProvider):
         self._librespot_started = asyncio.Event()
         self._player_connected: bool = False
         self._current_streamdetails: StreamDetails | None = None
+        self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(60)
         self._on_unload_callbacks: list[Callable[..., None]] = [
             self.mass.subscribe(
                 self._on_mass_player_event,
@@ -207,13 +208,12 @@ class SpotifyConnectProvider(MusicProvider):
             item_id=CONNECT_ITEM_ID,
             provider=self.instance_id,
             audio_format=AudioFormat(
-                content_type=ContentType.PCM_S16LE,
+                content_type=ContentType.OGG,
             ),
             media_type=MediaType.PLUGIN_SOURCE,
             allow_seek=False,
             can_seek=False,
             stream_type=StreamType.CUSTOM,
-            extra_input_args=["-re"],
         )
         return streamdetails
 
@@ -224,19 +224,8 @@ class SpotifyConnectProvider(MusicProvider):
         if not self._librespot_proc or self._librespot_proc.closed:
             raise MediaNotFoundError(f"Librespot not ready for: {streamdetails.item_id}")
         self._player_connected = True
-        chunksize = get_chunksize(streamdetails.audio_format)
-        try:
-            async for chunk in self._librespot_proc.iter_chunked(chunksize):
-                if self._librespot_proc.closed or self._stop_called:
-                    break
-                yield chunk
-        finally:
-            self._player_connected = False
-            await asyncio.sleep(2)
-            if not self._player_connected:
-                # handle situation where the stream is disconnected from the MA player
-                # easiest way to unmark this librespot instance as active player is to close it
-                await self._librespot_proc.close(True)
+        while True:
+            yield await self._audio_buffer.get()
 
     async def _librespot_runner(self) -> None:
         """Run the spotify connect daemon in a background task."""
@@ -260,6 +249,7 @@ class SpotifyConnectProvider(MusicProvider):
                 "pipe",
                 "--dither",
                 "none",
+                "--passthrough",
                 # disable volume control
                 "--mixer",
                 "softvol",
@@ -276,20 +266,31 @@ class SpotifyConnectProvider(MusicProvider):
                 args, stdout=True, stderr=True, name=f"librespot[{name}]"
             )
             await librespot.start()
+
             # keep reading logging from stderr until exit
-            async for line in librespot.iter_stderr():
-                if (
-                    not self._librespot_started.is_set()
-                    and "Using StdoutSink (pipe) with format: S16" in line
-                ):
-                    self._librespot_started.set()
-                if "error sending packet Os" in line:
-                    continue
-                if "dropping truncated packet" in line:
-                    continue
-                if "couldn't parse packet from " in line:
-                    continue
-                self.logger.debug(line)
+            async def log_reader() -> None:
+                async for line in librespot.iter_stderr():
+                    if (
+                        not self._librespot_started.is_set()
+                        and "Using StdoutSink (pipe) with format: S16" in line
+                    ):
+                        self._librespot_started.set()
+                    if "error sending packet Os" in line:
+                        continue
+                    if "dropping truncated packet" in line:
+                        continue
+                    if "couldn't parse packet from " in line:
+                        continue
+                    self.logger.debug(line)
+
+            async def audio_reader() -> None:
+                chunksize = get_chunksize(AudioFormat(content_type=ContentType.OGG))
+                async for chunk in librespot.iter_chunked(chunksize):
+                    if librespot.closed or self._stop_called:
+                        break
+                    await self._audio_buffer.put(chunk)
+
+            await asyncio.gather(log_reader(), audio_reader())
         except asyncio.CancelledError:
             await librespot.close(True)
         finally: