Some small follow-up fixes after Players controller refactor (#2362)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 30 Aug 2025 11:57:43 +0000 (13:57 +0200)
committerGitHub <noreply@github.com>
Sat, 30 Aug 2025 11:57:43 +0000 (13:57 +0200)
music_assistant/controllers/player_queues.py
music_assistant/controllers/players.py
music_assistant/providers/sonos/player.py
music_assistant/providers/squeezelite/player.py
music_assistant/providers/squeezelite/provider.py

index 608b1e94e22c7062e17d5952f461838e64334a08..17d847fed6c711c5f94fdcdec4f04b18524f446f 100644 (file)
@@ -53,7 +53,6 @@ from music_assistant_models.media_items import (
     media_from_dict,
 )
 from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
-from music_assistant_models.player import PlayerMedia
 from music_assistant_models.player_queue import PlayerQueue
 from music_assistant_models.queue_item import QueueItem
 
@@ -70,6 +69,7 @@ from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_det
 from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
 from music_assistant.helpers.util import get_changed_keys, percentage
 from music_assistant.models.core_controller import CoreController
+from music_assistant.models.player import Player, PlayerMedia
 
 if TYPE_CHECKING:
     from collections.abc import Iterator
index 91301a005d199c93b684614a1223596ea63a566f..9d71ff3ff979fc6af8d55842b94c352b3c207591 100644 (file)
@@ -1163,9 +1163,6 @@ class PlayerController(CoreController):
         if not player.enabled:
             return
 
-        # register playerqueue for this player
-        self.mass.create_task(self.mass.player_queues.on_player_register(player))
-
         # register throttler for this player
         self._player_throttlers[player_id] = Throttler(1, 0.05)
 
@@ -1196,6 +1193,9 @@ class PlayerController(CoreController):
         # signal event that a player was added
         self.mass.signal_event(EventType.PLAYER_ADDED, object_id=player.player_id, data=player)
 
+        # register playerqueue for this player
+        await self.mass.player_queues.on_player_register(player)
+
     async def register_or_update(self, player: Player) -> None:
         """Register a new player on the controller or update existing one."""
         if self.mass.closing:
index c4069a1efc9d89a6297cb41c27bb9e7b0ca63a00..a492db5b8563dbb537c5974aa585bc4641e317d2 100644 (file)
@@ -718,7 +718,7 @@ class SonosPlayer(Player):
             if not retry_on_fail or not self.mass_player:
                 raise
             self._attr_available = False
-            self.mass.players.update(self.player_id)
+            self.update_state()
             self.reconnect(min(retry_on_fail + 30, 3600))
             return
         self.connected = True
@@ -739,7 +739,7 @@ class SonosPlayer(Player):
                     # we rely on mdns to pick it up again later
                     await self._disconnect()
                     self._attr_available = False
-                    self.mass.players.update(self.player_id)
+                    self.update_state()
                     self.reconnect(5)
 
         self._listen_task = self.mass.create_task(_listener())
index 33bde07f900c2b803a25bec37e43666200c52125..f750dc28eda9e1b0154ce9afde954a866fcdc97b 100644 (file)
@@ -19,6 +19,7 @@ from music_assistant_models.enums import (
     ConfigEntryType,
     ContentType,
     MediaType,
+    PlaybackState,
     PlayerFeature,
     PlayerType,
     RepeatMode,
@@ -90,7 +91,7 @@ class SqueezelitePlayer(Player):
             PlayerFeature.GAPLESS_PLAYBACK,
         }
         self._attr_can_group_with = {provider.lookup_key}
-        self._multi_client_stream: MultiClientStream | None = None
+        self.multi_client_stream: MultiClientStream | None = None
         self._sync_playpoints: deque[SyncPlayPoint] = deque(maxlen=MIN_REQ_PLAYPOINTS)
         self._do_not_resync_before: float = 0.0
 
@@ -201,7 +202,8 @@ class SqueezelitePlayer(Player):
 
         if not self.group_members:
             # Simple, single-player playback
-            await self._handle_play_url(
+            await self._handle_play_url_for_slimplayer(
+                self.client,
                 url=media.uri,
                 media=media,
                 send_flush=True,
@@ -255,7 +257,7 @@ class SqueezelitePlayer(Player):
                 output_format=master_audio_format,
             )
         # start the stream task
-        self._multi_client_stream = stream = MultiClientStream(
+        self.multi_client_stream = stream = MultiClientStream(
             audio_source=audio_source, audio_format=master_audio_format
         )
         base_url = (
@@ -268,7 +270,7 @@ class SqueezelitePlayer(Player):
                 url = f"{base_url}&child_player_id={slimplayer.player_id}"
                 stream.expected_clients += 1
                 tg.create_task(
-                    self._handle_play_url(
+                    self._handle_play_url_for_slimplayer(
                         slimplayer,
                         url=url,
                         media=media,
@@ -279,7 +281,8 @@ class SqueezelitePlayer(Player):
 
     async def enqueue_next_media(self, media: PlayerMedia) -> None:
         """Handle enqueuing next media item."""
-        await self._handle_play_url(
+        await self._handle_play_url_for_slimplayer(
+            self.client,
             url=media.uri,
             media=media,
             enqueue=True,
@@ -330,8 +333,7 @@ class SqueezelitePlayer(Player):
         # always update the state after modifying group members
         self.update_state()
 
-        stream_session = self._multi_client_stream
-        if players_added and stream_session and not stream_session.done:
+        if players_added and self.current_media and self.playback_state == PlaybackState.PLAYING:
             # restart stream session if it was already playing
             # for now, we dont support late joining into an existing stream
             self.mass.create_task(self.play_media(self.current_media))
@@ -393,8 +395,9 @@ class SqueezelitePlayer(Player):
         else:
             self._attr_current_media = None
 
-    async def _handle_play_url(
+    async def _handle_play_url_for_slimplayer(
         self,
+        slimplayer: SlimClient,
         url: str,
         media: PlayerMedia,
         enqueue: bool = False,
@@ -415,7 +418,7 @@ class SqueezelitePlayer(Player):
         if queue := self.mass.player_queues.get(media.queue_id):
             self.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
             self.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
-        await self.client.play_url(
+        await slimplayer.play_url(
             url=url,
             mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
             metadata=metadata,
@@ -433,7 +436,7 @@ class SqueezelitePlayer(Player):
         if queue and queue.repeat_mode == RepeatMode.ONE:
             self.mass.call_later(
                 0.2,
-                self.client.play_url(
+                slimplayer.play_url(
                     url=url,
                     mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
                     metadata=metadata,
@@ -481,6 +484,7 @@ class SqueezelitePlayer(Player):
                     childs_ready += 1
             if childs_total == childs_ready:
                 break
+            count += 1
 
         # all child's ready (or timeout) - start play
         async with TaskManager(self.mass) as tg:
index 9293a970204a740be4f2607a53421c147b5c074b..f6c8fef2fde344cd23960ab0aabc88ff435a9112 100644 (file)
@@ -3,36 +3,28 @@
 from __future__ import annotations
 
 import logging
-from dataclasses import dataclass
 from typing import TYPE_CHECKING, cast
 
+from aiohttp import web
 from aioslimproto.models import EventType as SlimEventType
 from aioslimproto.models import SlimEvent
 from aioslimproto.server import SlimServer
-from music_assistant_models.enums import ProviderFeature
+from music_assistant_models.enums import ContentType, ProviderFeature
 from music_assistant_models.errors import SetupFailedError
+from music_assistant_models.media_items import AudioFormat
 
 from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.helpers.audio import get_player_filter_params
 from music_assistant.helpers.util import is_port_in_use
 from music_assistant.models.player_provider import PlayerProvider
 
 from .constants import CONF_CLI_JSON_PORT, CONF_CLI_TELNET_PORT
-from .multi_client_stream import MultiClientStream
 from .player import SqueezelitePlayer
 
 if TYPE_CHECKING:
     from aioslimproto.client import SlimClient
 
 
-@dataclass
-class StreamInfo:
-    """Dataclass to store stream information."""
-
-    stream_id: str
-    players: list[str]
-    stream_obj: MultiClientStream
-
-
 class SqueezelitePlayerProvider(PlayerProvider):
     """Player provider for players using slimproto (like Squeezelite)."""
 
@@ -41,7 +33,6 @@ class SqueezelitePlayerProvider(PlayerProvider):
         super().__init__(*args, **kwargs)
         self.slimproto: SlimServer | None = None
         self._players: dict[str, SqueezelitePlayer] = {}
-        self._multi_client_streams: dict[str, StreamInfo] = {}
 
     @property
     def supported_features(self) -> set[ProviderFeature]:
@@ -144,3 +135,54 @@ class SqueezelitePlayerProvider(PlayerProvider):
 
         # forward all other events to the player itself
         player.handle_slim_event(event)
+
+    async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
+        """Serve the multi-client flow stream audio to a player."""
+        player_id = request.query.get("player_id")
+        fmt = request.query.get("fmt")
+        child_player_id = request.query.get("child_player_id")
+
+        if not (sync_parent := self.mass.players.get(player_id)):
+            raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
+        sync_parent = cast("SqueezelitePlayer", sync_parent)
+
+        if not (child_player := self.mass.players.get(child_player_id)):
+            raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+
+        if not (stream := sync_parent.multi_client_stream) or stream.done:
+            raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
+
+        resp = web.StreamResponse(
+            status=200,
+            reason="OK",
+            headers={
+                "Content-Type": f"audio/{fmt}",
+            },
+        )
+        await resp.prepare(request)
+
+        # return early if this is not a GET request
+        if request.method != "GET":
+            return resp
+
+        # all checks passed, start streaming!
+        self.logger.debug(
+            "Start serving multi-client flow audio stream to %s",
+            child_player.display_name,
+        )
+        output_format = AudioFormat(content_type=ContentType.try_parse(fmt))
+        async for chunk in stream.get_stream(
+            output_format=output_format,
+            filter_params=get_player_filter_params(
+                self.mass, child_player_id, stream.audio_format, output_format
+            )
+            if child_player_id
+            else None,
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError, ConnectionError):
+                # race condition
+                break
+
+        return resp