Chore: Several tweaks to plugin source streams
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 21 Feb 2025 00:18:19 +0000 (01:18 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 21 Feb 2025 00:18:19 +0000 (01:18 +0100)
music_assistant/controllers/players.py
music_assistant/controllers/streams.py
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/process.py
music_assistant/models/plugin.py
music_assistant/providers/spotify/bin/librespot-macos-arm64
music_assistant/providers/spotify_connect/__init__.py

index b7fc935ec6fce46713172fc601dca364be633fc9..b339732f887692b2c053ef98caa91300c5805294 100644 (file)
@@ -1635,10 +1635,6 @@ class PlayerController(CoreController):
     ) -> None:
         """Handle playback/select of given plugin source on player."""
         plugin_source = plugin_prov.get_source()
-        if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
-            raise PlayerCommandFailed(
-                f"Source {plugin_source.name} is already in use by another player"
-            )
         player.active_source = plugin_source.id
         stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id)
         await self.play_media(
@@ -1669,9 +1665,9 @@ class PlayerController(CoreController):
         for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN):
             if ProviderFeature.AUDIO_SOURCE not in plugin_prov.supported_features:
                 continue
-            if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
-                continue
             plugin_source = plugin_prov.get_source()
+            if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id:
+                continue
             if plugin_source.id in player_source_ids:
                 continue
             player.source_list.append(plugin_source)
index 496b8b1c23ec3923fcdd8af4e162d70eb9e54c0d..1d7f13acff5dda980b3f12a6dbbe758a256fd7d8 100644 (file)
@@ -8,6 +8,7 @@ the upnp callbacks and json rpc api for slimproto clients.
 
 from __future__ import annotations
 
+import asyncio
 import os
 import urllib.parse
 from collections.abc import AsyncGenerator
@@ -592,6 +593,7 @@ class StreamsController(CoreController):
             "Accept-Ranges": "none",
             "Content-Type": f"audio/{output_format.output_format_str}",
         }
