Fixes and enhancements to syncgroups and UGP groups (#1621)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 27 Aug 2024 23:51:13 +0000 (01:51 +0200)
committerGitHub <noreply@github.com>
Tue, 27 Aug 2024 23:51:13 +0000 (01:51 +0200)
music_assistant/common/models/player.py
music_assistant/constants.py
music_assistant/server/controllers/players.py
music_assistant/server/helpers/multi_client_stream.py [deleted file]
music_assistant/server/models/player_provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/qobuz/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/slimproto/multi_client_stream.py [new file with mode: 0644]
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/ugp/__init__.py

index 5ee0e07c2de1b9ff5be2d6058fd697e3d943fffb..567b1b124ee30ffda1a370a76ac37c99336b0069 100644 (file)
@@ -134,6 +134,9 @@ class Player(DataClassDictMixin):
     # last_poll: when was the player last polled (used with needs_poll)
     last_poll: float = 0
 
+    # internal use only
+    _prev_volume_level: int = 0
+
     @property
     def corrected_elapsed_time(self) -> float:
         """Return the corrected/realtime elapsed time."""
index f1e6f05d1f18c697b3b1d7f9d03ee0f4f2ffbd8e..4732a768f35a4880f042c25923e0e84db35bfb0e 100644 (file)
@@ -65,6 +65,7 @@ CONF_ICON: Final[str] = "icon"
 CONF_LANGUAGE: Final[str] = "language"
 CONF_SAMPLE_RATES: Final[str] = "sample_rates"
 CONF_HTTP_PROFILE: Final[str] = "http_profile"
+CONF_SYNC_LEADER: Final[str] = "sync_leader"
 
 # config default values
 DEFAULT_HOST: Final[str] = "0.0.0.0"
index b6fda5c949fe741e62b3683fe8dc130d980eda45..0e8618016ca0c7cc41332bde6fc26a7575e7855b 100644 (file)
@@ -44,6 +44,7 @@ from music_assistant.constants import (
     CONF_GROUP_MEMBERS,
     CONF_HIDE_PLAYER,
     CONF_PLAYERS,
+    CONF_SYNC_LEADER,
     CONF_TTS_PRE_ANNOUNCE,
     SYNCGROUP_PREFIX,
 )
@@ -294,49 +295,80 @@ class PlayerController(CoreController):
         if player.powered == powered:
             return  # nothing to do
 
+        # grab info about any groups this player is active in
+        # to handle actions on the group when a (sync)group child turns on/off
+        if active_group_player_id := self._get_active_player_group(player):
+            active_group_player = self.get(active_group_player_id)
+            group_player_state = active_group_player.state
+        else:
+            active_group_player = None
+
         # always stop player at power off
         if (
             not powered
+            and player.powered
             and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
             and not player.synced_to
-            and player.powered
         ):
             await self.cmd_stop(player_id)
 
         # unsync player at power off
         if not powered:
-            if player.synced_to is not None:
+            if player.synced_to or player.group_childs:
                 await self.cmd_unsync(player_id)
-            for child in self.iter_group_members(player):
-                if not child.synced_to:
-                    continue
-                await self.cmd_unsync(child.player_id)
+
         if PlayerFeature.POWER in player.supported_features:
-            # forward to player provider
+            # player supports power command: forward to player provider
             player_provider = self.get_player_provider(player_id)
             async with self._player_throttlers[player_id]:
                 await player_provider.cmd_power(player_id, powered)
         else:
             # allow the stop command to process and prevent race conditions
             await asyncio.sleep(0.2)
+
         # always optimistically set the power state to update the UI
         # as fast as possible and prevent race conditions
         player.powered = powered
-        # always MA as active source on power ON
-        player.active_source = player_id if powered else None
+        # reset active source
+        player.active_source = None
         self.update(player_id)
-        # handle actions when a (sync)group child turns on/off
-        if active_group_player := self._get_active_player_group(player):
-            player_prov = self.get_player_provider(active_group_player)
-            player_prov.on_child_power(active_group_player, player.player_id, powered)
+
         # handle 'auto play on power on'  feature
-        elif (
-            powered
+        if (
+            not active_group_player
+            and powered
             and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False)
             and player.active_source in (None, player_id)
         ):
             await self.mass.player_queues.resume(player_id)
 
+        # handle group player actions
+        if not (active_group_player and active_group_player.powered):
+            return
+
+        # run actions suitable for every type of group player
+        powered_childs = list(self.mass.players.iter_group_members(active_group_player, True))
+        if not powered and player in powered_childs:
+            powered_childs.remove(player.player_id)
+        elif powered and player.player_id not in powered_childs:
+            powered_childs.append(player.player_id)
+        # if the last player of a group turned off, turn off the group
+        if len(powered_childs) == 0:
+            self.logger.debug(
+                "Group %s has no more powered members, turning off group player",
+                active_group_player.display_name,
+            )
+            self.mass.create_task(self.mass.players.cmd_power(active_group_player.player_id, False))
+            return
+        # forward to either syncgroup logic or group player logic
+        if active_group_player.type == PlayerType.SYNC_GROUP:
+            self._on_syncgroup_child_power(active_group_player, player, powered, group_player_state)
+        elif active_group_player.type == PlayerType.GROUP:
+            player_prov = self.mass.get_provider(active_group_player.provider)
+            player_prov.on_group_child_power(
+                active_group_player, player, powered, group_player_state
+            )
+
     @api_command("players/cmd/volume_set")
     @handle_player_command
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
@@ -417,6 +449,10 @@ class PlayerController(CoreController):
             await self.cmd_power(player_id, power)
             return
 
+        if not (group_player.type == PlayerType.SYNC_GROUP or group_player.group_childs):
+            # this is not a (temporary) sync group - nothing to do
+            raise UnsupportedFeaturedException("Player is not a sync group")
+
         # make sure to update the group power state
         group_player.powered = power
 
@@ -424,6 +460,8 @@ class PlayerController(CoreController):
         if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
             await self.cmd_stop(player_id)
 
+        # handle syncgroup - this will also work for temporary syncgroups
+        # where players are manually synced against a group leader
         any_member_powered = False
         async with TaskManager(self.mass) as tg:
             for member in self.iter_group_members(group_player, only_powered=True):
@@ -465,8 +503,17 @@ class PlayerController(CoreController):
         player = self.get(player_id, True)
         assert player
         if PlayerFeature.VOLUME_MUTE not in player.supported_features:
-            msg = f"Player {player.display_name} does not support muting"
-            raise UnsupportedFeaturedException(msg)
+            self.logger.info(
+                "Player %s does not support muting, using volume instead", player.display_name
+            )
+            if muted:
+                player._prev_volume_level = player.volume_level
+                player.volume_muted = True
+                await self.cmd_volume_set(player_id, 0)
+            else:
+                player.volume_muted = False
+                await self.cmd_volume_set(player_id, player._prev_volume_level)
+            return
         player_provider = self.get_player_provider(player_id)
         async with self._player_throttlers[player_id]:
             await player_provider.cmd_volume_mute(player_id, muted)
@@ -640,6 +687,10 @@ class PlayerController(CoreController):
 
             - player_id: player_id of the player to handle the command.
         """
+        if (player := self.get(player_id)) and player.group_childs:
+            # this player is a syncgroup leader, unsync all children
+            await self.cmd_unsync_many(player.group_childs)
+            return
         await self.cmd_unsync_many([player_id])
 
     @api_command("players/cmd/sync_many")
@@ -692,8 +743,7 @@ class PlayerController(CoreController):
     async def cmd_unsync_many(self, player_ids: list[str]) -> None:
         """Handle UNSYNC command for all the given players."""
         # filter all player ids on compatibility and availability
-        final_player_ids: UniqueList[str] = UniqueList()
-        for player_id in player_ids:
+        for player_id in list(player_ids):
             if not (child_player := self.get(player_id)):
                 self.logger.warning("Player %s is not available", player_id)
                 continue
@@ -702,16 +752,13 @@ class PlayerController(CoreController):
                     "Player %s does not support (un)sync commands", child_player.name
                 )
                 continue
-            final_player_ids.append(player_id)
-            # reset active source player if is unsynced
+            if not child_player.synced_to:
+                continue
+            # reset active source player if it is unsynced
             child_player.active_source = None
-
-        if not final_player_ids:
-            return
-
-        # forward command to the player provider after all (base) sanity checks
-        player_provider = self.get_player_provider(final_player_ids[0])
-        await player_provider.cmd_unsync_many(final_player_ids)
+            # forward command to the player provider
+            if player_provider := self.get_player_provider(player_id):
+                await player_provider.cmd_unsync(player_id)
 
     def set(self, player: Player) -> None:
         """Set/Update player details on the controller."""
@@ -1167,6 +1214,11 @@ class PlayerController(CoreController):
         ):
             if child_player.group_childs:
                 return child_player
+        pref_sync_leader = self.mass.config.get_raw_player_config_value(
+            group_player.player_id, CONF_SYNC_LEADER, "auto"
+        )
+        if pref_sync_leader != "auto" and (player := self.get(pref_sync_leader)):
+            return player
         # select new sync leader: return the first playing player
         for child_player in self.iter_group_members(
             group_player, only_powered=True, only_playing=True
@@ -1196,6 +1248,41 @@ class PlayerController(CoreController):
                 continue
             await self.cmd_sync(member.player_id, sync_leader.player_id)
 
+    def _on_syncgroup_child_power(
+        self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
+    ) -> None:
+        """
+        Call when a power command was executed on one of the child players of a SyncGroup.
+
+        This is used to handle special actions such as (re)syncing.
+        The group state is sent with the state BEFORE the power command was executed.
+        """
+        group_playing = group_state == PlayerState.PLAYING
+        sync_leader = self.mass.players.get_sync_leader(group_player)
+        is_sync_leader = child_player.player_id == sync_leader.player_id
+        if group_playing and not new_power and is_sync_leader:
+            # the current sync leader player turned OFF while the group player
+            # should still be playing - we need to select a new sync leader and resume
+            self.logger.warning(
+                "Syncleader %s turned off while syncgroup is playing, "
+                "a forced resync for syngroup %s will be attempted...",
+                child_player.display_name,
+                group_player.display_name,
+            )
+
+            async def full_resync() -> None:
+                await self.mass.players.sync_syncgroup(group_player.player_id)
+                await self.mass.player_queues.resume(group_player.player_id)
+
+            self.mass.call_later(2, full_resync, task_id=f"forced_resync_{group_player.player_id}")
+            return
+        elif new_power:
+            # if a child player turned ON while the group is already active, we need to resync
+            if sync_leader.player_id != child_player.player_id:
+                self.mass.create_task(
+                    self.cmd_sync(child_player.player_id, sync_leader.player_id),
+                )
+
     async def _register_syncgroups(self) -> None:
         """Register all (virtual/fake) syncgroup players."""
         player_configs = await self.mass.config.get_player_configs()
diff --git a/music_assistant/server/helpers/multi_client_stream.py b/music_assistant/server/helpers/multi_client_stream.py
deleted file mode 100644 (file)
index bd31d63..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-"""Implementation of a simple multi-client stream task/job."""
-
-import asyncio
-import logging
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-
-from music_assistant.common.helpers.util import empty_queue
-from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.server.helpers.audio import get_ffmpeg_stream
-
-LOGGER = logging.getLogger(__name__)
-
-
-class MultiClientStream:
-    """Implementation of a simple multi-client (audio) stream task/job."""
-
-    def __init__(
-        self,
-        audio_source: AsyncGenerator[bytes, None],
-        audio_format: AudioFormat,
-        expected_clients: int = 0,
-    ) -> None:
-        """Initialize MultiClientStream."""
-        self.audio_source = audio_source
-        self.audio_format = audio_format
-        self.subscribers: list[asyncio.Queue] = []
-        self.expected_clients = expected_clients
-        self.task = asyncio.create_task(self._runner())
-
-    @property
-    def done(self) -> bool:
-        """Return if this stream is already done."""
-        return self.task.done()
-
-    async def stop(self) -> None:
-        """Stop/cancel the stream."""
-        if self.done:
-            return
-        self.task.cancel()
-        with suppress(asyncio.CancelledError):
-            await self.task
-        for sub_queue in list(self.subscribers):
-            empty_queue(sub_queue)
-
-    async def get_stream(
-        self, output_format: AudioFormat, filter_params: list[str] | None = None
-    ) -> AsyncGenerator[bytes, None]:
-        """Get (client specific encoded) ffmpeg stream."""
-        async for chunk in get_ffmpeg_stream(
-            audio_input=self.subscribe_raw(),
-            input_format=self.audio_format,
-            output_format=output_format,
-            filter_params=filter_params,
-        ):
-            yield chunk
-
-    async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
-        """Subscribe to the raw/unaltered audio stream."""
-        try:
-            queue = asyncio.Queue(2)
-            self.subscribers.append(queue)
-            while True:
-                chunk = await queue.get()
-                if chunk == b"":
-                    break
-                yield chunk
-        finally:
-            with suppress(ValueError):
-                self.subscribers.remove(queue)
-
-    async def _runner(self) -> None:
-        """Run the stream for the given audio source."""
-        expected_clients = self.expected_clients or 1
-        # wait for first/all subscriber
-        count = 0
-        while count < 50:
-            await asyncio.sleep(0.5)
-            count += 1
-            if len(self.subscribers) >= expected_clients:
-                break
-            if count == 50:
-                return
-        LOGGER.debug(
-            "Starting multi-client stream with %s/%s clients",
-            len(self.subscribers),
-            self.expected_clients,
-        )
-        async for chunk in self.audio_source:
-            if len(self.subscribers) == 0:
-                return
-            await asyncio.gather(
-                *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
-            )
-        # EOF: send empty chunk
-        await asyncio.gather(*[sub.put(b"") for sub in self.subscribers], return_exceptions=True)
index f5c89c1c6d1661e498c9344f9a5990061db735d7..48711dc8a0beca893176cfef9116af183fdbf96e 100644 (file)
@@ -23,9 +23,9 @@ from music_assistant.common.models.config_entries import (
     ConfigValueOption,
     PlayerConfig,
 )
-from music_assistant.common.models.enums import ConfigEntryType, PlayerState, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, PlayerState
 from music_assistant.common.models.player import Player, PlayerMedia
-from music_assistant.constants import CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
+from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_SYNC_LEADER, SYNCGROUP_PREFIX
 
 from .provider import Provider
 
@@ -71,6 +71,28 @@ class PlayerProvider(Provider):
                     multi_value=True,
                     required=True,
                 ),
+                ConfigEntry(
+                    key=CONF_SYNC_LEADER,
+                    type=ConfigEntryType.STRING,
+                    label="Preferred sync leader",
+                    default_value="auto",
+                    options=(
+                        *tuple(
+                            ConfigValueOption(x.display_name, x.player_id)
+                            for x in self.mass.players.all(True, False)
+                            if x.player_id
+                            in self.mass.config.get_raw_player_config_value(
+                                player_id, CONF_GROUP_MEMBERS, []
+                            )
+                        ),
+                        ConfigValueOption("Select automatically", "auto"),
+                    ),
+                    description="By default Music Assistant will automatically assign a "
+                    "(random) player as sync leader, meaning the other players in the sync group "
+                    "will be synced to that player. If you want to force a specific player to be "
+                    "the sync leader, select it here.",
+                    required=True,
+                ),
                 CONF_ENTRY_PLAYER_ICON_GROUP,
             )
         if not player_id.startswith(SYNCGROUP_PREFIX):
@@ -212,17 +234,6 @@ class PlayerProvider(Provider):
             # default implementation, simply call the cmd_sync for all child players
             await self.cmd_sync(child_id, target_player)
 
-    async def cmd_unsync_many(self, player_ids: str) -> None:
-        """Handle UNSYNC command for all the given players.
-
-        Remove the given player from any syncgroups it currently is synced to.
-
-            - player_id: player_id of the player to handle the command.
-        """
-        for player_id in player_ids:
-            # default implementation, simply call the cmd_sync for all player_ids
-            await self.cmd_unsync(player_id)
-
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates.
 
@@ -230,66 +241,15 @@ class PlayerProvider(Provider):
         if 'needs_poll' is set to True in the player object.
         """
 
-    def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
+    def on_group_child_power(
+        self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
+    ) -> None:
         """
-        Call when a power command was executed on one of the child players of a Sync/Player group.
+        Call when a power command was executed on one of the child players of a PlayerGroup.
 
         This is used to handle special actions such as (re)syncing.
+        The group state is sent with the state BEFORE the power command was executed.
         """
-        group_player = self.mass.players.get(player_id)
-        child_player = self.mass.players.get(child_player_id)
-
-        if not group_player.powered:
-            # guard, this should be caught in the player controller but just in case...
-            return
-
-        powered_childs = list(self.mass.players.iter_group_members(group_player, True))
-        if not new_power and child_player in powered_childs:
-            powered_childs.remove(child_player)
-        if new_power and child_player not in powered_childs:
-            powered_childs.append(child_player)
-
-        # if the last player of a group turned off, turn off the group
-        if len(powered_childs) == 0:
-            self.logger.debug(
-                "Group %s has no more powered members, turning off group player",
-                group_player.display_name,
-            )
-            self.mass.create_task(self.mass.players.cmd_power(player_id, False))
-            return
-
-        # the below actions are only suitable for syncgroups
-        if ProviderFeature.SYNC_PLAYERS not in self.supported_features:
-            return
-
-        group_playing = group_player.state == PlayerState.PLAYING
-        is_sync_leader = (
-            len(child_player.group_childs) > 0
-            and child_player.active_source == group_player.player_id
-        )
-        if group_playing and not new_power and is_sync_leader:
-            # the current sync leader player turned OFF while the group player
-            # should still be playing - we need to select a new sync leader and resume
-            self.logger.warning(
-                "Syncleader %s turned off while syncgroup is playing, "
-                "a forced resume for syngroup %s will be attempted in 5 seconds...",
-                child_player.display_name,
-                group_player.display_name,
-            )
-
-            async def full_resync() -> None:
-                await self.mass.players.sync_syncgroup(group_player.player_id)
-                await self.mass.player_queues.resume(group_player.player_id)
-
-            self.mass.call_later(5, full_resync, task_id=f"forced_resync_{player_id}")
-            return
-        elif new_power:
-            # if a child player turned ON while the group is already active, we need to resync
-            sync_leader = self.mass.players.get_sync_leader(group_player)
-            if sync_leader.player_id != child_player_id:
-                self.mass.create_task(
-                    self.cmd_sync(child_player_id, sync_leader.player_id),
-                )
 
     # DO NOT OVERRIDE BELOW
 
index 8f7dd0b52c0fc6d962bbf12d192c851f295a0c79..4efdd496a6a1d961677f716f9d44793698df47a5 100644 (file)
@@ -669,8 +669,8 @@ class AirplayProvider(PlayerProvider):
                 # special case: UGP stream
                 ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
                 ugp_stream = ugp_provider.streams[media.queue_id]
-                input_format = ugp_stream.audio_format
-                audio_source = ugp_stream.subscribe_raw()
+                input_format = ugp_stream.output_format
+                audio_source = ugp_stream.subscribe()
             elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
                 # radio stream - consume media stream directly
                 input_format = AIRPLAY_PCM_FORMAT
index c745e50dfa79438ffeecda9873fc8f0f6b6db0e9..1ca3448703583932b1d2c4a9baa60bc2cffef374 100644 (file)
@@ -471,17 +471,19 @@ class QobuzProvider(MusicProvider):
                 "format_id": format_id,
             }
         ]
