Chore: Small tweaks to plugin source playback
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 16 Feb 2025 22:46:39 +0000 (23:46 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 16 Feb 2025 22:46:39 +0000 (23:46 +0100)
music_assistant/controllers/players.py
music_assistant/controllers/streams.py
music_assistant/models/plugin.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/snapcast/__init__.py
music_assistant/providers/spotify_connect/__init__.py

index d96ccd5a93b1c482efc34468c4c861fa0eee17bc..4ade6123ccdf9d730b2da20bfa58d71fad23fc2b 100644 (file)
@@ -720,10 +720,12 @@ class PlayerController(CoreController):
         if player.synced_to or player.active_group:
             raise PlayerCommandFailed(f"Player {player.display_name} is currently grouped")
         # check if player is already playing and source is different
-        # in that case we to stop the player first
-        if source != player.active_source and player.state != PlayerState.IDLE:
+        # in that case we need to stop the player first
+        prev_source = player.active_source
+        if prev_source and source != prev_source and player.state != PlayerState.IDLE:
             await self.cmd_stop(player_id)
             await asyncio.sleep(0.5)  # small delay to allow stop to process
+            player.active_source = None
         # check if source is a pluginsource
         # in that case the source id is the lookup_key of the plugin provider
         if plugin_prov := self.mass.get_provider(source):
@@ -733,8 +735,6 @@ class PlayerController(CoreController):
         # this can be used to restore the queue after a source switch
         if mass_queue := self.mass.player_queues.get(source):
             player.active_source = mass_queue.queue_id
-            if mass_queue.items:
-                await self.mass.player_queues.play(mass_queue.queue_id)
             return
         # basic check if player supports source selection
         if PlayerFeature.SELECT_SOURCE not in player.supported_features:
@@ -1401,9 +1401,9 @@ class PlayerController(CoreController):
             return self._get_active_source(group_player)
         # if player has plugin source active return that
         for plugin_source in self._get_plugin_sources():
-            if (
-                plugin_source.in_use_by == player.player_id
-            ) or player.active_source == plugin_source.id:
+            if player.active_source == plugin_source.id or (
+                player.current_media and plugin_source.id == player.current_media.queue_id
+            ):
                 # copy/set current media if available
                 if plugin_source.metadata:
                     player.set_current_media(
@@ -1630,11 +1630,10 @@ class PlayerController(CoreController):
     ) -> None:
         """Handle playback/select of given plugin source on player."""
         plugin_source = plugin_prov.get_source()
-        if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id:
+        if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
             raise PlayerCommandFailed(
                 f"Source {plugin_source.name} is already in use by another player"
             )
-        plugin_source.in_use_by = player.player_id
         player.active_source = plugin_source.id
         stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id)
         await self.play_media(
@@ -1643,6 +1642,7 @@ class PlayerController(CoreController):
                 uri=stream_url,
                 media_type=MediaType.PLUGIN_SOURCE,
                 title=plugin_source.name,
+                queue_id=plugin_source.id,
                 custom_data={
                     "provider": plugin_source.id,
                     "audio_format": plugin_source.audio_format,
@@ -1661,9 +1661,12 @@ class PlayerController(CoreController):
     def _set_player_sources(self, player: Player) -> None:
         """Set all available player sources."""
         player_source_ids = [x.id for x in player.source_list]
-        for plugin_source in self._get_plugin_sources():
-            if plugin_source.id in player_source_ids:
+        for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN):
+            if ProviderFeature.AUDIO_SOURCE not in plugin_prov.supported_features:
+                continue
+            if plugin_prov.in_use_by and plugin_prov.in_use_by != player.player_id:
                 continue
-            if plugin_source.passive and plugin_source.in_use_by != player.player_id:
+            plugin_source = plugin_prov.get_source()
+            if plugin_source.id in player_source_ids:
                 continue
             player.source_list.append(plugin_source)
index 6aae97e850fea9917031e416015e2b96ab4bd12e..b2213731509ea58db88c439c8ddde6b5236f364c 100644 (file)
@@ -866,32 +866,29 @@ class StreamsController(CoreController):
         player_filter_params: list[str] | None = None,
     ) -> AsyncGenerator[bytes, None]:
         """Get the special plugin source stream."""
-        provider: PluginProvider = self.mass.get_provider(plugin_source_id)
-        plugin_source = provider.get_source()
-        if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
+        player = self.mass.players.get(player_id)
+        plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
+        plugin_source = plugin_prov.get_source()
+        if plugin_prov.in_use_by and plugin_prov.in_use_by != player_id:
             raise RuntimeError(
-                f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}"
+                f"PluginSource plugin_source.name is already in use by {plugin_prov.in_use_by}"
             )