+
         resp = web.StreamResponse(
             status=200,
             reason="OK",
@@ -614,12 +616,6 @@ class StreamsController(CoreController):
             return resp
 
         # all checks passed, start streaming!
-        self.logger.debug(
-            "Start serving audio stream for PluginSource %s (%s) to %s",
-            plugin_source.name,
-            plugin_source.id,
-            player.display_name,
-        )
         async for chunk in self.get_plugin_source_stream(
             plugin_source_id=plugin_source_id,
             output_format=output_format,
@@ -867,29 +863,34 @@ class StreamsController(CoreController):
         player = self.mass.players.get(player_id)
         plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
         plugin_source = plugin_prov.get_source()
-        if plugin_prov.in_use_by and plugin_prov.in_use_by != player_id:
+        if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
             raise RuntimeError(
-                f"PluginSource plugin_source.name is already in use by {plugin_prov.in_use_by}"
+                f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}"
             )
+        self.logger.debug("Start streaming PluginSource %s to %s", plugin_source_id, player_id)
         audio_input = (
             plugin_prov.get_audio_stream(player_id)
             if plugin_source.stream_type == StreamType.CUSTOM
             else plugin_source.path
         )
-        chunk_size = int(get_chunksize(output_format, 1) / 10)
         player.active_source = plugin_source_id
+        plugin_source.in_use_by = player_id
         try:
             async for chunk in get_ffmpeg_stream(
                 audio_input=audio_input,
                 input_format=plugin_source.audio_format,
                 output_format=output_format,
-                chunk_size=chunk_size,
                 filter_params=player_filter_params,
                 extra_input_args=["-re"],
             ):
                 yield chunk
         finally:
+            self.logger.debug(
+                "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
+            )
+            await asyncio.sleep(0.5)
             player.active_source = player.player_id
+            plugin_source.in_use_by = None
 
     async def get_queue_item_stream(
         self,
index b19ffb4ebb355753b1b16ecb7657bca69b10ff50..7c1c8d83cd37f274161d30da66cc028edfd79ca9 100644 (file)
@@ -226,9 +226,12 @@ def get_ffmpeg_args(
         "-ignore_unknown",
         "-protocol_whitelist",
         "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat",
+        "-probesize",
+        "8192",
     ]
     # collect input args
     input_args = []
+
     if extra_input_args:
         input_args += extra_input_args
     if input_path.startswith("http"):
index 5becba6113709dc61316e6d4290c3598a35cd90e..2503632e3ed963f0f1e6b9ca59020271a2991302 100644 (file)
@@ -90,27 +90,12 @@ class AsyncProcess:
 
     async def start(self) -> None:
         """Perform Async init of process."""
-        for attempt in range(2):
-            try:
-                self.proc = await asyncio.create_subprocess_exec(
-                    *self._args,
-                    stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
-                    stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
-                    stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
-                    # because we're exchanging big amounts of (audio) data with pipes
-                    # it makes sense to extend the pipe size and (buffer) limits a bit
-                    limit=1000000 if attempt == 0 else 65536,
-                    pipesize=1000000 if attempt == 0 else -1,
-                )
-                break
-            except PermissionError:
-                if attempt > 0:
-                    raise
-                LOGGER.error(
-                    "Detected that you are running the (docker) container without "
-                    "permissive access rights. This will impact performance !"
-                )
-
+        self.proc = await asyncio.create_subprocess_exec(
+            *self._args,
+            stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
+            stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
+            stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
+        )
         self.logger.log(
             VERBOSE_LOG_LEVEL, "Process %s started with PID %s", self.name, self.proc.pid
         )
index d0023cefaefde7b6c0fa915695383fa1ab9f7dc1..e8d77461016e33570b1b1a8a592422308c67391a 100644 (file)
@@ -56,6 +56,13 @@ class PluginSource(PlayerSource):
         metadata=field_options(serialize="omit", deserialize=pass_through),
         repr=False,
     )
+    # in_use_by specifies the player id that is currently using this plugin (if any)
+    in_use_by: str | None = field(
+        default=None,
+        compare=False,
+        metadata=field_options(serialize="omit", deserialize=pass_through),
+        repr=False,
+    )
 
 
 class PluginProvider(Provider):
@@ -65,14 +72,6 @@ class PluginProvider(Provider):
     Plugin Provider implementations should inherit from this base model.
     """
 
-    @property
-    def in_use_by(self) -> str | None:
-        """Return player id that is currently using this plugin (if any)."""
-        for player in self.mass.players:
-            if player.active_source == self.lookup_key:
-                return player.player_id
-        return None
-
     def get_source(self) -> PluginSource:  # type: ignore[return]
         """Get (audio)source details for this plugin."""
         # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
index 730bb78b336d55d592060c673e7993b802f25744..bb8d47154e2cea17e7ebf6a52bef6c62ec2fc686 100755 (executable)
Binary files a/music_assistant/providers/spotify/bin/librespot-macos-arm64 and b/music_assistant/providers/spotify/bin/librespot-macos-arm64 differ
index 55a6233fc2fb3232135cf28752a380570465e6b6..eadbfc3859d25f9fcced1b4402edad286dc85855 100644 (file)
@@ -272,14 +272,13 @@ class SpotifyConnectProvider(PluginProvider):
         # handle session connected event
         # this player has become the active spotify connect player
         # we need to start the playback
-        if json_data.get("event") in ("sink",) and (
-            not self.in_use_by
-            or ((player := self.mass.players.get(self.in_use_by)) and player.state == "idle")
-        ):
+        if json_data.get("event") in ("sink", "playing") and (not self._source_details.in_use_by):
             # initiate playback by selecting this source on the default player
+            self.logger.error("Initiating playback on %s", self.mass_player_id)
             self.mass.create_task(
                 self.mass.players.select_source(self.mass_player_id, self.lookup_key)
             )
+            self._source_details.in_use_by = self.mass_player_id
 
         # parse metadata fields
         if "common_metadata_fields" in json_data: