Feat: Handle playback of plugin source as player source
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 16 Feb 2025 17:24:24 +0000 (18:24 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 16 Feb 2025 17:24:24 +0000 (18:24 +0100)
Handle playback of plugin source as player source and implement this in Spotify Connect

19 files changed:
music_assistant/controllers/player_queues.py
music_assistant/controllers/players.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/ffmpeg.py
music_assistant/mass.py
music_assistant/models/plugin.py
music_assistant/providers/_template_player_provider/__init__.py
music_assistant/providers/_template_plugin_provider/__init__.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/airplay/raop.py
music_assistant/providers/hass/__init__.py
music_assistant/providers/player_group/__init__.py
music_assistant/providers/siriusxm/__init__.py
music_assistant/providers/slimproto/__init__.py
music_assistant/providers/snapcast/__init__.py
music_assistant/providers/sonos/const.py
music_assistant/providers/sonos/provider.py
music_assistant/providers/spotify_connect/__init__.py

index 0c7b4f1092f31cbaf70e15b829441a48bfbd1ad6..7a570ec985c341330f6326f10e32ce7c065a4194 100644 (file)
@@ -785,8 +785,8 @@ class PlayerQueuesController(CoreController):
         queue.index_in_buffer = index
         queue.flow_mode_stream_log = []
         queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
-        # no point in enabled flow mode for radio or plugin sources
-        if queue_item.media_type in (MediaType.RADIO, MediaType.PLUGIN_SOURCE):
+        # no point in enabling flow mode for radio sources
+        if queue_item.media_type == MediaType.RADIO:
             queue.flow_mode = False
         queue.current_item = queue_item
 
index d233903d2ace2e841940bd6f963dc622ca9c49b8..d96ccd5a93b1c482efc34468c4c861fa0eee17bc 100644 (file)
@@ -25,6 +25,7 @@ from music_assistant_models.enums import (
     PlayerFeature,
     PlayerState,
     PlayerType,
+    ProviderFeature,
     ProviderType,
 )
 from music_assistant_models.errors import (
@@ -57,6 +58,7 @@ from music_assistant.helpers.throttle_retry import Throttler
 from music_assistant.helpers.util import TaskManager, get_changed_values
 from music_assistant.models.core_controller import CoreController
 from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.models.plugin import PluginProvider, PluginSource
 
 if TYPE_CHECKING:
     from collections.abc import Awaitable, Callable, Coroutine, Iterator
@@ -715,6 +717,25 @@ class PlayerController(CoreController):
         - source: The ID of the source that needs to be activated/selected.
         """
         player = self.get(player_id, True)
+        if player.synced_to or player.active_group:
+            raise PlayerCommandFailed(f"Player {player.display_name} is currently grouped")
+        # check if player is already playing and source is different
+        # in that case we to stop the player first
+        if source != player.active_source and player.state != PlayerState.IDLE:
+            await self.cmd_stop(player_id)
+            await asyncio.sleep(0.5)  # small delay to allow stop to process
+        # check if source is a pluginsource
+        # in that case the source id is the lookup_key of the plugin provider
+        if plugin_prov := self.mass.get_provider(source):
+            await self._handle_select_plugin_source(player, plugin_prov)
+            return
+        # check if source is a mass queue
+        # this can be used to restore the queue after a source switch
+        if mass_queue := self.mass.player_queues.get(source):
+            player.active_source = mass_queue.queue_id
+            if mass_queue.items:
+                await self.mass.player_queues.play(mass_queue.queue_id)
+            return
         # basic check if player supports source selection
         if PlayerFeature.SELECT_SOURCE not in player.supported_features:
             raise UnsupportedFeaturedException(
@@ -982,6 +1003,8 @@ class PlayerController(CoreController):
         player = self._players[player_id]
         prev_state = self._prev_states.get(player_id, {})
         player.active_source = self._get_active_source(player)
+        # set player sources
+        self._set_player_sources(player)
         # prefer any overridden name from config
         player.display_name = (
             self.mass.config.get_raw_player_config_value(player.player_id, "name")
@@ -1376,6 +1399,23 @@ class PlayerController(CoreController):
         # if player has group active, return those details
         if player.active_group and (group_player := self.get(player.active_group)):
             return self._get_active_source(group_player)
+        # if player has plugin source active return that
+        for plugin_source in self._get_plugin_sources():
+            if (
+                plugin_source.in_use_by == player.player_id
+            ) or player.active_source == plugin_source.id:
+                # copy/set current media if available
+                if plugin_source.metadata:
+                    player.set_current_media(
+                        uri=plugin_source.metadata.uri,
+                        media_type=plugin_source.metadata.media_type,
+                        title=plugin_source.metadata.title,
+                        artist=plugin_source.metadata.artist,
+                        album=plugin_source.metadata.album,
+                        image_url=plugin_source.metadata.image_url,
+                        duration=plugin_source.metadata.duration,
+                    )
+                return plugin_source.id
         # defaults to the player's own player id if no active source set
         return player.active_source or player.player_id
 
@@ -1584,3 +1624,46 @@ class PlayerController(CoreController):
                         # always update player state
                         self.mass.loop.call_soon(self.update, player_id)
             await asyncio.sleep(1)
+
+    async def _handle_select_plugin_source(
+        self, player: Player, plugin_prov: PluginProvider
+    ) -> None:
+        """Handle playback/select of given plugin source on player."""
+        plugin_source = plugin_prov.get_source()
+        if plugin_source.in_use_by and plugin_source.in_use_by != player.player_id:
+            raise PlayerCommandFailed(
+                f"Source {plugin_source.name} is already in use by another player"
+            )
+        plugin_source.in_use_by = player.player_id
+        player.active_source = plugin_source.id
+        stream_url = self.mass.streams.get_plugin_source_url(plugin_source.id, player.player_id)
+        await self.play_media(
+            player_id=player.player_id,
+            media=PlayerMedia(
+                uri=stream_url,
+                media_type=MediaType.PLUGIN_SOURCE,
+                title=plugin_source.name,
+                custom_data={
+                    "provider": plugin_source.id,
+                    "audio_format": plugin_source.audio_format,
+                },
+            ),
+        )
+
+    def _get_plugin_sources(self) -> list[PluginSource]:
+        """Return all available plugin sources."""
+        return [
+            plugin_prov.get_source()
+            for plugin_prov in self.mass.get_providers(ProviderType.PLUGIN)
+            if ProviderFeature.AUDIO_SOURCE in plugin_prov.supported_features
+        ]
+
+    def _set_player_sources(self, player: Player) -> None:
+        """Set all available player sources."""
+        player_source_ids = [x.id for x in player.source_list]
+        for plugin_source in self._get_plugin_sources():
+            if plugin_source.id in player_source_ids:
+                continue
+            if plugin_source.passive and plugin_source.in_use_by != player.player_id:
+                continue
+            player.source_list.append(plugin_source)
index dfe6934ee06a1201d2f19708ddfec902cd7b22cd..6aae97e850fea9917031e416015e2b96ab4bd12e 100644 (file)
@@ -65,13 +65,13 @@ from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stre
 from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
 from music_assistant.helpers.webserver import Webserver
 from music_assistant.models.core_controller import CoreController
+from music_assistant.models.plugin import PluginProvider
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig
     from music_assistant_models.player import Player
     from music_assistant_models.player_queue import PlayerQueue
     from music_assistant_models.queue_item import QueueItem
-    from music_assistant_models.streamdetails import StreamDetails
 
 
 isfile = wrap(os.path.isfile)
@@ -233,6 +233,11 @@ class StreamsController(CoreController):
                     "/announcement/{player_id}.{fmt}",
                     self.serve_announcement_stream,
                 ),
+                (
+                    "*",
+                    "/pluginsource/{plugin_source}/{player_id}.{fmt}",
+                    self.serve_plugin_source_stream,
+                ),
             ],
         )
 
@@ -295,6 +300,8 @@ class StreamsController(CoreController):
         headers = {
             **DEFAULT_STREAM_HEADERS,
             "icy-name": queue_item.name,
+            "Accept-Ranges": "none",
+            "Content-Type": f"audio/{output_format.output_format_str}",
         }
         resp = web.StreamResponse(
             status=200,
@@ -339,8 +346,8 @@ class StreamsController(CoreController):
         self.mass.player_queues.track_loaded_in_buffer(queue_id, queue_item_id)
 
         async for chunk in get_ffmpeg_stream(
-            audio_input=self.get_media_stream(
-                streamdetails=queue_item.streamdetails,
+            audio_input=self.get_queue_item_stream(
+                queue_item=queue_item,
                 pcm_format=pcm_format,
             ),
             input_format=pcm_format,
@@ -434,7 +441,7 @@ class StreamsController(CoreController):
         self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
 
         async for chunk in get_ffmpeg_stream(
-            audio_input=self.get_flow_stream(
+            audio_input=self.get_queue_flow_stream(
                 queue=queue,
                 start_queue_item=start_queue_item,
                 pcm_format=flow_pcm_format,
@@ -562,6 +569,73 @@ class StreamsController(CoreController):
 
         return resp
 
+    async def serve_plugin_source_stream(self, request: web.Request) -> web.Response:
+        """Stream PluginSource audio to a player."""
+        self._log_request(request)
+        plugin_source_id = request.match_info["plugin_source"]
+        provider: PluginProvider | None
+        if not (provider := self.mass.get_provider(plugin_source_id)):
+            raise web.HTTPNotFound(reason=f"Unknown PluginSource: {plugin_source_id}")
+        # work out output format/details
+        player_id = request.match_info["player_id"]
+        player = self.mass.players.get(player_id)
+        if not player:
+            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
+        plugin_source = provider.get_source()
+        output_format = await self.get_output_format(
+            output_format_str=request.match_info["fmt"],
+            player=player,
+            default_sample_rate=plugin_source.audio_format.sample_rate,
+            default_bit_depth=plugin_source.audio_format.bit_depth,
+        )
+        headers = {
+            **DEFAULT_STREAM_HEADERS,
+            "icy-name": plugin_source.name,
+            "Accept-Ranges": "none",
+            "Content-Type": f"audio/{output_format.output_format_str}",
+        }
+        resp = web.StreamResponse(
+            status=200,
+            reason="OK",
+            headers=headers,
+        )
+        resp.content_type = f"audio/{output_format.output_format_str}"
+        http_profile: str = await self.mass.config.get_player_config_value(
+            player_id, CONF_HTTP_PROFILE
+        )
+        if http_profile == "forced_content_length":
+            # guess content length based on duration
+            resp.content_length = get_chunksize(output_format, 12 * 3600)
+        elif http_profile == "chunked":
+            resp.enable_chunked_encoding()
+
+        await resp.prepare(request)
+
+        # return early if this is not a GET request
+        if request.method != "GET":
+            return resp
+
+        # all checks passed, start streaming!
+        self.logger.debug(
+            "Start serving audio stream for PluginSource %s (%s) to %s",
+            plugin_source.name,
+            plugin_source.id,
+            player.display_name,
+        )
+        async for chunk in self.get_plugin_source_stream(
+            plugin_source_id=plugin_source_id,
+            output_format=output_format,
+            player_id=player_id,
+            player_filter_params=get_player_filter_params(
+                self.mass, player_id, plugin_source.audio_format, output_format
+            ),
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError, ConnectionError):
+                break
+        return resp
+
     def get_command_url(self, player_or_queue_id: str, command: str) -> str:
         """Get the url for the special command stream."""
         return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
@@ -582,8 +656,7 @@ class StreamsController(CoreController):
 
     def get_plugin_source_url(
         self,
-        provider: str,
-        source_id: str,
+        plugin_source: str,
         player_id: str,
         output_codec: ContentType = ContentType.FLAC,
     ) -> str:
@@ -592,9 +665,9 @@ class StreamsController(CoreController):
         # handle raw pcm without exact format specifiers
         if output_codec.is_pcm() and ";" not in fmt:
             fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
-        return f"{self._server.base_url}/pluginsource/{provider}/{source_id}/{player_id}.{fmt}"
+        return f"{self._server.base_url}/pluginsource/{plugin_source}/{player_id}.{fmt}"
 
-    async def get_flow_stream(
+    async def get_queue_flow_stream(
         self,
         queue: PlayerQueue,
         start_queue_item: QueueItem,
@@ -664,8 +737,8 @@ class StreamsController(CoreController):
             bytes_written = 0
             buffer = b""
             # handle incoming audio chunks
-            async for chunk in self.get_media_stream(
-                queue_track.streamdetails,
+            async for chunk in self.get_queue_item_stream(
+                queue_track,
                 pcm_format=pcm_format,
             ):
                 # buffer size needs to be big enough to include the crossfade part
@@ -785,13 +858,50 @@ class StreamsController(CoreController):
         ):
             yield chunk
 
-    async def get_media_stream(
+    async def get_plugin_source_stream(
+        self,
+        plugin_source_id: str,
+        output_format: AudioFormat,
+        player_id: str,
+        player_filter_params: list[str] | None = None,
+    ) -> AsyncGenerator[bytes, None]:
+        """Get the special plugin source stream."""
+        provider: PluginProvider = self.mass.get_provider(plugin_source_id)
+        plugin_source = provider.get_source()
+        if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
+            raise RuntimeError(
+                f"PluginSource plugin_source.name is already in use by {plugin_source.in_use_by}"
+            )
+
+        audio_input = (
+            provider.get_audio_stream(player_id)
+            if plugin_source.stream_type == StreamType.CUSTOM
+            else plugin_source.path
+        )
+        chunk_size = int(get_chunksize(output_format, 1) / 10)
+        try:
+            plugin_source.in_use_by = player_id
+            async for chunk in get_ffmpeg_stream(
+                audio_input=audio_input,
+                input_format=plugin_source.audio_format,
+                output_format=output_format,
+                chunk_size=chunk_size,
+                filter_params=player_filter_params,
+                extra_input_args=["-re"],
+            ):
+                yield chunk
+        finally:
+            plugin_source.in_use_by = None
+
+    async def get_queue_item_stream(
         self,
-        streamdetails: StreamDetails,
+        queue_item: QueueItem,
         pcm_format: AudioFormat,
-    ) -> AsyncGenerator[tuple[bool, bytes], None]:
-        """Get the audio stream for the given streamdetails as raw pcm chunks."""
+    ) -> AsyncGenerator[bytes, None]:
+        """Get the audio stream for a single queue item as raw PCM audio."""
         # collect all arguments for ffmpeg
+        streamdetails = queue_item.streamdetails
+        assert streamdetails
         filter_params = []
         extra_input_args = streamdetails.extra_input_args or []
         # handle volume normalization
index 165b67a08ecdc8f56d4887befe6cbe9eaf6e2663..c0d40711fefbf39d990b7b233ee7dcb865adc9ca 100644 (file)
@@ -398,6 +398,7 @@ async def get_media_stream(
         collect_log_history=True,
         loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
     )
+
     try:
         await ffmpeg_proc.start()
         logger.debug(
@@ -543,7 +544,6 @@ async def get_media_stream(
                 VolumeNormalizationMode.FIXED_GAIN,
             )
             and (finished or (seconds_streamed >= 30))
-            and streamdetails.media_type != MediaType.PLUGIN_SOURCE
         ):
             # dynamic mode not allowed and no measurement known, we need to analyze the audio
             # add background task to start analyzing the audio
index 7c68bacae2d3c7f304dcac00bc075aae4e44d621..c4722826f279db71c6b32f1e2070775b950caa4b 100644 (file)
@@ -17,7 +17,7 @@ from music_assistant_models.helpers import get_global_cache_value, set_global_ca
 from music_assistant.constants import VERBOSE_LOG_LEVEL
 
 from .process import AsyncProcess, check_output
-from .util import TimedAsyncGenerator, close_async_generator
+from .util import close_async_generator
 
 if TYPE_CHECKING:
     from music_assistant_models.media_items import AudioFormat
@@ -147,7 +147,11 @@ class FFMpeg(AsyncProcess):
             # for data to arrive (e.g. when there is X amount of seconds in the buffer)
             # so this timeout is just to catch if the source is stuck and rpeort it and not
             # to recover from it.
-            async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
+            # async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
+            #     if self.closed:
+            #         return
+            #     await self.write(chunk)
+            async for chunk in self.audio_input:
                 if self.closed:
                     return
                 await self.write(chunk)
@@ -291,6 +295,8 @@ def get_ffmpeg_args(
             "-ac",
             str(output_format.channels),
         ]
+        if not output_format.content_type.is_pcm() and output_format.content_type.is_lossless():
+            output_args += ["-sample_fmt", f"s{output_format.bit_depth}"]
         if output_format.output_format_str == "flac":
             # use level 0 compression for fastest encoding
             output_args += ["-compression_level", "0"]
@@ -346,11 +352,12 @@ async def check_ffmpeg_version() -> None:
             "Please install ffmpeg on your OS to enable playback."
         )
     if returncode != 0:
-        raise AudioError(
-            "Error determining FFmpeg version on your system."
-            "Your CPU may be too old to run this version of FFmpeg."
-            f"Additional info: {returncode} {output.decode().strip()}"
-        )
+        err_msg = "Error determining FFmpeg version on your system."
+        if returncode < 0:
+            # error below 0 is often illegal instruction
+            err_msg += " - Your CPU may be too old to run this version of FFmpeg."
+        err_msg += f" - Additional info: {returncode} {output.decode().strip()}"
+        raise AudioError(err_msg)
     # parse version number from output
     try:
         version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
index 23c128cfc7f880e6759c2239a7fecd2ba07c6c2d..2bc9b135e7b2dbbe73bbcabd1b32907118ee3da3 100644 (file)
@@ -66,7 +66,7 @@ rmfile = wrap(os.remove)
 listdir = wrap(os.listdir)
 rename = wrap(os.rename)
 
-EventCallBackType = Callable[[MassEvent], None] | Coroutine[MassEvent, Any, None]
+EventCallBackType = Callable[[MassEvent], None] | Callable[[MassEvent], Coroutine[Any, Any, None]]
 EventSubscriptionType = tuple[
     EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
 ]
@@ -307,7 +307,7 @@ class MusicAssistant:
                 continue
             if asyncio.iscoroutinefunction(cb_func):
                 if TYPE_CHECKING:
-                    cb_func = cast(Coroutine[Any, Any, None], cb_func)
+                    cb_func = cast(Callable[[MassEvent], Coroutine[Any, Any, None]], cb_func)
                 self.create_task(cb_func, event_obj)
             else:
                 if TYPE_CHECKING:
@@ -341,7 +341,7 @@ class MusicAssistant:
 
     def create_task(
         self,
-        target: Coroutine[Any, Any, _R] | Awaitable[_R],
+        target: Callable[[MassEvent], Coroutine[Any, Any, None]] | Awaitable[_R],
         *args: Any,
         task_id: str | None = None,
         abort_existing: bool = False,
index d7009bd6a93a687bb9de2c554af5b49cb13ab8b6..9bb01aefe2a04862713148d917968dd246dc6707 100644 (file)
@@ -3,18 +3,69 @@
 from __future__ import annotations
 
 from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING
+from dataclasses import dataclass, field
 
-from .provider import Provider
+from mashumaro import field_options, pass_through
+from music_assistant_models.enums import StreamType
+from music_assistant_models.player import PlayerMedia, PlayerSource
+from music_assistant_models.streamdetails import AudioFormat  # noqa: TC002
 
-if TYPE_CHECKING:
-    from music_assistant_models.enums import MediaType
-    from music_assistant_models.media_items import PluginSource
-    from music_assistant_models.streamdetails import StreamDetails
+from .provider import Provider
 
 # ruff: noqa: ARG001, ARG002
 
 
+@dataclass()
+class PluginSource(PlayerSource):
+    """
+    Model for a PluginSource, which is a player (audio)source provided by a plugin.
+
+    This (intermediate)  model is not exposed on the api,
+    but is used internally by the plugin provider.
+    """
+
+    # The output format that is sent to the player
+    # (or to the library/application that is used to send audio to the player)
+    audio_format: AudioFormat | None = field(
+        default=None,
+        compare=False,
+        metadata=field_options(serialize="omit", deserialize=pass_through),
+        repr=False,
+    )
+
+    # use this if the plugin can only provide a source to a single player at a time
+    in_use_by: str | None = field(
+        default=None,
+        compare=False,
+        metadata=field_options(serialize="omit", deserialize=pass_through),
+        repr=False,
+    )
+
+    # metadata of the current playing media (if known)
+    metadata: PlayerMedia | None = field(
+        default=None,
+        compare=False,
+        metadata=field_options(serialize="omit", deserialize=pass_through),
+        repr=False,
+    )
+
+    # The type of stream that is provided by this source
+    stream_type: StreamType | None = field(
+        default=StreamType.CUSTOM,
+        compare=False,
+        metadata=field_options(serialize="omit", deserialize=pass_through),
+        repr=False,
+    )
+
+    # The path to the source/audio (if streamtype is not custom)
+    path: str | None = field(
+        default=None,
+        compare=False,
+        metadata=field_options(serialize="omit", deserialize=pass_through),
+        repr=False,
+    )
+
+
 class PluginProvider(Provider):
     """
     Base representation of a Plugin for Music Assistant.
@@ -22,44 +73,20 @@ class PluginProvider(Provider):
     Plugin Provider implementations should inherit from this base model.
     """
 
-    async def get_sources(self) -> list[PluginSource]:  # type: ignore[return]
-        """Get all audio sources provided by this provider."""
-        # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
-        raise NotImplementedError
-
-    async def get_source(self, prov_source_id: str) -> PluginSource:  # type: ignore[return]
-        """Get AudioSource details by id."""
+    def get_source(self) -> PluginSource:  # type: ignore[return]
+        """Get (audio)source details for this plugin."""
         # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
         raise NotImplementedError
 
-    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
-        """Return the streamdetails to stream an (audio)source provided by this plugin."""
-        # Will only be called if ProviderFeature.AUDIO_SOURCE is declared
-        raise NotImplementedError
-
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
+    async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
         """
-        Return the (custom) audio stream for the provider item.
+        Return the (custom) audio stream for the audio source provided by this plugin.
+
+        Will only be called if this plugin is a PluginSource, meaning that
+        the ProviderFeature.AUDIO_SOURCE is declared AND if the streamtype is StreamType.CUSTOM.
 
-        Will only be called when the stream_type is set to CUSTOM.
+        The player_id is the id of the player that is requesting the stream.
         """
         if False:
             yield b""
         raise NotImplementedError
-
-    async def on_streamed(
-        self,
-        streamdetails: StreamDetails,
-    ) -> None:
-        """
-        Handle callback when given streamdetails completed streaming.
-
-        To get the number of seconds streamed, see streamdetails.seconds_streamed.
-        To get the number of seconds seeked/skipped, see streamdetails.seek_position.
-        Note that seconds_streamed is the total streamed seconds, so without seeked time.
-
-        NOTE: Due to internal and player buffering,
-        this may be called in advance of the actual completion.
-        """
index 9e5aa66ad450133e5de97f0b4695dbd3abb6828d..33ff6dea531d61c9032f12312893787049fe5012 100644 (file)
@@ -109,7 +109,7 @@ class MyDemoPlayerprovider(PlayerProvider):
         """Return the features supported by this Provider."""
         # MANDATORY
         # you should return a set of provider-level features
-        # here that your player provider supports or an empty tuple if none.
+        # here that your player provider supports or an empty set if none.
         # for example 'ProviderFeature.SYNC_PLAYERS' if you can sync players.
         return {ProviderFeature.SYNC_PLAYERS}
 
index 5e5fd807da7333556ad0b5623fe8d45138d23b24..fbed240056b8c48b05ad90b7121f694ff49997be 100644 (file)
@@ -38,7 +38,7 @@ from collections.abc import AsyncGenerator
 from typing import TYPE_CHECKING
 
 from music_assistant_models.enums import ContentType, EventType, ProviderFeature
-from music_assistant_models.streamdetails import AudioFormat
+from music_assistant_models.media_items.audio_format import AudioFormat
 
 from music_assistant.models.plugin import PluginProvider, PluginSource
 
index f952dabf13d95aa11bd2d3c8ad3ee64ed8f9d654..6d3c850c94d1955196ab868b86bda5ee659561c5 100644 (file)
@@ -300,12 +300,30 @@ class AirplayProvider(PlayerProvider):
                 output_format=AIRPLAY_PCM_FORMAT,
                 use_pre_announce=media.custom_data["use_pre_announce"],
             )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            input_format = AIRPLAY_PCM_FORMAT
+            assert media.custom_data is not None  # for type checking
+            audio_source = self.mass.streams.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["provider"],
+                output_format=AIRPLAY_PCM_FORMAT,
+                player_id=player_id,
+            )
         elif media.queue_id and media.queue_id.startswith("ugp_"):
             # special case: UGP stream
             ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
             input_format = ugp_stream.base_pcm_format
             audio_source = ugp_stream.subscribe_raw()
+        elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
+            # use single item stream request for radio streams
+            input_format = AIRPLAY_PCM_FORMAT
+            queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
+            assert queue_item is not None  # for type checking
+            audio_source = self.mass.streams.get_queue_item_stream(
+                queue_item=queue_item,
+                pcm_format=AIRPLAY_PCM_FORMAT,
+            )
         elif media.queue_id and media.queue_item_id:
             # regular queue (flow) stream request
             input_format = AIRPLAY_FLOW_PCM_FORMAT
@@ -313,7 +331,7 @@ class AirplayProvider(PlayerProvider):
             assert queue
             start_queue_item = self.mass.player_queues.get_item(media.queue_id, media.queue_item_id)
             assert start_queue_item
-            audio_source = self.mass.streams.get_flow_stream(
+            audio_source = self.mass.streams.get_queue_flow_stream(
                 queue=queue,
                 start_queue_item=start_queue_item,
                 pcm_format=input_format,
@@ -610,8 +628,10 @@ class AirplayProvider(PlayerProvider):
                 # device switched to another source (or is powered off)
                 if raop_stream := airplay_player.raop_stream:
                     # ignore this if we just started playing to prevent false positives
-                    assert mass_player.elapsed_time
-                    if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
+                    elapsed_time = (
+                        10 if mass_player.elapsed_time is None else mass_player.elapsed_time
+                    )
+                    if elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
                         raop_stream.prevent_playback = True
                         self.mass.create_task(self.monitor_prevent_playback(player_id))
             elif "device-prevent-playback=0" in path:
index 844566da57de05d0c07ce4d7a6f57fc41c9f9b02..7c67a8c6f26a59ba911c9f555dd230b2c9a83dbf 100644 (file)
@@ -421,10 +421,13 @@ class RaopStream:
         title = queue.current_item.name
         artist = ""
         album = ""
-        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_metadata:
+        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
             # stream title/metadata from radio/live stream
-            title = queue.current_item.streamdetails.stream_metadata.title or ""
-            artist = queue.current_item.streamdetails.stream_metadata.artist or ""
+            if " - " in queue.current_item.streamdetails.stream_title:
+                artist, title = queue.current_item.streamdetails.stream_title.split(" - ", 1)
+            else:
+                title = queue.current_item.streamdetails.stream_title
+                artist = ""
             # set album to radio station name
             album = queue.current_item.name
         elif media_item := queue.current_item.media_item:
index 96af4f1dc001212262b7673036ad66e858e1e821..eb34f5ec3358a4ec644231c1dbd56e04b64f7994 100644 (file)
@@ -213,6 +213,8 @@ async def _get_player_control_config_entries(hass: HomeAssistantClient) -> tuple
     all_mute_entities: list[ConfigValueOption] = []
     all_volume_entities: list[ConfigValueOption] = []
     # collect all entities that are usable for player controls
+    if not hass.connected:
+        return ()
     for state in await hass.get_states():
         if "friendly_name" not in state["attributes"]:
             # filter out invalid/unavailable players
index c022b7c3a78e0f14f74bc1e4368f881f932fa8bf..c1b89b90c7b04c229ccd0592ccddaa88ec9b1eb4 100644 (file)
@@ -481,9 +481,22 @@ class PlayerGroupProvider(PlayerProvider):
                 output_format=UGP_FORMAT,
                 use_pre_announce=media.custom_data["use_pre_announce"],
             )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            audio_source = self.mass.streams.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["provider"],
+                output_format=UGP_FORMAT,
+                player_id=player_id,
+            )
+        elif media.media_type == MediaType.RADIO:
+            # use single item stream request for radio streams
+            audio_source = self.mass.streams.get_queue_item_stream(
+                queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+                pcm_format=UGP_FORMAT,
+            )
         elif media.queue_id and media.queue_item_id:
             # regular queue stream request
-            audio_source = self.mass.streams.get_flow_stream(
+            audio_source = self.mass.streams.get_queue_flow_stream(
                 queue=self.mass.player_queues.get(media.queue_id),
                 start_queue_item=self.mass.player_queues.get_item(
                     media.queue_id, media.queue_item_id
index 466eef2519aadfdb113a7dcd79da0252c3ca7099..45ddc7ae45b5bcffecfc2b9ba7640fa7422a72e3 100644 (file)
@@ -24,7 +24,7 @@ from music_assistant_models.media_items import (
     ProviderMapping,
     Radio,
 )
-from music_assistant_models.streamdetails import LivestreamMetadata, StreamDetails
+from music_assistant_models.streamdetails import StreamDetails
 from tenacity import RetryError
 
 from music_assistant.helpers.util import select_free_port
@@ -268,10 +268,8 @@ class SiriusXMProvider(MusicProvider):
         if latest_cut_marker:
             latest_cut = latest_cut_marker.cut
             title = latest_cut.title
-            self._current_stream_details.stream_metadata = LivestreamMetadata(
-                title=title,
-                artist=", ".join([a.name for a in latest_cut.artists]),
-            )
+            artist = ", ".join([a.name for a in latest_cut.artists])
+            self._current_stream_details.stream_title = f"{artist} - {title}"
 
     async def _refresh_channels(self) -> bool:
         self._channels = await self._client.channels
index 64c5e24810a000960f800fdfdf2683ed2be6054c..d8a75624e774abc16cb784b35026b5954a9f7652 100644 (file)
@@ -377,15 +377,28 @@ class SlimprotoProvider(PlayerProvider):
                 output_format=master_audio_format,
                 use_pre_announce=media.custom_data["use_pre_announce"],
             )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            audio_source = self.mass.streams.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["provider"],
+                output_format=master_audio_format,
+                player_id=player_id,
+            )
         elif media.queue_id.startswith("ugp_"):
             # special case: UGP stream
             ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
             # Filter is later applied in MultiClientStream
             audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
+        elif media.media_type == MediaType.RADIO:
+            # use single item stream request for radio streams
+            audio_source = self.mass.streams.get_queue_item_stream(
+                queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+                pcm_format=master_audio_format,
+            )
         elif media.queue_id and media.queue_item_id:
             # regular queue stream request
-            audio_source = self.mass.streams.get_flow_stream(
+            audio_source = self.mass.streams.get_queue_flow_stream(
                 queue=self.mass.player_queues.get(media.queue_id),
                 start_queue_item=self.mass.player_queues.get_item(
                     media.queue_id, media.queue_item_id
index 580ff530af944168e84172e53ad3f9ee8b9a88f9..2191b0e7065bf18a38d17ff468bdacdcef5ca437 100644 (file)
@@ -482,7 +482,7 @@ class SnapCastProvider(PlayerProvider):
         self.mass.players.update(player_id, skip_forward=True)
         self.mass.players.update(mass_player.synced_to, skip_forward=True)
 
-    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:  # noqa: PLR0915
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
         if player.synced_to:
@@ -506,16 +506,39 @@ class SnapCastProvider(PlayerProvider):
                 output_format=DEFAULT_SNAPCAST_FORMAT,
                 use_pre_announce=media.custom_data["use_pre_announce"],
             )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            # consume the stream directly, so we can skip one step in between
+            # provider: PluginProvider = self.mass.get_provider(media.custom_data["provider"])
+            # plugin_source = provider.get_source()
+            # audio_source = (
+            #     provider.get_audio_stream(player_id)
+            #     if plugin_source.stream_type == StreamType.CUSTOM
+            #     else plugin_source.path
+            # )
+            input_format = DEFAULT_SNAPCAST_FORMAT
+            audio_source = self.mass.streams.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["provider"],
+                output_format=DEFAULT_SNAPCAST_FORMAT,
+                player_id=player_id,
+            )
         elif media.queue_id.startswith("ugp_"):
             # special case: UGP stream
             ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
             input_format = ugp_stream.base_pcm_format
             audio_source = ugp_stream.subscribe_raw()
+        elif media.media_type == MediaType.RADIO:
+            # use single item stream request for radio streams
+            input_format = DEFAULT_SNAPCAST_FORMAT
+            audio_source = self.mass.streams.get_queue_item_stream(
+                queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+                pcm_format=DEFAULT_SNAPCAST_FORMAT,
+            )
         elif media.queue_id and media.queue_item_id:
             # regular queue (flow) stream request
             input_format = DEFAULT_SNAPCAST_PCM_FORMAT
-            audio_source = self.mass.streams.get_flow_stream(
+            audio_source = self.mass.streams.get_queue_flow_stream(
                 queue=self.mass.player_queues.get(media.queue_id),
                 start_queue_item=self.mass.player_queues.get_item(
                     media.queue_id, media.queue_item_id
index 98618b4bec95f95fd9424c1d12810368ec9efca9..f5892264b592b8caa9376a91b21c467d8d223bd2 100644 (file)
@@ -50,7 +50,7 @@ PLAYER_SOURCE_MAP = {
     ),
     SOURCE_AIRPLAY: PlayerSource(
         id=SOURCE_AIRPLAY,
-        name="Spotify",
+        name="Airplay",
         passive=True,
         can_play_pause=True,
         can_next_previous=True,
@@ -66,7 +66,7 @@ PLAYER_SOURCE_MAP = {
     ),
     SOURCE_RADIO: PlayerSource(
         id=SOURCE_RADIO,
-        name="Spotify",
+        name="Radio",
         passive=True,
         can_play_pause=True,
         can_next_previous=True,
index 5e9c9f43d7def75a40d2a10c5bbcd212fb54bac0..0054e58373af3760bf9461530b812efe7e9de794 100644 (file)
@@ -340,10 +340,8 @@ class SonosPlayerProvider(PlayerProvider):
 
         # play a single uri/url
         # note that this most probably will only work for (long running) radio streams
-        if self.mass.config.get_raw_player_config_value(
-            player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value
-        ):
-            media.uri = media.uri.replace(".flac", ".mp3")
+        # enforce mp3 here because Sonos really does not support FLAC streams without duration
+        media.uri = media.uri.replace(".flac", ".mp3")
         await sonos_player.client.player.group.play_stream_url(
             media.uri, {"name": media.title, "type": "track"}
         )
index 91b917ff0323b5ade7d71a37194cb8e2ffb7d2b7..2e654dfaa16dfe73324641f1dc51b99b79e5a7ba 100644 (file)
@@ -22,23 +22,17 @@ from music_assistant_models.enums import (
     EventType,
     MediaType,
     ProviderFeature,
-    QueueOption,
     StreamType,
 )
-from music_assistant_models.errors import MediaNotFoundError
-from music_assistant_models.media_items import AudioFormat, PluginSource, ProviderMapping
-from music_assistant_models.streamdetails import LivestreamMetadata, StreamDetails
+from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.player import PlayerMedia
 
 from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW
-from music_assistant.helpers.audio import get_chunksize
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
-from music_assistant.helpers.process import AsyncProcess
-from music_assistant.models.music_provider import MusicProvider
+from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.models.plugin import PluginProvider, PluginSource
 from music_assistant.providers.spotify.helpers import get_librespot_binary
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncGenerator
-
     from aiohttp.web import Request
     from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
     from music_assistant_models.event import MassEvent
@@ -48,7 +42,6 @@ if TYPE_CHECKING:
     from music_assistant.models import ProviderInstanceType
 
 CONF_MASS_PLAYER_ID = "mass_player_id"
-CONF_CUSTOM_NAME = "custom_name"
 CONF_HANDOFF_MODE = "handoff_mode"
 CONNECT_ITEM_ID = "spotify_connect"
 
@@ -89,15 +82,6 @@ async def get_config_entries(
             ),
             required=True,
         ),
-        ConfigEntry(
-            key=CONF_CUSTOM_NAME,
-            type=ConfigEntryType.STRING,
-            label="Name for the Spotify Connect Player",
-            default_value="",
-            description="Select what name should be shown in the Spotify app as speaker name. "
-            "Leave blank to use the Music Assistant player's name",
-            required=False,
-        ),
         # ConfigEntry(
         #     key=CONF_HANDOFF_MODE,
         #     type=ConfigEntryType.BOOLEAN,
@@ -113,14 +97,14 @@ async def get_config_entries(
         #     "When enabling handoff mode, the Spotify Connect plugin will instead "
         #     "forward the Spotify playback request to the Music Assistant Queue, so basically "
         #     "the spotify app can be used to initiate playback, but then MA will take over "
-        #     "the playback and manage the queue, the normal operating mode of MA. \n\n"
+        #     "the playback and manage the queue, which is the normal operating mode of MA. \n\n"
         #     "This mode however means that the Spotify app will not report the actual playback ",
         #     required=False,
         # ),
     )
 
 
-class SpotifyConnectProvider(MusicProvider):
+class SpotifyConnectProvider(PluginProvider):
     """Implementation of a Spotify Connect Plugin."""
 
     def __init__(
@@ -135,9 +119,31 @@ class SpotifyConnectProvider(MusicProvider):
         self._runner_task: asyncio.Task | None = None  # type: ignore[type-arg]
         self._librespot_proc: AsyncProcess | None = None
         self._librespot_started = asyncio.Event()
-        self._player_connected: bool = False
-        self._current_streamdetails: StreamDetails | None = None
-        self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(60)
+        self.named_pipe = f"/tmp/{self.instance_id}"  # noqa: S108
+        self._source_details = PluginSource(
+            id=self.lookup_key,
+            name=self.manifest.name,
+            # we set passive to true because we
+            # dont allow this source to be selected directly
+            passive=True,
+            # TODO: implement controlling spotify from MA itself
+            can_play_pause=False,
+            can_seek=False,
+            can_next_previous=False,
+            audio_format=AudioFormat(
+                content_type=ContentType.PCM_S16LE,
+                codec_type=ContentType.PCM_S16LE,
+                sample_rate=44100,
+                bit_depth=16,
+                channels=2,
+            ),
+            metadata=PlayerMedia(
+                "Spotify Connect",
+            ),
+            stream_type=StreamType.NAMED_PIPE,
+            path=self.named_pipe,
+        )
+        self._audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
         self._on_unload_callbacks: list[Callable[..., None]] = [
             self.mass.subscribe(
                 self._on_mass_player_event,
@@ -155,15 +161,6 @@ class SpotifyConnectProvider(MusicProvider):
         """Return the features supported by this Provider."""
         return {ProviderFeature.AUDIO_SOURCE}
 
-    @property
-    def name(self) -> str:
-        """Return (custom) friendly name for this provider instance."""
-        if custom_name := cast(str, self.config.get_value(CONF_CUSTOM_NAME)):
-            return f"{self.manifest.name}: {custom_name}"
-        if player := self.mass.players.get(self.mass_player_id):
-            return f"{self.manifest.name}: {player.display_name}"
-        return super().name
-
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
         self._librespot_bin = await get_librespot_binary()
@@ -178,75 +175,24 @@ class SpotifyConnectProvider(MusicProvider):
         for callback in self._on_unload_callbacks:
             callback()
 
-    async def get_sources(self) -> list[PluginSource]:
-        """Get all audio sources provided by this provider."""
-        # we only have passive/hidden sources so no need to supply this listing
-        return []
-
-    async def get_source(self, prov_source_id: str) -> PluginSource:
-        """Get AudioSource details by id."""
-        if prov_source_id != CONNECT_ITEM_ID:
-            raise MediaNotFoundError(f"Invalid source id: {prov_source_id}")
-        return PluginSource(
-            item_id=CONNECT_ITEM_ID,
-            provider=self.instance_id,
-            name="Spotify Connect",
-            provider_mappings={
-                ProviderMapping(
-                    item_id=CONNECT_ITEM_ID,
-                    provider_domain=self.domain,
-                    provider_instance=self.instance_id,
-                    audio_format=AudioFormat(content_type=ContentType.OGG),
-                )
-            },
-        )
-
-    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
-        """Return the streamdetails to stream an audiosource provided by this plugin."""
-        self._current_streamdetails = streamdetails = StreamDetails(
-            item_id=CONNECT_ITEM_ID,
-            provider=self.instance_id,
-            audio_format=AudioFormat(
-                content_type=ContentType.OGG,
-            ),
-            media_type=MediaType.PLUGIN_SOURCE,
-            allow_seek=False,
-            can_seek=False,
-            stream_type=StreamType.CUSTOM,
-        )
-        return streamdetails
-
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """Return the audio stream for the provider item."""
-        if not self._librespot_proc or self._librespot_proc.closed:
-            raise MediaNotFoundError(f"Librespot not ready for: {streamdetails.item_id}")
-        self._player_connected = True
-        try:
-            while True:
-                yield await self._audio_buffer.get()
-        finally:
-            self._player_connected = False
-            await asyncio.sleep(2)
-            if not self._player_connected:
-                # handle situation where the stream is disconnected from the MA player
-                # easiest way to unmark this librespot instance as active player is to close it
-                await self._librespot_proc.close(True)
+    def get_source(self) -> PluginSource:
+        """Get (audio)source details for this plugin."""
+        return self._source_details
 
     async def _librespot_runner(self) -> None:
         """Run the spotify connect daemon in a background task."""
         assert self._librespot_bin
-        if not (player := self.mass.players.get(self.mass_player_id)):
-            raise MediaNotFoundError(f"Player not found: {self.mass_player_id}")
-        name = cast(str, self.config.get_value(CONF_CUSTOM_NAME) or player.display_name)
-        self.logger.info("Starting Spotify Connect background daemon %s", name)
+        self.logger.info("Starting Spotify Connect background daemon")
         os.environ["MASS_CALLBACK"] = f"{self.mass.streams.base_url}/{self.instance_id}"
+        await check_output("rm", "-f", self.named_pipe)
+        await asyncio.sleep(0.5)
+        await check_output("mkfifo", self.named_pipe)
+        await asyncio.sleep(0.5)
         try:
             args: list[str] = [
                 self._librespot_bin,
                 "--name",
-                name,
+                self.name,
                 "--cache",
                 self.cache_dir,
                 "--disable-audio-cache",
@@ -254,9 +200,10 @@ class SpotifyConnectProvider(MusicProvider):
                 "320",
                 "--backend",
                 "pipe",
+                "--device",
+                self.named_pipe,
                 "--dither",
                 "none",
-                "--passthrough",
                 # disable volume control
                 "--mixer",
                 "softvol",
@@ -264,51 +211,37 @@ class SpotifyConnectProvider(MusicProvider):
                 "fixed",
                 "--initial-volume",
                 "100",
+                "--enable-volume-normalisation",
                 # forward events to the events script
                 "--onevent",
                 str(EVENTS_SCRIPT),
                 "--emit-sink-events",
             ]
             self._librespot_proc = librespot = AsyncProcess(
-                args, stdout=True, stderr=True, name=f"librespot[{name}]"
+                args, stdout=False, stderr=True, name=f"librespot[{self.name}]"
             )
             await librespot.start()
 
             # keep reading logging from stderr until exit
-            async def log_reader() -> None:
-                async for line in librespot.iter_stderr():
-                    if (
-                        not self._librespot_started.is_set()
-                        and "Using StdoutSink (pipe) with format: S16" in line
-                    ):
-                        self._librespot_started.set()
-                    if "error sending packet Os" in line:
-                        continue
-                    if "dropping truncated packet" in line:
-                        continue
-                    if "couldn't parse packet from " in line:
-                        continue
-                    self.logger.debug(line)
-
-            async def audio_reader() -> None:
-                chunksize = get_chunksize(AudioFormat(content_type=ContentType.OGG))
-                async for chunk in get_ffmpeg_stream(
-                    librespot.iter_chunked(chunksize),
-                    input_format=AudioFormat(content_type=ContentType.OGG),
-                    output_format=AudioFormat(content_type=ContentType.OGG),
-                    extra_input_args=["-readrate", "1.0"],
+            async for line in librespot.iter_stderr():
+                if (
+                    not self._librespot_started.is_set()
+                    and "Using StdoutSink (pipe) with format: S16" in line
                 ):
-                    if librespot.closed or self._stop_called:
-                        break
-                    if not self._player_connected:
-                        continue
-                    await self._audio_buffer.put(chunk)
+                    self._librespot_started.set()
+                if "error sending packet Os" in line:
+                    continue
+                if "dropping truncated packet" in line:
+                    continue
+                if "couldn't parse packet from " in line:
+                    continue
+                self.logger.debug(line)
 
-            await asyncio.gather(log_reader(), audio_reader())
         except asyncio.CancelledError:
             await librespot.close(True)
         finally:
-            self.logger.info("Spotify Connect background daemon stopped for %s", name)
+            self.logger.info("Spotify Connect background daemon stopped for %s", self.name)
+            await check_output("rm", "-f", self.named_pipe)
             # auto restart if not stopped manually
             if not self._stop_called and self._librespot_started.is_set():
                 self._setup_player_daemon()
@@ -338,34 +271,36 @@ class SpotifyConnectProvider(MusicProvider):
         # handle session connected event
         # this player has become the active spotify connect player
         # we need to start the playback
-        if not self._player_connected and json_data.get("event") in (
-            "session_connected",
-            "play_request_id_changed",
+        self.logger.error("%s - %s", self.name, json_data.get("event"))
+        if not self._source_details.in_use_by and json_data.get("event") in (
+            # "session_connected",
+            # "loading",
+            "sink",
         ):
-            # initiate playback by selecting the pluginsource mediaitem on the player
-            pluginsource_item = await self.get_source(CONNECT_ITEM_ID)
+            # initiate playback by selecting this source on the default player
             self.mass.create_task(
-                self.mass.player_queues.play_media(
-                    queue_id=self.mass_player_id,
-                    media=pluginsource_item,
-                    option=QueueOption.REPLACE,
-                )
+                self.mass.players.select_source(self.mass_player_id, self.lookup_key)
             )
 
-        if self._current_streamdetails:
-            # parse metadata fields
-            if "common_metadata_fields" in json_data:
-                title = json_data["common_metadata_fields"].get("name", "Unknown")
-                if artists := json_data.get("track_metadata_fields", {}).get("artists"):
-                    artist = artists[0]
-                else:
-                    artist = "Unknown"
-                if images := json_data["common_metadata_fields"].get("covers"):
-                    image_url = images[0]
-                else:
-                    image_url = None
-                self._current_streamdetails.stream_metadata = LivestreamMetadata(
-                    title=title, artist=artist, image_url=image_url
-                )
+        # parse metadata fields
+        if "common_metadata_fields" in json_data:
+            uri = json_data["common_metadata_fields"].get("uri", "Unknown")
+            title = json_data["common_metadata_fields"].get("name", "Unknown")
+            if artists := json_data.get("track_metadata_fields", {}).get("artists"):
+                artist = artists[0]
+            else:
+                artist = "Unknown"
+            album = json_data["common_metadata_fields"].get("album", "Unknown")
+            if images := json_data["common_metadata_fields"].get("covers"):
+                image_url = images[0]
+            else:
+                image_url = None
+            if self._source_details.metadata is None:
+                self._source_details.metadata = PlayerMedia(uri, media_type=MediaType.TRACK)
+            self._source_details.metadata.uri = uri
+            self._source_details.metadata.title = title
+            self._source_details.metadata.artist = artist
+            self._source_details.metadata.album = album
+            self._source_details.metadata.image_url = image_url
 
         return Response()