-
         audio_input = (
-            provider.get_audio_stream(player_id)
+            plugin_prov.get_audio_stream(player_id)
             if plugin_source.stream_type == StreamType.CUSTOM
             else plugin_source.path
         )
         chunk_size = int(get_chunksize(output_format, 1) / 10)
-        try:
-            plugin_source.in_use_by = player_id
-            async for chunk in get_ffmpeg_stream(
-                audio_input=audio_input,
-                input_format=plugin_source.audio_format,
-                output_format=output_format,
-                chunk_size=chunk_size,
-                filter_params=player_filter_params,
-                extra_input_args=["-re"],
-            ):
-                yield chunk
-        finally:
-            plugin_source.in_use_by = None
+        player.active_source = plugin_source_id
+        async for chunk in get_ffmpeg_stream(
+            audio_input=audio_input,
+            input_format=plugin_source.audio_format,
+            output_format=output_format,
+            chunk_size=chunk_size,
+            filter_params=player_filter_params,
+            extra_input_args=["-re"],
+        ):
+            yield chunk
 
     async def get_queue_item_stream(
         self,
index 9bb01aefe2a04862713148d917968dd246dc6707..d0023cefaefde7b6c0fa915695383fa1ab9f7dc1 100644 (file)
@@ -33,14 +33,6 @@ class PluginSource(PlayerSource):
         repr=False,
     )
 
