Fix: Issues with player groups and airplay mode
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 21 Nov 2024 11:57:55 +0000 (12:57 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 21 Nov 2024 12:20:43 +0000 (13:20 +0100)
music_assistant/controllers/players.py
music_assistant/models/player_provider.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/bluesound/__init__.py
music_assistant/providers/player_group/__init__.py
music_assistant/providers/player_group/ugp_stream.py
music_assistant/providers/sonos/player.py
music_assistant/providers/sonos/provider.py

index bfc71539b0bb7052df372b7ed1bd6287f2068f01..9e6edfe66576a9f1acbbf55605e792a2ae76d376 100644 (file)
@@ -49,7 +49,6 @@ from music_assistant.helpers.throttle_retry import Throttler
 from music_assistant.helpers.util import TaskManager, get_changed_values
 from music_assistant.models.core_controller import CoreController
 from music_assistant.models.player_provider import PlayerProvider
-from music_assistant.providers.player_group import PlayerGroupProvider
 
 if TYPE_CHECKING:
     from collections.abc import Awaitable, Callable, Coroutine, Iterator
@@ -326,7 +325,9 @@ class PlayerController(CoreController):
 
         # ungroup player at power off
         player_was_synced = player.synced_to is not None
-        if not powered and (player.synced_to):
+        if not powered:
+            # this will handle both synced players and group players
+            # NOTE: ungroup will be ignored if the player is not grouped or synced
             await self.cmd_ungroup(player_id)
 
         # always stop player at power off
@@ -352,7 +353,9 @@ class PlayerController(CoreController):
         else:
             # allow the stop command to process and prevent race conditions
             await asyncio.sleep(0.2)
-            await self.mass.cache.set(player_id, powered, base_key="player_power")
+
+        # store last power state in cache
+        await self.mass.cache.set(player_id, powered, base_key="player_power")
 
         # always optimistically set the power state to update the UI
         # as fast as possible and prevent race conditions
@@ -641,7 +644,7 @@ class PlayerController(CoreController):
         parent_player: Player = self.get(target_player, True)
         prev_group_childs = parent_player.group_childs.copy()
         if PlayerFeature.SET_MEMBERS not in parent_player.supported_features:
-            msg = f"Player {parent_player.name} does not support sync commands"
+            msg = f"Player {parent_player.name} does not support group commands"
             raise UnsupportedFeaturedException(msg)
 
         if parent_player.synced_to:
@@ -663,7 +666,6 @@ class PlayerController(CoreController):
             if not (
                 child_player_id in parent_player.can_group_with
                 or child_player.provider in parent_player.can_group_with
-                or "*" in parent_player.can_group_with
             ):
                 raise UnsupportedFeaturedException(
                     f"Player {child_player.name} can not be grouped with {parent_player.name}"
@@ -717,20 +719,23 @@ class PlayerController(CoreController):
         if not (player := self.get(player_id)):
             self.logger.warning("Player %s is not available", player_id)
             return
-        if PlayerFeature.SET_MEMBERS not in player.supported_features:
-            self.logger.warning("Player %s does not support (un)group commands", player.name)
+
+        if (
+            player.active_group
+            and (group_player := self.get(player.active_group))
+            and PlayerFeature.SET_MEMBERS in group_player.supported_features
+        ):
+            # the player is part of a (permanent) groupplayer and the user tries to ungroup
+            # redirect the command to the group provider
+            group_provider = self.mass.get_provider(group_player.provider)
+            await group_provider.cmd_ungroup_member(player_id, group_player.player_id)
             return
+
         if not (player.synced_to or player.group_childs):
             return  # nothing to do
 
-        if player.active_group and (
-            (group_provider := self.get_player_provider(player.active_group))
-            and group_provider.domain == "player_group"
-        ):
-            # the player is part of a permanent (sync)group and the user tries to ungroup
-            # redirect the command to the group provider
-            group_provider = cast(PlayerGroupProvider, group_provider)
-            await group_provider.cmd_ungroup_member(player_id, player.active_group)
+        if PlayerFeature.SET_MEMBERS not in player.supported_features:
+            self.logger.warning("Player %s does not support (un)group commands", player.name)
             return
 
         # handle (edge)case where un ungroup command is sent to a sync leader;
index 784e040698426e7932dc97fedb3be68e4ba320f6..f7f7ee64bf10116326f782fff0dec5fe6fdb0434 100644 (file)
@@ -177,7 +177,7 @@ class PlayerProvider(Provider):
             - player_id: player_id of the player to handle the command.
             - target_player: player_id of the sync leader.
         """
-        # will only be called for players with SYNC feature set.
+        # will only be called for players with SET_MEMBERS feature set.
         raise NotImplementedError
 
     async def cmd_ungroup(self, player_id: str) -> None:
@@ -187,7 +187,7 @@ class PlayerProvider(Provider):
 
             - player_id: player_id of the player to handle the command.
         """
-        # will only be called for players with SYNC feature set.
+        # will only be called for players with SET_MEMBERS feature set.
         raise NotImplementedError
 
     async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
@@ -196,6 +196,17 @@ class PlayerProvider(Provider):
             # default implementation, simply call the cmd_group for all child players
             await self.cmd_group(child_id, target_player)
 
+    async def cmd_ungroup_member(self, player_id: str, target_player: str) -> None:
+        """Handle UNGROUP command for given player.
+
+        Remove the given player(id) from the given (master) player/sync group.
+
+            - player_id: player_id of the (child) player to ungroup from the group.
+            - target_player: player_id of the group player.
+        """
+        # can only be called for groupplayers with SET_MEMBERS feature set.
+        raise NotImplementedError
+
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates.
 
index 89bc82fcf245d0fcc4c6b625f2a8474f6c1d9550..0ed4a1e74eccbff4b01bb693135de1ce00b0485e 100644 (file)
@@ -142,7 +142,6 @@ class AirplayProvider(PlayerProvider):
     _players: dict[str, AirPlayPlayer]
     _dacp_server: asyncio.Server = None
     _dacp_info: AsyncServiceInfo = None
-    _play_media_lock: asyncio.Lock = asyncio.Lock()
 
     @property
     def supported_features(self) -> set[ProviderFeature]:
@@ -279,56 +278,55 @@ class AirplayProvider(PlayerProvider):
         media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
-        async with self._play_media_lock:
-            player = self.mass.players.get(player_id)
-            # set the active source for the player to the media queue
-            # this accounts for syncgroups and linked players (e.g. sonos)
-            player.active_source = media.queue_id
-            if player.synced_to:
-                # should not happen, but just in case
-                raise RuntimeError("Player is synced")
-            # always stop existing stream first
-            async with TaskManager(self.mass) as tg:
-                for airplay_player in self._get_sync_clients(player_id):
-                    tg.create_task(airplay_player.cmd_stop(update_state=False))
-            # select audio source
-            if media.media_type == MediaType.ANNOUNCEMENT:
-                # special case: stream announcement
-                input_format = AIRPLAY_PCM_FORMAT
-                audio_source = self.mass.streams.get_announcement_stream(
-                    media.custom_data["url"],
-                    output_format=AIRPLAY_PCM_FORMAT,
-                    use_pre_announce=media.custom_data["use_pre_announce"],
-                )
-            elif media.queue_id.startswith("ugp_"):
-                # special case: UGP stream
-                ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
-                ugp_stream = ugp_provider.ugp_streams[media.queue_id]
-                input_format = ugp_stream.output_format
-                audio_source = ugp_stream.subscribe()
-            elif media.queue_id and media.queue_item_id:
-                # regular queue (flow) stream request
-                input_format = AIRPLAY_FLOW_PCM_FORMAT
-                audio_source = self.mass.streams.get_flow_stream(
-                    queue=self.mass.player_queues.get(media.queue_id),
-                    start_queue_item=self.mass.player_queues.get_item(
-                        media.queue_id, media.queue_item_id
-                    ),
-                    pcm_format=input_format,
-                )
-            else:
-                # assume url or some other direct path
-                # NOTE: this will fail if its an uri not playable by ffmpeg
-                input_format = AIRPLAY_PCM_FORMAT
-                audio_source = get_ffmpeg_stream(
-                    audio_input=media.uri,
-                    input_format=AudioFormat(ContentType.try_parse(media.uri)),
-                    output_format=AIRPLAY_PCM_FORMAT,
-                )
-            # setup RaopStreamSession for player (and its sync childs if any)
-            sync_clients = self._get_sync_clients(player_id)
-            raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source)
-            await raop_stream_session.start()
+        player = self.mass.players.get(player_id)
+        # set the active source for the player to the media queue
+        # this accounts for syncgroups and linked players (e.g. sonos)
+        player.active_source = media.queue_id
+        if player.synced_to:
+            # should not happen, but just in case
+            raise RuntimeError("Player is synced")
+        # always stop existing stream first
+        async with TaskManager(self.mass) as tg:
+            for airplay_player in self._get_sync_clients(player_id):
+                tg.create_task(airplay_player.cmd_stop(update_state=False))
+        # select audio source
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            input_format = AIRPLAY_PCM_FORMAT
+            audio_source = self.mass.streams.get_announcement_stream(
+                media.custom_data["url"],
+                output_format=AIRPLAY_PCM_FORMAT,
+                use_pre_announce=media.custom_data["use_pre_announce"],
+            )
+        elif media.queue_id.startswith("ugp_"):
+            # special case: UGP stream
+            ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+            ugp_stream = ugp_provider.ugp_streams[media.queue_id]
+            input_format = ugp_stream.output_format
+            audio_source = ugp_stream.subscribe()
+        elif media.queue_id and media.queue_item_id:
+            # regular queue (flow) stream request
+            input_format = AIRPLAY_FLOW_PCM_FORMAT
+            audio_source = self.mass.streams.get_flow_stream(
+                queue=self.mass.player_queues.get(media.queue_id),
+                start_queue_item=self.mass.player_queues.get_item(
+                    media.queue_id, media.queue_item_id
+                ),
+                pcm_format=input_format,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            input_format = AIRPLAY_PCM_FORMAT
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                output_format=AIRPLAY_PCM_FORMAT,
+            )
+        # setup RaopStreamSession for player (and its sync childs if any)
+        sync_clients = self._get_sync_clients(player_id)
+        raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source)
+        await raop_stream_session.start()
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
@@ -482,8 +480,7 @@ class AirplayProvider(PlayerProvider):
         else:
             manufacturer, model = get_model_info(info)
 
