From 74f153d6a8f3c5cad1952ed9d42fac869d437fd7 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 28 Aug 2024 01:51:13 +0200 Subject: [PATCH] Fixes and enhancements to syncgroups and UGP groups (#1621) --- music_assistant/common/models/player.py | 3 + music_assistant/constants.py | 1 + music_assistant/server/controllers/players.py | 143 ++++++++++++++---- .../server/models/player_provider.py | 98 ++++-------- .../server/providers/airplay/__init__.py | 4 +- .../server/providers/qobuz/__init__.py | 16 +- .../server/providers/slimproto/__init__.py | 3 +- .../slimproto}/multi_client_stream.py | 17 ++- .../server/providers/snapcast/__init__.py | 4 +- .../server/providers/ugp/__init__.py | 138 +++++++++++------ 10 files changed, 266 insertions(+), 161 deletions(-) rename music_assistant/server/{helpers => providers/slimproto}/multi_client_stream.py (87%) diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index 5ee0e07c..567b1b12 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -134,6 +134,9 @@ class Player(DataClassDictMixin): # last_poll: when was the player last polled (used with needs_poll) last_poll: float = 0 + # internal use only + _prev_volume_level: int = 0 + @property def corrected_elapsed_time(self) -> float: """Return the corrected/realtime elapsed time.""" diff --git a/music_assistant/constants.py b/music_assistant/constants.py index f1e6f05d..4732a768 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -65,6 +65,7 @@ CONF_ICON: Final[str] = "icon" CONF_LANGUAGE: Final[str] = "language" CONF_SAMPLE_RATES: Final[str] = "sample_rates" CONF_HTTP_PROFILE: Final[str] = "http_profile" +CONF_SYNC_LEADER: Final[str] = "sync_leader" # config default values DEFAULT_HOST: Final[str] = "0.0.0.0" diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index b6fda5c9..0e861801 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -44,6 +44,7 @@ from music_assistant.constants import ( CONF_GROUP_MEMBERS, CONF_HIDE_PLAYER, CONF_PLAYERS, + CONF_SYNC_LEADER, CONF_TTS_PRE_ANNOUNCE, SYNCGROUP_PREFIX, ) @@ -294,49 +295,80 @@ class PlayerController(CoreController): if player.powered == powered: return # nothing to do + # grab info about any groups this player is active in + # to handle actions on the group when a (sync)group child turns on/off + if active_group_player_id := self._get_active_player_group(player): + active_group_player = self.get(active_group_player_id) + group_player_state = active_group_player.state + else: + active_group_player = None + # always stop player at power off if ( not powered + and player.powered and player.state in (PlayerState.PLAYING, PlayerState.PAUSED) and not player.synced_to - and player.powered ): await self.cmd_stop(player_id) # unsync player at power off if not powered: - if player.synced_to is not None: + if player.synced_to or player.group_childs: await self.cmd_unsync(player_id) - for child in self.iter_group_members(player): - if not child.synced_to: - continue - await self.cmd_unsync(child.player_id) + if PlayerFeature.POWER in player.supported_features: - # forward to player provider + # player supports power command: forward to player provider player_provider = self.get_player_provider(player_id) async with self._player_throttlers[player_id]: await player_provider.cmd_power(player_id, powered) else: # allow the stop command to process and prevent race conditions await asyncio.sleep(0.2) + # always optimistically set the power state to update the UI # as fast as possible and prevent race conditions player.powered = powered - # always MA as active source on power ON - player.active_source = player_id if powered else None + # reset active source + player.active_source = None self.update(player_id) - # handle actions when a (sync)group child turns on/off - if active_group_player := self._get_active_player_group(player): - player_prov = self.get_player_provider(active_group_player) - player_prov.on_child_power(active_group_player, player.player_id, powered) + # handle 'auto play on power on' feature - elif ( - powered + if ( + not active_group_player + and powered and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False) and player.active_source in (None, player_id) ): await self.mass.player_queues.resume(player_id) + # handle group player actions + if not (active_group_player and active_group_player.powered): + return + + # run actions suitable for every type of group player + powered_childs = list(self.mass.players.iter_group_members(active_group_player, True)) + if not powered and player in powered_childs: + powered_childs.remove(player.player_id) + elif powered and player.player_id not in powered_childs: + powered_childs.append(player.player_id) + # if the last player of a group turned off, turn off the group + if len(powered_childs) == 0: + self.logger.debug( + "Group %s has no more powered members, turning off group player", + active_group_player.display_name, + ) + self.mass.create_task(self.mass.players.cmd_power(active_group_player.player_id, False)) + return + # forward to either syncgroup logic or group player logic + if active_group_player.type == PlayerType.SYNC_GROUP: + self._on_syncgroup_child_power(active_group_player, player, powered, group_player_state) + elif active_group_player.type == PlayerType.GROUP: + player_prov = self.mass.get_provider(active_group_player.provider) + player_prov.on_group_child_power( + active_group_player, player, powered, group_player_state + ) + @api_command("players/cmd/volume_set") @handle_player_command async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: @@ -417,6 +449,10 @@ class PlayerController(CoreController): await self.cmd_power(player_id, power) return + if not (group_player.type == PlayerType.SYNC_GROUP or group_player.group_childs): + # this is not a (temporary) sync group - nothing to do + raise UnsupportedFeaturedException("Player is not a sync group") + # make sure to update the group power state group_player.powered = power @@ -424,6 +460,8 @@ class PlayerController(CoreController): if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED): await self.cmd_stop(player_id) + # handle syncgroup - this will also work for temporary syncgroups + # where players are manually synced against a group leader any_member_powered = False async with TaskManager(self.mass) as tg: for member in self.iter_group_members(group_player, only_powered=True): @@ -465,8 +503,17 @@ class PlayerController(CoreController): player = self.get(player_id, True) assert player if PlayerFeature.VOLUME_MUTE not in player.supported_features: - msg = f"Player {player.display_name} does not support muting" - raise UnsupportedFeaturedException(msg) + self.logger.info( + "Player %s does not support muting, using volume instead", player.display_name + ) + if muted: + player._prev_volume_level = player.volume_level + player.volume_muted = True + await self.cmd_volume_set(player_id, 0) + else: + player.volume_muted = False + await self.cmd_volume_set(player_id, player._prev_volume_level) + return player_provider = self.get_player_provider(player_id) async with self._player_throttlers[player_id]: await player_provider.cmd_volume_mute(player_id, muted) @@ -640,6 +687,10 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. """ + if (player := self.get(player_id)) and player.group_childs: + # this player is a syncgroup leader, unsync all children + await self.cmd_unsync_many(player.group_childs) + return await self.cmd_unsync_many([player_id]) @api_command("players/cmd/sync_many") @@ -692,8 +743,7 @@ class PlayerController(CoreController): async def cmd_unsync_many(self, player_ids: list[str]) -> None: """Handle UNSYNC command for all the given players.""" # filter all player ids on compatibility and availability - final_player_ids: UniqueList[str] = UniqueList() - for player_id in player_ids: + for player_id in list(player_ids): if not (child_player := self.get(player_id)): self.logger.warning("Player %s is not available", player_id) continue @@ -702,16 +752,13 @@ class PlayerController(CoreController): "Player %s does not support (un)sync commands", child_player.name ) continue - final_player_ids.append(player_id) - # reset active source player if is unsynced + if not child_player.synced_to: + continue + # reset active source player if it is unsynced child_player.active_source = None - - if not final_player_ids: - return - - # forward command to the player provider after all (base) sanity checks - player_provider = self.get_player_provider(final_player_ids[0]) - await player_provider.cmd_unsync_many(final_player_ids) + # forward command to the player provider + if player_provider := self.get_player_provider(player_id): + await player_provider.cmd_unsync(player_id) def set(self, player: Player) -> None: """Set/Update player details on the controller.""" @@ -1167,6 +1214,11 @@ class PlayerController(CoreController): ): if child_player.group_childs: return child_player + pref_sync_leader = self.mass.config.get_raw_player_config_value( + group_player.player_id, CONF_SYNC_LEADER, "auto" + ) + if pref_sync_leader != "auto" and (player := self.get(pref_sync_leader)): + return player # select new sync leader: return the first playing player for child_player in self.iter_group_members( group_player, only_powered=True, only_playing=True @@ -1196,6 +1248,41 @@ class PlayerController(CoreController): continue await self.cmd_sync(member.player_id, sync_leader.player_id) + def _on_syncgroup_child_power( + self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState + ) -> None: + """ + Call when a power command was executed on one of the child players of a SyncGroup. + + This is used to handle special actions such as (re)syncing. + The group state is sent with the state BEFORE the power command was executed. + """ + group_playing = group_state == PlayerState.PLAYING + sync_leader = self.mass.players.get_sync_leader(group_player) + is_sync_leader = child_player.player_id == sync_leader.player_id + if group_playing and not new_power and is_sync_leader: + # the current sync leader player turned OFF while the group player + # should still be playing - we need to select a new sync leader and resume + self.logger.warning( + "Syncleader %s turned off while syncgroup is playing, " + "a forced resync for syngroup %s will be attempted...", + child_player.display_name, + group_player.display_name, + ) + + async def full_resync() -> None: + await self.mass.players.sync_syncgroup(group_player.player_id) + await self.mass.player_queues.resume(group_player.player_id) + + self.mass.call_later(2, full_resync, task_id=f"forced_resync_{group_player.player_id}") + return + elif new_power: + # if a child player turned ON while the group is already active, we need to resync + if sync_leader.player_id != child_player.player_id: + self.mass.create_task( + self.cmd_sync(child_player.player_id, sync_leader.player_id), + ) + async def _register_syncgroups(self) -> None: """Register all (virtual/fake) syncgroup players.""" player_configs = await self.mass.config.get_player_configs() diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index f5c89c1c..48711dc8 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -23,9 +23,9 @@ from music_assistant.common.models.config_entries import ( ConfigValueOption, PlayerConfig, ) -from music_assistant.common.models.enums import ConfigEntryType, PlayerState, ProviderFeature +from music_assistant.common.models.enums import ConfigEntryType, PlayerState from music_assistant.common.models.player import Player, PlayerMedia -from music_assistant.constants import CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX +from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_SYNC_LEADER, SYNCGROUP_PREFIX from .provider import Provider @@ -71,6 +71,28 @@ class PlayerProvider(Provider): multi_value=True, required=True, ), + ConfigEntry( + key=CONF_SYNC_LEADER, + type=ConfigEntryType.STRING, + label="Preferred sync leader", + default_value="auto", + options=( + *tuple( + ConfigValueOption(x.display_name, x.player_id) + for x in self.mass.players.all(True, False) + if x.player_id + in self.mass.config.get_raw_player_config_value( + player_id, CONF_GROUP_MEMBERS, [] + ) + ), + ConfigValueOption("Select automatically", "auto"), + ), + description="By default Music Assistant will automatically assign a " + "(random) player as sync leader, meaning the other players in the sync group " + "will be synced to that player. If you want to force a specific player to be " + "the sync leader, select it here.", + required=True, + ), CONF_ENTRY_PLAYER_ICON_GROUP, ) if not player_id.startswith(SYNCGROUP_PREFIX): @@ -212,17 +234,6 @@ class PlayerProvider(Provider): # default implementation, simply call the cmd_sync for all child players await self.cmd_sync(child_id, target_player) - async def cmd_unsync_many(self, player_ids: str) -> None: - """Handle UNSYNC command for all the given players. - - Remove the given player from any syncgroups it currently is synced to. - - - player_id: player_id of the player to handle the command. - """ - for player_id in player_ids: - # default implementation, simply call the cmd_sync for all player_ids - await self.cmd_unsync(player_id) - async def poll_player(self, player_id: str) -> None: """Poll player for state updates. @@ -230,66 +241,15 @@ class PlayerProvider(Provider): if 'needs_poll' is set to True in the player object. """ - def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None: + def on_group_child_power( + self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState + ) -> None: """ - Call when a power command was executed on one of the child players of a Sync/Player group. + Call when a power command was executed on one of the child players of a PlayerGroup. This is used to handle special actions such as (re)syncing. + The group state is sent with the state BEFORE the power command was executed. """ - group_player = self.mass.players.get(player_id) - child_player = self.mass.players.get(child_player_id) - - if not group_player.powered: - # guard, this should be caught in the player controller but just in case... - return - - powered_childs = list(self.mass.players.iter_group_members(group_player, True)) - if not new_power and child_player in powered_childs: - powered_childs.remove(child_player) - if new_power and child_player not in powered_childs: - powered_childs.append(child_player) - - # if the last player of a group turned off, turn off the group - if len(powered_childs) == 0: - self.logger.debug( - "Group %s has no more powered members, turning off group player", - group_player.display_name, - ) - self.mass.create_task(self.mass.players.cmd_power(player_id, False)) - return - - # the below actions are only suitable for syncgroups - if ProviderFeature.SYNC_PLAYERS not in self.supported_features: - return - - group_playing = group_player.state == PlayerState.PLAYING - is_sync_leader = ( - len(child_player.group_childs) > 0 - and child_player.active_source == group_player.player_id - ) - if group_playing and not new_power and is_sync_leader: - # the current sync leader player turned OFF while the group player - # should still be playing - we need to select a new sync leader and resume - self.logger.warning( - "Syncleader %s turned off while syncgroup is playing, " - "a forced resume for syngroup %s will be attempted in 5 seconds...", - child_player.display_name, - group_player.display_name, - ) - - async def full_resync() -> None: - await self.mass.players.sync_syncgroup(group_player.player_id) - await self.mass.player_queues.resume(group_player.player_id) - - self.mass.call_later(5, full_resync, task_id=f"forced_resync_{player_id}") - return - elif new_power: - # if a child player turned ON while the group is already active, we need to resync - sync_leader = self.mass.players.get_sync_leader(group_player) - if sync_leader.player_id != child_player_id: - self.mass.create_task( - self.cmd_sync(child_player_id, sync_leader.player_id), - ) # DO NOT OVERRIDE BELOW diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 8f7dd0b5..4efdd496 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -669,8 +669,8 @@ class AirplayProvider(PlayerProvider): # special case: UGP stream ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") ugp_stream = ugp_provider.streams[media.queue_id] - input_format = ugp_stream.audio_format - audio_source = ugp_stream.subscribe_raw() + input_format = ugp_stream.output_format + audio_source = ugp_stream.subscribe() elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id: # radio stream - consume media stream directly input_format = AIRPLAY_PCM_FORMAT diff --git a/music_assistant/server/providers/qobuz/__init__.py b/music_assistant/server/providers/qobuz/__init__.py index c745e50d..1ca34487 100644 --- a/music_assistant/server/providers/qobuz/__init__.py +++ b/music_assistant/server/providers/qobuz/__init__.py @@ -471,17 +471,19 @@ class QobuzProvider(MusicProvider): "format_id": format_id, } ] - await self._post_data("track/reportStreamingStart", data=events) + async with self.throttler.bypass(): + await self._post_data("track/reportStreamingStart", data=events) async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None: """Handle callback when an item completed streaming.""" user_id = self._user_auth_info["user"]["id"] - await self._get_data( - "/track/reportStreamingEnd", - user_id=user_id, - track_id=str(streamdetails.item_id), - duration=try_parse_int(seconds_streamed), - ) + async with self.throttler.bypass(): + await self._get_data( + "/track/reportStreamingEnd", + user_id=user_id, + track_id=str(streamdetails.item_id), + duration=try_parse_int(seconds_streamed), + ) def _parse_artist(self, artist_obj: dict): """Parse qobuz artist object to generic layout.""" diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index cf7e8c00..fbe5ae42 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -59,11 +59,12 @@ from music_assistant.constants import ( VERBOSE_LOG_LEVEL, ) from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params -from music_assistant.server.helpers.multi_client_stream import MultiClientStream from music_assistant.server.helpers.util import TaskManager from music_assistant.server.models.player_provider import PlayerProvider from music_assistant.server.providers.ugp import UniversalGroupProvider +from .multi_client_stream import MultiClientStream + if TYPE_CHECKING: from aioslimproto.models import SlimEvent diff --git a/music_assistant/server/helpers/multi_client_stream.py b/music_assistant/server/providers/slimproto/multi_client_stream.py similarity index 87% rename from music_assistant/server/helpers/multi_client_stream.py rename to music_assistant/server/providers/slimproto/multi_client_stream.py index bd31d63c..5ba2a53b 100644 --- a/music_assistant/server/helpers/multi_client_stream.py +++ b/music_assistant/server/providers/slimproto/multi_client_stream.py @@ -44,7 +44,9 @@ class MultiClientStream: empty_queue(sub_queue) async def get_stream( - self, output_format: AudioFormat, filter_params: list[str] | None = None + self, + output_format: AudioFormat, + filter_params: list[str] | None = None, ) -> AsyncGenerator[bytes, None]: """Get (client specific encoded) ffmpeg stream.""" async for chunk in get_ffmpeg_stream( @@ -75,20 +77,23 @@ class MultiClientStream: # wait for first/all subscriber count = 0 while count < 50: - await asyncio.sleep(0.5) + await asyncio.sleep(0.1) count += 1 if len(self.subscribers) >= expected_clients: break - if count == 50: - return LOGGER.debug( "Starting multi-client stream with %s/%s clients", len(self.subscribers), self.expected_clients, ) async for chunk in self.audio_source: - if len(self.subscribers) == 0: - return + fail_count = 0 + while len(self.subscribers) == 0: + await asyncio.sleep(0.1) + fail_count += 1 + if fail_count > 50: + LOGGER.warning("No clients connected, stopping stream") + return await asyncio.gather( *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True ) diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 2a609295..43943bd0 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -472,8 +472,8 @@ class SnapCastProvider(PlayerProvider): # special case: UGP stream ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") ugp_stream = ugp_provider.streams[media.queue_id] - input_format = ugp_stream.audio_format - audio_source = ugp_stream.subscribe_raw() + input_format = ugp_stream.output_format + audio_source = ugp_stream.subscribe() elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id: # radio stream - consume media stream directly input_format = DEFAULT_SNAPCAST_FORMAT diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index fdf0caaa..3d8f2433 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -7,6 +7,8 @@ allowing the user to create player groups from all players known in the system. from __future__ import annotations +import asyncio +from collections.abc import AsyncGenerator, Awaitable, Callable from time import time from typing import TYPE_CHECKING, Final, cast @@ -36,12 +38,7 @@ from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_HTTP_PROFILE, SYNCGROUP_PREFIX from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS -from music_assistant.server.helpers.audio import ( - get_chunksize, - get_ffmpeg_stream, - get_player_filter_params, -) -from music_assistant.server.helpers.multi_client_stream import MultiClientStream +from music_assistant.server.helpers.audio import get_ffmpeg_stream from music_assistant.server.helpers.util import TaskManager from music_assistant.server.models.player_provider import PlayerProvider @@ -57,13 +54,13 @@ if TYPE_CHECKING: # ruff: noqa: ARG002 UGP_FORMAT = AudioFormat( - content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24 + content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16 ) UGP_PREFIX = "ugp_" CONF_ACTION_CREATE_PLAYER = "create_player" CONF_ACTION_CREATE_PLAYER_SAVE = "create_player_save" -CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(48000, 24, 48000, 24, True) +CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True) CONF_GROUP_PLAYERS: Final[str] = "group_players" CONF_NEW_GROUP_NAME: Final[str] = "name" CONF_NEW_GROUP_MEMBERS: Final[list[str]] = "members" @@ -170,7 +167,7 @@ class UniversalGroupProvider(PlayerProvider): """Initialize MusicProvider.""" super().__init__(mass, manifest, config) self._registered_routes: set[str] = set() - self.streams: dict[str, MultiClientStream] = {} + self.streams: dict[str, UGPStream] = {} async def loaded_in_mass(self) -> None: """Call after the provider has been loaded.""" @@ -310,7 +307,7 @@ class UniversalGroupProvider(PlayerProvider): group_player = self.mass.players.get(player_id) # stop any existing stream first if (existing := self.streams.pop(player_id, None)) and not existing.done: - existing.task.cancel() + await existing.stop() # select audio source if media.media_type == MediaType.ANNOUNCEMENT: @@ -339,10 +336,8 @@ class UniversalGroupProvider(PlayerProvider): ) # start the stream task - self.streams[player_id] = MultiClientStream( - audio_source=audio_source, audio_format=UGP_FORMAT - ) - base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac" + self.streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT) + base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac" # forward to downstream play_media commands async with TaskManager(self.mass) as tg: @@ -414,23 +409,21 @@ class UniversalGroupProvider(PlayerProvider): group_childs=set(members), ) self.mass.players.register_or_update(player) - # register dynamic routes for the ugp stream (both flac and mp3) - for fmt in ("mp3", "flac"): - route_path = f"/ugp/{group_player_id}.{fmt}" - self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream) - self._registered_routes.add(route_path) + # register dynamic route for the ugp stream + route_path = f"/ugp/{group_player_id}.aac" + self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream) + self._registered_routes.add(route_path) return player - def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None: + def on_group_child_power( + self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState + ) -> None: """ Call when a power command was executed on one of the child player of a PlayerGroup. - This is used to handle special actions such as (re)syncing. + The group state is sent with the state BEFORE the power command was executed. """ - group_player = self.mass.players.get(player_id) - child_player = self.mass.players.get(child_player_id) - if not group_player.powered: # guard, this should be caught in the player controller but just in case... return None @@ -438,7 +431,7 @@ class UniversalGroupProvider(PlayerProvider): powered_childs = [ x for x in self.mass.players.iter_group_members(group_player, True) - if not (not new_power and x.player_id == child_player_id) + if not (not new_power and x.player_id == child_player.player_id) ] if new_power and child_player not in powered_childs: powered_childs.append(child_player) @@ -449,13 +442,13 @@ class UniversalGroupProvider(PlayerProvider): "Group %s has no more powered members, turning off group player", group_player.display_name, ) - self.mass.create_task(self.cmd_power(player_id, False)) + self.mass.create_task(self.cmd_power(group_player.player_id, False)) return False # if a child player turned ON while the group player is already playing # we just direct it to the existing stream (we dont care about the audio being in sync) - if new_power and group_player.state == PlayerState.PLAYING: - base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac" + if new_power and group_state == PlayerState.PLAYING: + base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.aac" self.mass.create_task( self.mass.players.play_media( child_player.player_id, @@ -473,7 +466,6 @@ class UniversalGroupProvider(PlayerProvider): async def _serve_ugp_stream(self, request: web.Request) -> web.Response: """Serve the UGP (multi-client) flow stream audio to a player.""" ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1] - fmt = request.path.rsplit(".")[-1] child_player_id = request.query.get("player_id") # optional! if not (ugp_player := self.mass.players.get(ugp_player_id)): @@ -482,18 +474,12 @@ class UniversalGroupProvider(PlayerProvider): if not (stream := self.streams.get(ugp_player_id, None)) or stream.done: raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!") - output_format = AudioFormat( - content_type=ContentType.try_parse(fmt), - sample_rate=stream.audio_format.sample_rate, - bit_depth=stream.audio_format.bit_depth, - ) - http_profile: str = await self.mass.config.get_player_config_value( child_player_id, CONF_HTTP_PROFILE ) headers = { **DEFAULT_STREAM_HEADERS, - "Content-Type": f"audio/{fmt}", + "Content-Type": "audio/aac", "Accept-Ranges": "none", "Cache-Control": "no-cache", "Connection": "close", @@ -501,7 +487,7 @@ class UniversalGroupProvider(PlayerProvider): resp = web.StreamResponse(status=200, reason="OK", headers=headers) if http_profile == "forced_content_length": - resp.content_length = get_chunksize(output_format, 24 * 3600) + resp.content_length = 4294967296 elif http_profile == "chunked": resp.enable_chunked_encoding() @@ -517,17 +503,10 @@ class UniversalGroupProvider(PlayerProvider): ugp_player.display_name, child_player_id or request.remote, ) - - async for chunk in stream.get_stream( - output_format=output_format, - filter_params=get_player_filter_params(self.mass, child_player_id) - if child_player_id - else None, - ): + async for chunk in stream.subscribe(): try: await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError): - # race condition + except (ConnectionError, ConnectionResetError): break return resp @@ -550,3 +529,70 @@ class UniversalGroupProvider(PlayerProvider): and x not in syncgroup_childs and not x.startswith(UGP_PREFIX) ] + + +class UGPStream: + """ + Implementation of a Stream for the Universal Group Player. + + Basiclaly this is like a fake radio radio stream (AAC) format with multiple subscribers. + The AAC format is chosen because it is widely supported and has a good balance between + quality and bandwidth and also allows for mid-stream joining of (extra) players. + """ + + def __init__( + self, + audio_source: AsyncGenerator[bytes, None], + audio_format: AudioFormat, + ) -> None: + """Initialize UGP Stream.""" + self.audio_source = audio_source + self.input_format = audio_format + self.output_format = AudioFormat(content_type=ContentType.AAC) + self.subscribers: list[Callable[[bytes], Awaitable]] = [] + self._task: asyncio.Task | None = None + self._done: asyncio.Event = asyncio.Event() + + @property + def done(self) -> bool: + """Return if this stream is already done.""" + return self._done.is_set() and self._task and self._task.done() + + async def stop(self) -> None: + """Stop/cancel the stream.""" + if self._done.is_set(): + return + if self._task and not self._task.done(): + self._task.cancel() + self._done.set() + + async def subscribe(self) -> AsyncGenerator[bytes, None]: + """Subscribe to the raw/unaltered audio stream.""" + # start the runner as soon as the (first) client connects + if not self._task: + self._task = asyncio.create_task(self._runner()) + queue = asyncio.Queue(1) + try: + self.subscribers.append(queue.put) + while True: + chunk = await queue.get() + if not chunk: + break + yield chunk + finally: + self.subscribers.remove(queue.put) + + async def _runner(self) -> None: + """Run the stream for the given audio source.""" + await asyncio.sleep(0.25) # small delay to allow subscribers to connect + async for chunk in get_ffmpeg_stream( + audio_input=self.audio_source, + input_format=self.input_format, + output_format=self.output_format, + # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version + # extra_input_args=["-readrate", "1.15"], + ): + await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True) + # empty chunk when done + await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True) + self._done.set() -- 2.34.1