-        await self._post_data("track/reportStreamingStart", data=events)
+        async with self.throttler.bypass():
+            await self._post_data("track/reportStreamingStart", data=events)
 
     async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
         """Handle callback when an item completed streaming."""
         user_id = self._user_auth_info["user"]["id"]
-        await self._get_data(
-            "/track/reportStreamingEnd",
-            user_id=user_id,
-            track_id=str(streamdetails.item_id),
-            duration=try_parse_int(seconds_streamed),
-        )
+        async with self.throttler.bypass():
+            await self._get_data(
+                "/track/reportStreamingEnd",
+                user_id=user_id,
+                track_id=str(streamdetails.item_id),
+                duration=try_parse_int(seconds_streamed),
+            )
 
     def _parse_artist(self, artist_obj: dict):
         """Parse qobuz artist object to generic layout."""
index cf7e8c006a8dd25e183802c5a1766343ac252df4..fbe5ae429ea4f7105e76ae73de40911e7897c036 100644 (file)
@@ -59,11 +59,12 @@ from music_assistant.constants import (
     VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
-from music_assistant.server.helpers.multi_client_stream import MultiClientStream
 from music_assistant.server.helpers.util import TaskManager
 from music_assistant.server.models.player_provider import PlayerProvider
 from music_assistant.server.providers.ugp import UniversalGroupProvider
 
+from .multi_client_stream import MultiClientStream
+
 if TYPE_CHECKING:
     from aioslimproto.models import SlimEvent
 
diff --git a/music_assistant/server/providers/slimproto/multi_client_stream.py b/music_assistant/server/providers/slimproto/multi_client_stream.py
new file mode 100644 (file)
index 0000000..5ba2a53
--- /dev/null
@@ -0,0 +1,101 @@
+"""Implementation of a simple multi-client stream task/job."""
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+
+from music_assistant.common.helpers.util import empty_queue
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+
+class MultiClientStream:
+    """Implementation of a simple multi-client (audio) stream task/job."""
+
+    def __init__(
+        self,
+        audio_source: AsyncGenerator[bytes, None],
+        audio_format: AudioFormat,
+        expected_clients: int = 0,
+    ) -> None:
+        """Initialize MultiClientStream."""
+        self.audio_source = audio_source
+        self.audio_format = audio_format
+        self.subscribers: list[asyncio.Queue] = []
+        self.expected_clients = expected_clients
+        self.task = asyncio.create_task(self._runner())
+
+    @property
+    def done(self) -> bool:
+        """Return if this stream is already done."""
+        return self.task.done()
+
+    async def stop(self) -> None:
+        """Stop/cancel the stream."""
+        if self.done:
+            return
+        self.task.cancel()
+        with suppress(asyncio.CancelledError):
+            await self.task
+        for sub_queue in list(self.subscribers):
+            empty_queue(sub_queue)
+
+    async def get_stream(
+        self,
+        output_format: AudioFormat,
+        filter_params: list[str] | None = None,
+    ) -> AsyncGenerator[bytes, None]:
+        """Get (client specific encoded) ffmpeg stream."""
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.subscribe_raw(),
+            input_format=self.audio_format,
+            output_format=output_format,
+            filter_params=filter_params,
+        ):
+            yield chunk
+
+    async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+        """Subscribe to the raw/unaltered audio stream."""
+        try:
+            queue = asyncio.Queue(2)
+            self.subscribers.append(queue)
+            while True:
+                chunk = await queue.get()
+                if chunk == b"":
+                    break
+                yield chunk
+        finally:
+            with suppress(ValueError):
+                self.subscribers.remove(queue)
+
+    async def _runner(self) -> None:
+        """Run the stream for the given audio source."""
+        expected_clients = self.expected_clients or 1
+        # wait for first/all subscriber
+        count = 0
+        while count < 50:
+            await asyncio.sleep(0.1)
+            count += 1
+            if len(self.subscribers) >= expected_clients:
+                break
+        LOGGER.debug(
+            "Starting multi-client stream with %s/%s clients",
+            len(self.subscribers),
+            self.expected_clients,
+        )
+        async for chunk in self.audio_source:
+            fail_count = 0
+            while len(self.subscribers) == 0:
+                await asyncio.sleep(0.1)
+                fail_count += 1
+                if fail_count > 50:
+                    LOGGER.warning("No clients connected, stopping stream")
+                    return
+            await asyncio.gather(
+                *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
+            )
+        # EOF: send empty chunk
+        await asyncio.gather(*[sub.put(b"") for sub in self.subscribers], return_exceptions=True)
index 2a609295a152c2b374cb99e2b2e632456038306d..43943bd0a0568d2e25c518cc2955a6f26ef3b706 100644 (file)
@@ -472,8 +472,8 @@ class SnapCastProvider(PlayerProvider):
             # special case: UGP stream
             ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
             ugp_stream = ugp_provider.streams[media.queue_id]