-        default_enabled = not is_broken_raop_model(manufacturer, model)
-        if not self.mass.config.get_raw_player_config_value(player_id, "enabled", default_enabled):
+        if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
             self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
             return
 
@@ -523,6 +520,7 @@ class AirplayProvider(PlayerProvider):
             },
             volume_level=volume,
             can_group_with={self.instance_id},
+            enabled_by_default=not is_broken_raop_model(manufacturer, model),
         )
         await self.mass.players.register_or_update(mass_player)
 
index 6730a66aa63c0d4e47ff07de58686a928928ad30..746f741ef6a55f4627fc2562949cbeda697a0ae4 100644 (file)
@@ -397,7 +397,7 @@ class BluesoundPlayerProvider(PlayerProvider):
     # TODO fix sync & ungroup
 
     async def cmd_group(self, player_id: str, target_player: str) -> None:
-        """Handle SYNC command for BluOS player."""
+        """Handle GROUP command for BluOS player."""
 
     async def cmd_ungroup(self, player_id: str) -> None:
         """Handle UNGROUP command for BluOS player."""
index 90940a1c4438cab71315089c9597bc7181fdd9c3..08a1d97e4d33c990610f76bcf2df2a4d0a184de5 100644 (file)
@@ -60,7 +60,7 @@ from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.util import TaskManager
 from music_assistant.models.player_provider import PlayerProvider
 