-    # use this if the plugin can only provide a source to a single player at a time
-    in_use_by: str | None = field(
-        default=None,
-        compare=False,
-        metadata=field_options(serialize="omit", deserialize=pass_through),
-        repr=False,
-    )
-
     # metadata of the current playing media (if known)
     metadata: PlayerMedia | None = field(
         default=None,
@@ -73,6 +65,14 @@ class PluginProvider(Provider):
     Plugin Provider implementations should inherit from this base model.
     """
 
+    @property
+    def in_use_by(self) -> str | None:
+        """Return player id that is currently using this plugin (if any)."""
+        for player in self.mass.players:
+            if player.active_source == self.lookup_key:
+                return player.player_id
+        return None
+
     def get_source(self) -> PluginSource:  # type: ignore[return]
         """Get (audio)source details for this plugin."""
         # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
index 6d3c850c94d1955196ab868b86bda5ee659561c5..369cbd0b46f8241391ec209b34b2c0b0f1b7b234 100644 (file)
@@ -17,6 +17,7 @@ from music_assistant_models.enums import (
     PlayerState,
     PlayerType,
     ProviderFeature,
+    StreamType,
 )
 from music_assistant_models.media_items import AudioFormat
 from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
@@ -38,6 +39,7 @@ from music_assistant.helpers.datetime import utc
 from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port
 from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.models.plugin import PluginProvider
 from music_assistant.providers.airplay.raop import RaopStreamSession
 from music_assistant.providers.player_group import PlayerGroupProvider
 
@@ -302,12 +304,16 @@ class AirplayProvider(PlayerProvider):
             )
         elif media.media_type == MediaType.PLUGIN_SOURCE:
             # special case: plugin source stream
-            input_format = AIRPLAY_PCM_FORMAT
+            # consume the stream directly, so we can skip one step in between
             assert media.custom_data is not None  # for type checking
-            audio_source = self.mass.streams.get_plugin_source_stream(
-                plugin_source_id=media.custom_data["provider"],
-                output_format=AIRPLAY_PCM_FORMAT,
-                player_id=player_id,
+            provider = cast(PluginProvider, self.mass.get_provider(media.custom_data["provider"]))
+            plugin_source = provider.get_source()
+            assert plugin_source.audio_format is not None  # for type checking
+            input_format = plugin_source.audio_format
+            audio_source = (
+                provider.get_audio_stream(player_id)  # type: ignore[assignment]
+                if plugin_source.stream_type == StreamType.CUSTOM
+                else cast(str, plugin_source.path)
             )
         elif media.queue_id and media.queue_id.startswith("ugp_"):
             # special case: UGP stream
index 2191b0e7065bf18a38d17ff468bdacdcef5ca437..1cae193f4ef6d79ac83e3666f528ffd3cc68ae61 100644 (file)
@@ -22,6 +22,7 @@ from music_assistant_models.enums import (
     PlayerState,
     PlayerType,
     ProviderFeature,
+    StreamType,
 )
 from music_assistant_models.errors import SetupFailedError
 from music_assistant_models.media_items import AudioFormat
@@ -42,6 +43,7 @@ from music_assistant.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_
 from music_assistant.helpers.process import AsyncProcess, check_output
 from music_assistant.helpers.util import get_ip_pton
 from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.models.plugin import PluginProvider
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import ProviderConfig
@@ -433,11 +435,16 @@ class SnapCastProvider(PlayerProvider):
         if stream_task := self._stream_tasks.pop(player_id, None):
             if not stream_task.done():
                 stream_task.cancel()
+                with suppress(asyncio.CancelledError):
+                    await stream_task
+        # assign default/empty stream to the player
+        await self._get_snapgroup(player_id).set_stream("default")
+        await asyncio.sleep(0.5)
         player.state = PlayerState.IDLE
+        player.current_media = None
+        player.active_source = None
         self._set_childs_state(player_id)
         self.mass.players.update(player_id)
-        # assign default/empty stream to the player
-        await self._get_snapgroup(player_id).set_stream("default")
 
     async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
         """Send MUTE command to given player."""
@@ -509,18 +516,15 @@ class SnapCastProvider(PlayerProvider):
         elif media.media_type == MediaType.PLUGIN_SOURCE:
             # special case: plugin source stream
             # consume the stream directly, so we can skip one step in between
-            # provider: PluginProvider = self.mass.get_provider(media.custom_data["provider"])
-            # plugin_source = provider.get_source()
-            # audio_source = (
-            #     provider.get_audio_stream(player_id)
-            #     if plugin_source.stream_type == StreamType.CUSTOM
-            #     else plugin_source.path
-            # )
-            input_format = DEFAULT_SNAPCAST_FORMAT
-            audio_source = self.mass.streams.get_plugin_source_stream(
-                plugin_source_id=media.custom_data["provider"],
-                output_format=DEFAULT_SNAPCAST_FORMAT,
-                player_id=player_id,
+            assert media.custom_data is not None  # for type checking
+            provider = cast(PluginProvider, self.mass.get_provider(media.custom_data["provider"]))
+            plugin_source = provider.get_source()
+            assert plugin_source.audio_format is not None  # for type checking
+            input_format = plugin_source.audio_format
+            audio_source = (
+                provider.get_audio_stream(player_id)
+                if plugin_source.stream_type == StreamType.CUSTOM
+                else plugin_source.path
             )
         elif media.queue_id.startswith("ugp_"):
             # special case: UGP stream
@@ -568,6 +572,7 @@ class SnapCastProvider(PlayerProvider):
                         self.mass, player_id, input_format, DEFAULT_SNAPCAST_FORMAT
                     ),
                     audio_output=stream_path,
+                    extra_input_args=["-re"],
                 ) as ffmpeg_proc:
                     player.state = PlayerState.PLAYING
                     player.current_media = media
index 2e654dfaa16dfe73324641f1dc51b99b79e5a7ba..7c96224d5c6fc09ef6273ae0f76a476254918dd5 100644 (file)
@@ -12,6 +12,7 @@ import asyncio
 import os
 import pathlib
 from collections.abc import Callable
+from contextlib import suppress
 from typing import TYPE_CHECKING, cast
 
 from aiohttp.web import Response
@@ -172,6 +173,8 @@ class SpotifyConnectProvider(PluginProvider):
         self._stop_called = True
         if self._runner_task and not self._runner_task.done():
             self._runner_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._runner_task
         for callback in self._on_unload_callbacks:
             callback()
 
@@ -185,9 +188,9 @@ class SpotifyConnectProvider(PluginProvider):
         self.logger.info("Starting Spotify Connect background daemon")
         os.environ["MASS_CALLBACK"] = f"{self.mass.streams.base_url}/{self.instance_id}"
         await check_output("rm", "-f", self.named_pipe)
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(0.1)
         await check_output("mkfifo", self.named_pipe)
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(0.1)
         try:
             args: list[str] = [
                 self._librespot_bin,
@@ -236,10 +239,8 @@ class SpotifyConnectProvider(PluginProvider):
                 if "couldn't parse packet from " in line:
                     continue
                 self.logger.debug(line)
-
-        except asyncio.CancelledError:
-            await librespot.close(True)
         finally:
+            await librespot.close(True)
             self.logger.info("Spotify Connect background daemon stopped for %s", self.name)
             await check_output("rm", "-f", self.named_pipe)
             # auto restart if not stopped manually
@@ -271,11 +272,9 @@ class SpotifyConnectProvider(PluginProvider):
         # handle session connected event
         # this player has become the active spotify connect player
         # we need to start the playback
-        self.logger.error("%s - %s", self.name, json_data.get("event"))
-        if not self._source_details.in_use_by and json_data.get("event") in (
-            # "session_connected",
-            # "loading",
-            "sink",
+        if json_data.get("event") in ("sink",) and (
+            not self.in_use_by
+            or ((player := self.mass.players.get(self.in_use_by)) and player.state == "idle")
         ):
             # initiate playback by selecting this source on the default player
             self.mass.create_task(