-            input_format = ugp_stream.audio_format
-            audio_source = ugp_stream.subscribe_raw()
+            input_format = ugp_stream.output_format
+            audio_source = ugp_stream.subscribe()
         elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
             # radio stream - consume media stream directly
             input_format = DEFAULT_SNAPCAST_FORMAT
index fdf0caaae17945c5246ea5efffc337505c893de0..3d8f24333ad40fcd4ee16d64d01f2c93657c3176 100644 (file)
@@ -7,6 +7,8 @@ allowing the user to create player groups from all players known in the system.
 
 from __future__ import annotations
 
+import asyncio
+from collections.abc import AsyncGenerator, Awaitable, Callable
 from time import time
 from typing import TYPE_CHECKING, Final, cast
 
@@ -36,12 +38,7 @@ from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_HTTP_PROFILE, SYNCGROUP_PREFIX
 from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
-from music_assistant.server.helpers.audio import (
-    get_chunksize,
-    get_ffmpeg_stream,
-    get_player_filter_params,
-)
-from music_assistant.server.helpers.multi_client_stream import MultiClientStream
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
 from music_assistant.server.helpers.util import TaskManager
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -57,13 +54,13 @@ if TYPE_CHECKING:
 # ruff: noqa: ARG002
 
 UGP_FORMAT = AudioFormat(
-    content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+    content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
 )
 UGP_PREFIX = "ugp_"
 
 CONF_ACTION_CREATE_PLAYER = "create_player"
 CONF_ACTION_CREATE_PLAYER_SAVE = "create_player_save"
-CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(48000, 24, 48000, 24, True)
+CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
 CONF_GROUP_PLAYERS: Final[str] = "group_players"
 CONF_NEW_GROUP_NAME: Final[str] = "name"
 CONF_NEW_GROUP_MEMBERS: Final[list[str]] = "members"
@@ -170,7 +167,7 @@ class UniversalGroupProvider(PlayerProvider):
         """Initialize MusicProvider."""
         super().__init__(mass, manifest, config)
         self._registered_routes: set[str] = set()
-        self.streams: dict[str, MultiClientStream] = {}
+        self.streams: dict[str, UGPStream] = {}
 
     async def loaded_in_mass(self) -> None:
         """Call after the provider has been loaded."""
@@ -310,7 +307,7 @@ class UniversalGroupProvider(PlayerProvider):
         group_player = self.mass.players.get(player_id)
         # stop any existing stream first
         if (existing := self.streams.pop(player_id, None)) and not existing.done:
-            existing.task.cancel()
+            await existing.stop()
 
         # select audio source
         if media.media_type == MediaType.ANNOUNCEMENT:
@@ -339,10 +336,8 @@ class UniversalGroupProvider(PlayerProvider):
             )
 
         # start the stream task
-        self.streams[player_id] = MultiClientStream(
-            audio_source=audio_source, audio_format=UGP_FORMAT
-        )
-        base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
+        self.streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
+        base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac"
 
         # forward to downstream play_media commands
         async with TaskManager(self.mass) as tg:
