From b6e6ceea8a7b69e37293109b4f99f535ee5a2c5d Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sat, 30 Aug 2025 13:57:43 +0200 Subject: [PATCH] Some small follow-up fixes after Players controller refactor (#2362) --- music_assistant/controllers/player_queues.py | 2 +- music_assistant/controllers/players.py | 6 +- music_assistant/providers/sonos/player.py | 4 +- .../providers/squeezelite/player.py | 24 ++++--- .../providers/squeezelite/provider.py | 68 +++++++++++++++---- 5 files changed, 75 insertions(+), 29 deletions(-) diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 608b1e94..17d847fe 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -53,7 +53,6 @@ from music_assistant_models.media_items import ( media_from_dict, ) from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport -from music_assistant_models.player import PlayerMedia from music_assistant_models.player_queue import PlayerQueue from music_assistant_models.queue_item import QueueItem @@ -70,6 +69,7 @@ from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_det from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER from music_assistant.helpers.util import get_changed_keys, percentage from music_assistant.models.core_controller import CoreController +from music_assistant.models.player import Player, PlayerMedia if TYPE_CHECKING: from collections.abc import Iterator diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index 91301a00..9d71ff3f 100644 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -1163,9 +1163,6 @@ class PlayerController(CoreController): if not player.enabled: return - # register playerqueue for this player - self.mass.create_task(self.mass.player_queues.on_player_register(player)) - # register throttler for this player self._player_throttlers[player_id] = Throttler(1, 0.05) @@ -1196,6 +1193,9 @@ class PlayerController(CoreController): # signal event that a player was added self.mass.signal_event(EventType.PLAYER_ADDED, object_id=player.player_id, data=player) + # register playerqueue for this player + await self.mass.player_queues.on_player_register(player) + async def register_or_update(self, player: Player) -> None: """Register a new player on the controller or update existing one.""" if self.mass.closing: diff --git a/music_assistant/providers/sonos/player.py b/music_assistant/providers/sonos/player.py index c4069a1e..a492db5b 100644 --- a/music_assistant/providers/sonos/player.py +++ b/music_assistant/providers/sonos/player.py @@ -718,7 +718,7 @@ class SonosPlayer(Player): if not retry_on_fail or not self.mass_player: raise self._attr_available = False - self.mass.players.update(self.player_id) + self.update_state() self.reconnect(min(retry_on_fail + 30, 3600)) return self.connected = True @@ -739,7 +739,7 @@ class SonosPlayer(Player): # we rely on mdns to pick it up again later await self._disconnect() self._attr_available = False - self.mass.players.update(self.player_id) + self.update_state() self.reconnect(5) self._listen_task = self.mass.create_task(_listener()) diff --git a/music_assistant/providers/squeezelite/player.py b/music_assistant/providers/squeezelite/player.py index 33bde07f..f750dc28 100644 --- a/music_assistant/providers/squeezelite/player.py +++ b/music_assistant/providers/squeezelite/player.py @@ -19,6 +19,7 @@ from music_assistant_models.enums import ( ConfigEntryType, ContentType, MediaType, + PlaybackState, PlayerFeature, PlayerType, RepeatMode, @@ -90,7 +91,7 @@ class SqueezelitePlayer(Player): PlayerFeature.GAPLESS_PLAYBACK, } self._attr_can_group_with = {provider.lookup_key} - self._multi_client_stream: MultiClientStream | None = None + self.multi_client_stream: MultiClientStream | None = None self._sync_playpoints: deque[SyncPlayPoint] = deque(maxlen=MIN_REQ_PLAYPOINTS) self._do_not_resync_before: float = 0.0 @@ -201,7 +202,8 @@ class SqueezelitePlayer(Player): if not self.group_members: # Simple, single-player playback - await self._handle_play_url( + await self._handle_play_url_for_slimplayer( + self.client, url=media.uri, media=media, send_flush=True, @@ -255,7 +257,7 @@ class SqueezelitePlayer(Player): output_format=master_audio_format, ) # start the stream task - self._multi_client_stream = stream = MultiClientStream( + self.multi_client_stream = stream = MultiClientStream( audio_source=audio_source, audio_format=master_audio_format ) base_url = ( @@ -268,7 +270,7 @@ class SqueezelitePlayer(Player): url = f"{base_url}&child_player_id={slimplayer.player_id}" stream.expected_clients += 1 tg.create_task( - self._handle_play_url( + self._handle_play_url_for_slimplayer( slimplayer, url=url, media=media, @@ -279,7 +281,8 @@ class SqueezelitePlayer(Player): async def enqueue_next_media(self, media: PlayerMedia) -> None: """Handle enqueuing next media item.""" - await self._handle_play_url( + await self._handle_play_url_for_slimplayer( + self.client, url=media.uri, media=media, enqueue=True, @@ -330,8 +333,7 @@ class SqueezelitePlayer(Player): # always update the state after modifying group members self.update_state() - stream_session = self._multi_client_stream - if players_added and stream_session and not stream_session.done: + if players_added and self.current_media and self.playback_state == PlaybackState.PLAYING: # restart stream session if it was already playing # for now, we dont support late joining into an existing stream self.mass.create_task(self.play_media(self.current_media)) @@ -393,8 +395,9 @@ class SqueezelitePlayer(Player): else: self._attr_current_media = None - async def _handle_play_url( + async def _handle_play_url_for_slimplayer( self, + slimplayer: SlimClient, url: str, media: PlayerMedia, enqueue: bool = False, @@ -415,7 +418,7 @@ class SqueezelitePlayer(Player): if queue := self.mass.player_queues.get(media.queue_id): self.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode] self.extra_data["playlist shuffle"] = int(queue.shuffle_enabled) - await self.client.play_url( + await slimplayer.play_url( url=url, mime_type=f"audio/{url.split('.')[-1].split('?')[0]}", metadata=metadata, @@ -433,7 +436,7 @@ class SqueezelitePlayer(Player): if queue and queue.repeat_mode == RepeatMode.ONE: self.mass.call_later( 0.2, - self.client.play_url( + slimplayer.play_url( url=url, mime_type=f"audio/{url.split('.')[-1].split('?')[0]}", metadata=metadata, @@ -481,6 +484,7 @@ class SqueezelitePlayer(Player): childs_ready += 1 if childs_total == childs_ready: break + count += 1 # all child's ready (or timeout) - start play async with TaskManager(self.mass) as tg: diff --git a/music_assistant/providers/squeezelite/provider.py b/music_assistant/providers/squeezelite/provider.py index 9293a970..f6c8fef2 100644 --- a/music_assistant/providers/squeezelite/provider.py +++ b/music_assistant/providers/squeezelite/provider.py @@ -3,36 +3,28 @@ from __future__ import annotations import logging -from dataclasses import dataclass from typing import TYPE_CHECKING, cast +from aiohttp import web from aioslimproto.models import EventType as SlimEventType from aioslimproto.models import SlimEvent from aioslimproto.server import SlimServer -from music_assistant_models.enums import ProviderFeature +from music_assistant_models.enums import ContentType, ProviderFeature from music_assistant_models.errors import SetupFailedError +from music_assistant_models.media_items import AudioFormat from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL +from music_assistant.helpers.audio import get_player_filter_params from music_assistant.helpers.util import is_port_in_use from music_assistant.models.player_provider import PlayerProvider from .constants import CONF_CLI_JSON_PORT, CONF_CLI_TELNET_PORT -from .multi_client_stream import MultiClientStream from .player import SqueezelitePlayer if TYPE_CHECKING: from aioslimproto.client import SlimClient -@dataclass -class StreamInfo: - """Dataclass to store stream information.""" - - stream_id: str - players: list[str] - stream_obj: MultiClientStream - - class SqueezelitePlayerProvider(PlayerProvider): """Player provider for players using slimproto (like Squeezelite).""" @@ -41,7 +33,6 @@ class SqueezelitePlayerProvider(PlayerProvider): super().__init__(*args, **kwargs) self.slimproto: SlimServer | None = None self._players: dict[str, SqueezelitePlayer] = {} - self._multi_client_streams: dict[str, StreamInfo] = {} @property def supported_features(self) -> set[ProviderFeature]: @@ -144,3 +135,54 @@ class SqueezelitePlayerProvider(PlayerProvider): # forward all other events to the player itself player.handle_slim_event(event) + + async def _serve_multi_client_stream(self, request: web.Request) -> web.Response: + """Serve the multi-client flow stream audio to a player.""" + player_id = request.query.get("player_id") + fmt = request.query.get("fmt") + child_player_id = request.query.get("child_player_id") + + if not (sync_parent := self.mass.players.get(player_id)): + raise web.HTTPNotFound(reason=f"Unknown player: {player_id}") + sync_parent = cast("SqueezelitePlayer", sync_parent) + + if not (child_player := self.mass.players.get(child_player_id)): + raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}") + + if not (stream := sync_parent.multi_client_stream) or stream.done: + raise web.HTTPNotFound(f"There is no active stream for {player_id}!") + + resp = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": f"audio/{fmt}", + }, + ) + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving multi-client flow audio stream to %s", + child_player.display_name, + ) + output_format = AudioFormat(content_type=ContentType.try_parse(fmt)) + async for chunk in stream.get_stream( + output_format=output_format, + filter_params=get_player_filter_params( + self.mass, child_player_id, stream.audio_format, output_format + ) + if child_player_id + else None, + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError, ConnectionError): + # race condition + break + + return resp -- 2.34.1