-from .ugp_stream import UGP_FORMAT, UGPStream
+from .ugp_stream import UGPStream
 
 if TYPE_CHECKING:
     from collections.abc import Iterable
@@ -73,6 +73,12 @@ if TYPE_CHECKING:
     from music_assistant.models import ProviderInstanceType
 
 
+UGP_FORMAT = AudioFormat(
+    content_type=ContentType.PCM_F32LE,
+    sample_rate=48000,
+    bit_depth=32,
+)
+
 # ruff: noqa: ARG002
 
 UNIVERSAL_PREFIX: Final[str] = "ugp_"
@@ -213,6 +219,7 @@ class PlayerGroupProvider(PlayerProvider):
             CONF_ENTRY_PLAYER_ICON_GROUP,
             CONF_ENTRY_GROUP_TYPE,
             CONF_ENTRY_GROUP_MEMBERS,
+            CONFIG_ENTRY_DYNAMIC_MEMBERS,
         )
         # group type is static and can not be changed. we just grab the existing, stored value
         group_type: str = self.mass.config.get_raw_player_config_value(
@@ -267,14 +274,13 @@ class PlayerGroupProvider(PlayerProvider):
         return (
             *base_entries,
             group_members,
-            CONFIG_ENTRY_DYNAMIC_MEMBERS,
             *(entry for entry in child_config_entries if entry.key in allowed_conf_entries),
         )
 
     async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
         """Call (by config manager) when the configuration of a player changes."""
+        members = config.get_value(CONF_GROUP_MEMBERS)
         if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
-            members = config.get_value(CONF_GROUP_MEMBERS)
             # ensure we filter invalid members
             members = self._filter_members(config.get_value(CONF_GROUP_TYPE), members)
             if group_player := self.mass.players.get(config.player_id):
@@ -343,19 +349,6 @@ class PlayerGroupProvider(PlayerProvider):
         if not powered and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
             await self.cmd_stop(group_player.player_id)
 
-        # always (re)fetch the configured group members at power on
-        if not group_player.powered:
-            group_member_ids = self.mass.config.get_raw_player_config_value(
-                player_id, CONF_GROUP_MEMBERS, []
-            )
-            group_player.group_childs.set(
-                x
-                for x in group_member_ids
-                if (child_player := self.mass.players.get(x))
-                and child_player.available
-                and child_player.enabled
-            )
-
         if powered:
             # handle TURN_ON of the group player by turning on all members
             for member in self.mass.players.iter_group_members(
@@ -396,7 +389,7 @@ class PlayerGroupProvider(PlayerProvider):
         group_player.powered = powered
         self.mass.players.update(group_player.player_id)
         if not powered:
-            # reset the group members when powered off
+            # reset the original group members when powered off
             group_player.group_childs.set(
                 self.mass.config.get_raw_player_config_value(player_id, CONF_GROUP_MEMBERS, [])
             )
@@ -578,9 +571,22 @@ class PlayerGroupProvider(PlayerProvider):
             raise UnsupportedFeaturedException(
                 f"Adjusting group members is not allowed for group {group_player.display_name}"
             )
-        new_members = self._filter_members(group_type, [*group_player.group_childs, player_id])
-        group_player.group_childs.set(new_members)
-        if group_player.powered:
+        group_player.group_childs.append(player_id)
+
+        # handle resync/resume if group player was already playing
+        if group_player.state == PlayerState.PLAYING and group_type == GROUP_TYPE_UNIVERSAL:
+            child_player_provider = self.mass.players.get_player_provider(player_id)
+            base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.mp3"
+            await child_player_provider.play_media(
+                player_id,
+                media=PlayerMedia(
+                    uri=f"{base_url}?player_id={player_id}",
+                    media_type=MediaType.FLOW_STREAM,
+                    title=group_player.display_name,
+                    queue_id=group_player.player_id,
+                ),
+            )
+        elif group_player.powered and group_type != GROUP_TYPE_UNIVERSAL:
             # power on group player (which will also resync) if needed
             await self.cmd_power(target_player, True)
 
@@ -606,20 +612,30 @@ class PlayerGroupProvider(PlayerProvider):
             raise UnsupportedFeaturedException(
                 f"Adjusting group members is not allowed for group {group_player.display_name}"
             )
-        is_sync_leader = len(child_player.group_childs) > 0
+        group_type = self.mass.config.get_raw_player_config_value(
+            group_player.player_id, CONF_ENTRY_GROUP_TYPE.key, CONF_ENTRY_GROUP_TYPE.default_value
+        )
         was_playing = child_player.state == PlayerState.PLAYING
-        # forward command to the player provider
+        is_sync_leader = len(child_player.group_childs) > 0
+        group_player.group_childs.remove(player_id)
+        child_player.active_group = None
+        child_player.active_source = None
+        if group_type == GROUP_TYPE_UNIVERSAL:
+            if was_playing:
+                # stop playing the group player
+                player_provider = self.mass.players.get_player_provider(child_player.player_id)
+                await player_provider.cmd_stop(child_player.player_id)
+            self._update_attributes(group_player)
+            return
+        # handle sync group
         if player_provider := self.mass.players.get_player_provider(child_player.player_id):
             await player_provider.cmd_ungroup(child_player.player_id)
-            child_player.active_group = None
-            child_player.active_source = None
-        group_player.group_childs.set({x for x in group_player.group_childs if x != player_id})
-        if is_sync_leader and was_playing:
+        if is_sync_leader and was_playing and group_player.powered:
             # ungrouping the sync leader will stop the group so we need to resume
-            self.mass.call_later(2, self.mass.players.cmd_play, group_player.player_id)
-        elif group_player.powered:
-            # power on group player (which will also resync) if needed
-            await self.cmd_power(group_player.player_id, True)
+            task_id = f"resync_group_{group_player.player_id}"
+            self.mass.call_later(
+                3, self.mass.players.cmd_play, group_player.player_id, task_id=task_id
+            )
 
     async def _register_all_players(self) -> None:
         """Register all (virtual/fake) group players in the Player controller."""
@@ -772,6 +788,9 @@ class PlayerGroupProvider(PlayerProvider):
 
     def _update_attributes(self, player: Player) -> None:
         """Update attributes of a player."""
+        group_type = self.mass.config.get_raw_player_config_value(
+            player.player_id, CONF_ENTRY_GROUP_TYPE.key, CONF_ENTRY_GROUP_TYPE.default_value
+        )
         for child_player in self.mass.players.iter_group_members(player, active_only=True):
             # just grab the first active player
             if child_player.synced_to:
@@ -785,6 +804,19 @@ class PlayerGroupProvider(PlayerProvider):
         else:
             player.state = PlayerState.IDLE
             player.active_source = player.player_id
+        if group_type == GROUP_TYPE_UNIVERSAL:
+            can_group_with = {
+                # allow grouping with all providers, except the playergroup provider itself
+                x.instance_id
+                for x in self.mass.players.providers
+                if x.instance_id != self.instance_id
+            }
+        elif sync_player_provider := self.mass.get_provider(group_type):
+            can_group_with = {sync_player_provider.instance_id}
+        else:
+            can_group_with = {}
+
+        player.can_group_with = can_group_with
         self.mass.players.update(player.player_id)
 
     async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
index f5d8b8a183e45dbff35bf1e27c98838444afde02..ed9654a6d3829bdf0490b164abdbe4965b9149fc 100644 (file)
@@ -18,12 +18,6 @@ from music_assistant.helpers.util import empty_queue
 
 # ruff: noqa: ARG002
 
-UGP_FORMAT = AudioFormat(
-    content_type=ContentType.PCM_F32LE,
-    sample_rate=48000,
-    bit_depth=32,
-)
-
 
 class UGPStream:
     """
index 67a307df113d9497d309520687fe0827bb33496e..7b7290685c813fa7ec5c05b01455011a9c91c0d1 100644 (file)
@@ -200,7 +200,7 @@ class SonosPlayer:
         if self.client.player.is_passive:
             self.logger.debug("Ignore STOP command: Player is synced to another player.")
             return
-        if (airplay := self.get_linked_airplay_player(True)) and airplay.state != PlayerState.IDLE:
+        if airplay := self.get_linked_airplay_player(True):
             # linked airplay player is active, redirect the command
             self.logger.debug("Redirecting PLAY command to linked airplay player.")
             if player_provider := self.mass.get_provider(airplay.provider):
@@ -247,6 +247,7 @@ class SonosPlayer:
         self.mass_player.volume_muted = self.client.player.volume_muted
 
         group_parent = None
+        airplay_player = self.get_linked_airplay_player(False)
         if self.client.player.is_coordinator:
             # player is group coordinator
             active_group = self.client.player.group
@@ -254,6 +255,20 @@ class SonosPlayer:
                 self.mass_player.group_childs.set(self.client.player.group_members)
             else:
                 self.mass_player.group_childs.clear()
+            # append airplay child's to group childs
+            if self.airplay_mode_enabled and airplay_player:
+                airplay_childs = [
+                    x for x in airplay_player.group_childs if x != airplay_player.player_id
+                ]
+                self.mass_player.group_childs.extend(airplay_childs)
+                airplay_prov = self.mass.get_provider(airplay_player.provider)
+                self.mass_player.can_group_with.update(
+                    x.player_id
+                    for x in airplay_prov.players
+                    if x.player_id != airplay_player.player_id
+                )
+            else:
+                self.mass_player.can_group_with = {self.prov.instance_id}
             self.mass_player.synced_to = None
         else:
             # player is group child (synced to another player)
@@ -278,7 +293,6 @@ class SonosPlayer:
             self.mass_player.active_source = SOURCE_LINE_IN
         elif container_type == ContainerType.AIRPLAY:
             # check if the MA airplay player is active
-            airplay_player = self.get_linked_airplay_player(False)
             if airplay_player and airplay_player.state in (
                 PlayerState.PLAYING,
                 PlayerState.PAUSED,
index 99d8d17696aa25f4178631bb2167c8a9711ac212..f7c060cec71c16291e0f787a3157f60265b9f732 100644 (file)
@@ -201,9 +201,17 @@ class SonosPlayerProvider(PlayerProvider):
     async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
         """Create temporary sync group by joining given players to target player."""
         sonos_player = self.sonos_players[target_player]
-        await sonos_player.client.player.group.modify_group_members(
-            player_ids_to_add=child_player_ids, player_ids_to_remove=[]
-        )
+        if airplay_player := sonos_player.get_linked_airplay_player(False):
+            # if airplay mode is enabled, we could possibly receive child player id's that are
+            # not Sonos players, but Airplay players. We redirect those.
+            airplay_child_ids = [x for x in child_player_ids if x.startswith("ap")]
+            child_player_ids = [x for x in child_player_ids if x not in airplay_child_ids]
+            if airplay_child_ids:
+                await self.mass.players.cmd_group_many(airplay_player.player_id, airplay_child_ids)
+        if child_player_ids:
+            await sonos_player.client.player.group.modify_group_members(
+                player_ids_to_add=child_player_ids, player_ids_to_remove=[]
+            )
 
     async def cmd_ungroup(self, player_id: str) -> None:
         """Handle UNGROUP command for given player.