@@ -414,23 +409,21 @@ class UniversalGroupProvider(PlayerProvider):
             group_childs=set(members),
         )
         self.mass.players.register_or_update(player)
-        # register dynamic routes for the ugp stream (both flac and mp3)
-        for fmt in ("mp3", "flac"):
-            route_path = f"/ugp/{group_player_id}.{fmt}"
-            self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
-            self._registered_routes.add(route_path)
+        # register dynamic route for the ugp stream
+        route_path = f"/ugp/{group_player_id}.aac"
+        self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
+        self._registered_routes.add(route_path)
 
         return player
 
-    def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
+    def on_group_child_power(
+        self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
+    ) -> None:
         """
         Call when a power command was executed on one of the child player of a PlayerGroup.
 
-        This is used to handle special actions such as (re)syncing.
+        The group state is sent with the state BEFORE the power command was executed.
         """
-        group_player = self.mass.players.get(player_id)
-        child_player = self.mass.players.get(child_player_id)
-
         if not group_player.powered:
             # guard, this should be caught in the player controller but just in case...
             return None
@@ -438,7 +431,7 @@ class UniversalGroupProvider(PlayerProvider):
         powered_childs = [
             x
             for x in self.mass.players.iter_group_members(group_player, True)
-            if not (not new_power and x.player_id == child_player_id)
+            if not (not new_power and x.player_id == child_player.player_id)
         ]
         if new_power and child_player not in powered_childs:
             powered_childs.append(child_player)
@@ -449,13 +442,13 @@ class UniversalGroupProvider(PlayerProvider):
                 "Group %s has no more powered members, turning off group player",
                 group_player.display_name,
             )
-            self.mass.create_task(self.cmd_power(player_id, False))
+            self.mass.create_task(self.cmd_power(group_player.player_id, False))
             return False
 
         # if a child player turned ON while the group player is already playing
         # we just direct it to the existing stream (we dont care about the audio being in sync)
-        if new_power and group_player.state == PlayerState.PLAYING:
-            base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
+        if new_power and group_state == PlayerState.PLAYING:
+            base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.aac"
             self.mass.create_task(
                 self.mass.players.play_media(
                     child_player.player_id,
@@ -473,7 +466,6 @@ class UniversalGroupProvider(PlayerProvider):
     async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
         """Serve the UGP (multi-client) flow stream audio to a player."""
         ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
-        fmt = request.path.rsplit(".")[-1]
         child_player_id = request.query.get("player_id")  # optional!
 
         if not (ugp_player := self.mass.players.get(ugp_player_id)):
@@ -482,18 +474,12 @@ class UniversalGroupProvider(PlayerProvider):
         if not (stream := self.streams.get(ugp_player_id, None)) or stream.done:
             raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!")
 
-        output_format = AudioFormat(
-            content_type=ContentType.try_parse(fmt),
-            sample_rate=stream.audio_format.sample_rate,
-            bit_depth=stream.audio_format.bit_depth,
-        )
-
         http_profile: str = await self.mass.config.get_player_config_value(
             child_player_id, CONF_HTTP_PROFILE
         )
         headers = {
             **DEFAULT_STREAM_HEADERS,
-            "Content-Type": f"audio/{fmt}",
+            "Content-Type": "audio/aac",
             "Accept-Ranges": "none",
             "Cache-Control": "no-cache",
             "Connection": "close",
@@ -501,7 +487,7 @@ class UniversalGroupProvider(PlayerProvider):
 
         resp = web.StreamResponse(status=200, reason="OK", headers=headers)
         if http_profile == "forced_content_length":
-            resp.content_length = get_chunksize(output_format, 24 * 3600)
+            resp.content_length = 4294967296
         elif http_profile == "chunked":
             resp.enable_chunked_encoding()
 
@@ -517,17 +503,10 @@ class UniversalGroupProvider(PlayerProvider):
             ugp_player.display_name,
             child_player_id or request.remote,
         )
-
-        async for chunk in stream.get_stream(
-            output_format=output_format,
-            filter_params=get_player_filter_params(self.mass, child_player_id)
-            if child_player_id
-            else None,
-        ):
+        async for chunk in stream.subscribe():
             try:
                 await resp.write(chunk)
-            except (BrokenPipeError, ConnectionResetError):
-                # race condition
+            except (ConnectionError, ConnectionResetError):
                 break
 
         return resp
@@ -550,3 +529,70 @@ class UniversalGroupProvider(PlayerProvider):
             and x not in syncgroup_childs
             and not x.startswith(UGP_PREFIX)
         ]
+
+
+class UGPStream:
+    """
+    Implementation of a Stream for the Universal Group Player.
+
+    Basiclaly this is like a fake radio radio stream (AAC) format with multiple subscribers.
+    The AAC format is chosen because it is widely supported and has a good balance between
+    quality and bandwidth and also allows for mid-stream joining of (extra) players.
+    """
+
+    def __init__(
+        self,
+        audio_source: AsyncGenerator[bytes, None],
+        audio_format: AudioFormat,
+    ) -> None:
+        """Initialize UGP Stream."""
+        self.audio_source = audio_source
+        self.input_format = audio_format
+        self.output_format = AudioFormat(content_type=ContentType.AAC)
+        self.subscribers: list[Callable[[bytes], Awaitable]] = []
+        self._task: asyncio.Task | None = None
+        self._done: asyncio.Event = asyncio.Event()
+
+    @property
+    def done(self) -> bool:
+        """Return if this stream is already done."""
+        return self._done.is_set() and self._task and self._task.done()
+
+    async def stop(self) -> None:
+        """Stop/cancel the stream."""
+        if self._done.is_set():
+            return
+        if self._task and not self._task.done():
+            self._task.cancel()
+        self._done.set()
+
+    async def subscribe(self) -> AsyncGenerator[bytes, None]:
+        """Subscribe to the raw/unaltered audio stream."""
+        # start the runner as soon as the (first) client connects
+        if not self._task:
+            self._task = asyncio.create_task(self._runner())
+        queue = asyncio.Queue(1)
+        try:
+            self.subscribers.append(queue.put)
+            while True:
+                chunk = await queue.get()
+                if not chunk:
+                    break
+                yield chunk
+        finally:
+            self.subscribers.remove(queue.put)
+
+    async def _runner(self) -> None:
+        """Run the stream for the given audio source."""
+        await asyncio.sleep(0.25)  # small delay to allow subscribers to connect
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.audio_source,
+            input_format=self.input_format,
+            output_format=self.output_format,
+            # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version
+            # extra_input_args=["-readrate", "1.15"],
+        ):
+            await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True)
+        # empty chunk when done
+        await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
+        self._done.set()