Bugfixes and improvements to (universal) player groups (#1203)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 4 Apr 2024 19:46:49 +0000 (21:46 +0200)
committerGitHub <noreply@github.com>
Thu, 4 Apr 2024 19:46:49 +0000 (21:46 +0200)
23 files changed:
music_assistant/common/helpers/uri.py
music_assistant/common/models/enums.py
music_assistant/common/models/player.py
music_assistant/constants.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/didl_lite.py
music_assistant/server/helpers/multi_client_stream.py [new file with mode: 0644]
music_assistant/server/helpers/webserver.py
music_assistant/server/models/player_provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/fully_kiosk/__init__.py
music_assistant/server/providers/hass_players/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/providers/sonos/player.py
music_assistant/server/providers/ugp/__init__.py
music_assistant/server/server.py

index 558417bc48d280ecac30f447b6d2c5c64781e126..11a3997cb8329d5586cfc47af39baa847ce529b7 100644 (file)
@@ -19,7 +19,7 @@ def parse_uri(uri: str) -> tuple[MediaType, str, str]:
             media_type_str = uri.split("/")[3]
             media_type = MediaType(media_type_str)
             item_id = uri.split("/")[4].split("?")[0]
-        elif uri.startswith(("http://", "https://")):
+        elif uri.startswith(("http://", "https://", "rtsp://", "rtmp://")):
             # Translate a plain URL to the URL provider
             provider_instance_id_or_domain = "url"
             media_type = MediaType.UNKNOWN
index 75a2d53a45c1c8080c7c6d4d388255d636956627..2cfbaaa2841272f598d58d03ca1cfa1fb9788509 100644 (file)
@@ -19,6 +19,7 @@ class MediaType(StrEnum):
     RADIO = "radio"
     FOLDER = "folder"
     ANNOUNCEMENT = "announcement"
+    FLOW_STREAM = "flow_stream"
     UNKNOWN = "unknown"
 
     @classmethod
@@ -35,7 +36,6 @@ class MediaType(StrEnum):
             MediaType.TRACK,
             MediaType.PLAYLIST,
             MediaType.RADIO,
-            MediaType.ANNOUNCEMENT,
         )
 
 
index eb906bf4f78f415ff938226e0ea90076fd762fcd..ef32a2205990e468cb1ed345b6e1379843b08187 100644 (file)
@@ -8,7 +8,7 @@ from typing import Any
 
 from mashumaro import DataClassDictMixin
 
-from .enums import PlayerFeature, PlayerState, PlayerType
+from .enums import MediaType, PlayerFeature, PlayerState, PlayerType
 
 
 @dataclass(frozen=True)
@@ -20,6 +20,22 @@ class DeviceInfo(DataClassDictMixin):
     manufacturer: str = "Unknown Manufacturer"
 
 
+@dataclass
+class PlayerMedia(DataClassDictMixin):
+    """Metadata of Media loading/loaded into a player."""
+
+    uri: str  # uri or other identifier of the loaded media
+    media_type: MediaType = MediaType.UNKNOWN
+    title: str | None = None  # optional
+    artist: str | None = None  # optional
+    album: str | None = None  # optional
+    image_url: str | None = None  # optional
+    duration: int | None = None  # optional
+    queue_id: str | None = None  # only present for requests from queue controller
+    queue_item_id: str | None = None  # only present for requests from queue controller
+    custom_data: dict | None = None  # optional
+
+
 @dataclass
 class Player(DataClassDictMixin):
     """Representation of a Player within Music Assistant."""
@@ -58,8 +74,14 @@ class Player(DataClassDictMixin):
 
     # current_item_id: return item_id/uri of the current active/loaded item on the player
     # this may be a MA queue_item_id, url, uri or some provider specific string
+    # deprecated: use current_media instead
     current_item_id: str | None = None
 
+    # current_media: return current active/loaded item on the player
+    # this may be a MA queue item, url, uri or some provider specific string
+    # includes metadata if supported by the provider/player
+    current_media: PlayerMedia | None = None
+
     # can_sync_with: return tuple of player_ids that can be synced to/with this player
     # usually this is just a list of all player_ids within the playerprovider
     can_sync_with: tuple[str, ...] = field(default=())
index bd6a965ec82bdae1f62b2ec998353a3933cfd32f..a2eaa1f0849f1934a4be33f86a0c17c4e0876d67 100644 (file)
@@ -97,4 +97,3 @@ CONFIGURABLE_CORE_CONTROLLERS = (
 )
 SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
 VERBOSE_LOG_LEVEL: Final[int] = 5
-UGP_PREFIX: Final[str] = "ugp_"
index 8bd6b83dbec7fecd571f4b8d00529db2f628316d..fc607e7a3b0ed51575e2872af248121f8ae2ea88 100644 (file)
@@ -30,9 +30,10 @@ from music_assistant.common.models.errors import (
     QueueEmpty,
 )
 from music_assistant.common.models.media_items import MediaItemType, media_from_dict
+from music_assistant.common.models.player import PlayerMedia
 from music_assistant.common.models.player_queue import PlayerQueue
 from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import FALLBACK_DURATION
+from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE
 from music_assistant.server.helpers.api import api_command
 from music_assistant.server.helpers.audio import get_stream_details
 from music_assistant.server.models.core_controller import CoreController
@@ -199,8 +200,11 @@ class PlayerQueuesController(CoreController):
         """Return the current active/synced queue for a player."""
         if player := self.mass.players.get(player_id):
             # account for player that is synced (sync child)
-            if player.synced_to:
+            if player.synced_to and player.synced_to != player.player_id:
                 return self.get_active_queue(player.synced_to)
+            # handle active group player
+            if player.active_group and player.active_group != player.player_id:
+                return self.get_active_queue(player.active_group)
             # active_source may be filled with other queue id
             if player.active_source != player_id and (
                 queue := self.get_active_queue(player.active_source)
@@ -671,14 +675,18 @@ class PlayerQueuesController(CoreController):
         queue.current_index = index
         queue.index_in_buffer = index
         queue.flow_mode_start_index = index
-        queue.flow_mode = False  # reset
+        queue.flow_mode = self.mass.config.get_raw_player_config_value(
+            queue_id, CONF_FLOW_MODE, False
+        )
         # get streamdetails - do this here to catch unavailable items early
         queue_item.streamdetails = await get_stream_details(
             self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
         )
+        # send play_media request to player
         await self.mass.players.play_media(
             player_id=queue_id,
-            queue_item=queue_item,
+            # transform into PlayerMedia to send to the actual player implementation
+            media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
         )
 
     # Interaction with player
@@ -953,6 +961,27 @@ class PlayerQueuesController(CoreController):
                 return index
         return None
 
+    def player_media_from_queue_item(self, queue_item: QueueItem, flow_mode: bool) -> PlayerMedia:
+        """Parse PlayerMedia from QueueItem."""
+        media = PlayerMedia(
+            uri=self.mass.streams.resolve_stream_url(queue_item, flow_mode=flow_mode),
+            media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
+            title="Music Assistant" if flow_mode else queue_item.name,
+            image_url=MASS_LOGO_ONLINE,
+            duration=queue_item.duration,
+            queue_id=queue_item.queue_id,
+            queue_item_id=queue_item.queue_item_id,
+        )
+        if not flow_mode and queue_item.media_item:
+            media.title = queue_item.media_item.name
+            media.artist = getattr(queue_item.media_item, "artist_str", "")
+            media.album = (
+                album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
+            )
+            if queue_item.image:
+                media.image_url = self.mass.metadata.get_image_url(queue_item.image)
+        return media
+
     def _get_next_index(
         self, queue_id: str, cur_index: int | None, is_skip: bool = False
     ) -> int | None:
@@ -1028,8 +1057,9 @@ class PlayerQueuesController(CoreController):
             with suppress(QueueEmpty):
                 next_item = await self.preload_next_item(queue.queue_id, index)
                 if supports_enqueue:
-                    await self.mass.players.enqueue_next_queue_item(
-                        player_id=player.player_id, queue_item=next_item
+                    await self.mass.players.enqueue_next_media(
+                        player_id=player.player_id,
+                        media=self.player_media_from_queue_item(next_item, queue.flow_mode),
                     )
                     return
                 await self.play_index(queue.queue_id, next_item.queue_item_id)
index 3f5cb23e65a792734d679652e42c4cca51f2a037..264a8d6debe1b2d7c87718d331406071e3663f21 100644 (file)
@@ -7,8 +7,6 @@ import functools
 from contextlib import suppress
 from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
 
-import shortuuid
-
 from music_assistant.common.helpers.util import get_changed_values
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_ANNOUNCE_VOLUME,
@@ -20,7 +18,6 @@ from music_assistant.common.models.config_entries import (
     CONF_ENTRY_TTS_PRE_ANNOUNCE,
 )
 from music_assistant.common.models.enums import (
-    ContentType,
     EventType,
     MediaType,
     PlayerFeature,
@@ -28,7 +25,6 @@ from music_assistant.common.models.enums import (
     PlayerType,
     ProviderFeature,
     ProviderType,
-    StreamType,
 )
 from music_assistant.common.models.errors import (
     AlreadyRegisteredError,
@@ -36,24 +32,21 @@ from music_assistant.common.models.errors import (
     ProviderUnavailableError,
     UnsupportedFeaturedException,
 )
-from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.common.models.player import Player, PlayerMedia
 from music_assistant.constants import (
     CONF_AUTO_PLAY,
     CONF_GROUP_MEMBERS,
     CONF_HIDE_PLAYER,
     CONF_PLAYERS,
     SYNCGROUP_PREFIX,
-    UGP_PREFIX,
 )
 from music_assistant.server.helpers.api import api_command
+from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.models.core_controller import CoreController
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
-    from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator
+    from collections.abc import Awaitable, Callable, Coroutine, Iterator
 
     from music_assistant.common.models.config_entries import CoreConfig
 
@@ -459,13 +452,13 @@ class PlayerController(CoreController):
         # always optimistically set the power state to update the UI
         # as fast as possible and prevent race conditions
         player.powered = powered
+        # always MA as active source on power ON
+        player.active_source = player_id if powered else None
         self.update(player_id)
-        # handle actions when a syncgroup child turns on
+        # handle actions when a (sync)group child turns on/off
         if active_group_player := self._get_active_player_group(player):
-            if active_group_player.startswith(SYNCGROUP_PREFIX):
-                self._on_syncgroup_child_power(active_group_player, player.player_id, powered)
-            elif player_prov := self.get_player_provider(active_group_player):
-                player_prov.on_child_power(active_group_player, player.player_id, powered)
+            player_prov = self.get_player_provider(active_group_player)
+            player_prov.on_child_power(active_group_player, player.player_id, powered)
         # handle 'auto play on power on'  feature
         elif (
             powered
@@ -540,14 +533,14 @@ class PlayerController(CoreController):
 
     @api_command("players/cmd/group_power")
     async def cmd_group_power(self, player_id: str, power: bool) -> None:
-        """Handle power command for a PlayerGroup/SyncGroup."""
+        """Handle power command for a SyncGroup."""
         group_player = self.get(player_id, True)
 
         if group_player.powered == power:
             return  # nothing to do
 
-        if group_player.type == PlayerType.GROUP and not player_id.startswith(UGP_PREFIX):
-            # this is a native group player (and not UGP), redirect
+        if group_player.type == PlayerType.GROUP:
+            # this is a native group player, redirect
             await self.cmd_power(player_id, power)
             return
 
@@ -564,19 +557,22 @@ class PlayerController(CoreController):
                 any_member_powered = True
                 if power:
                     # set active source to group player if the group (is going to be) powered
-                    member.active_source = group_player.player_id
+                    member.active_group = group_player.player_id
+                    member.active_source = group_player.active_source
                 else:
                     # turn off child player when group turns off
                     tg.create_task(self.cmd_power(member.player_id, False))
                     member.active_source = None
+                    member.active_group = None
             # edge case: group turned on but no members are powered, power them all!
             if not any_member_powered and power:
                 for member in self.iter_group_members(group_player, only_powered=False):
                     tg.create_task(self.cmd_power(member.player_id, True))
-                    member.active_source = group_player.player_id
+                    member.active_group = group_player.player_id
+                    member.active_source = group_player.active_source
 
         if power and group_player.player_id.startswith(SYNCGROUP_PREFIX):
-            await self._sync_syncgroup(group_player.player_id)
+            await self.sync_syncgroup(group_player.player_id)
         self.update(player_id)
 
     @api_command("players/cmd/volume_mute")
@@ -646,6 +642,25 @@ class PlayerController(CoreController):
                     player.active_group, url, use_pre_announce, volume_level
                 )
                 return
+            if player.type in (PlayerType.SYNC_GROUP, PlayerType.GROUP) and not player.powered:
+                # announcement request sent to inactive group,
+                # redirect to all underlying players instead
+                self.logger.warning(
+                    "Detected announcement request to an inactive playergroup, "
+                    "this will be redirected to the individual players."
+                )
+                async with asyncio.TaskGroup() as tg:
+                    for group_member in player.group_childs:
+                        tg.create_task(
+                            self.play_announcement(
+                                group_member,
+                                url=url,
+                                use_pre_announce=use_pre_announce,
+                                volume_level=volume_level,
+                            )
+                        )
+                return
+
             # determine pre-announce from (group)player config
             if use_pre_announce is None and "tts" in url:
                 use_pre_announce = self.mass.config.get_raw_player_config_value(
@@ -659,25 +674,13 @@ class PlayerController(CoreController):
                 use_pre_announce,
                 url,
             )
-            # create a queue item for the announcement so
+            # create a PlayerMedia object for the announcement so
             # we can send a regular play-media call downstream
-            announcement = QueueItem(
-                queue_id=player.player_id,
-                queue_item_id=url,
-                name="Announcement",
-                duration=None,
-                streamdetails=StreamDetails(
-                    provider="url",
-                    item_id=url,
-                    audio_format=AudioFormat(
-                        content_type=ContentType.try_parse(url),
-                    ),
-                    stream_type=StreamType.HTTP,
-                    media_type=MediaType.ANNOUNCEMENT,
-                    path=url,
-                    target_loudness=-10,
-                    data={"url": url, "use_pre_announce": use_pre_announce},
-                ),
+            announcement = PlayerMedia(
+                uri=self.mass.streams.get_announcement_url(player_id, url, use_pre_announce),
+                media_type=MediaType.ANNOUNCEMENT,
+                title="Announcement",
+                custom_data={"url": url, "use_pre_announce": use_pre_announce},
             )
             # handle native announce support
             if native_announce_support:
@@ -689,54 +692,41 @@ class PlayerController(CoreController):
         finally:
             player.announcement_in_progress = False
 
-    async def play_media(self, player_id: str, queue_item: QueueItem) -> None:
+    @api_command("players/cmd/play_media")
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player.
 
-        This is called by the Queue controller to start playing a queue item on the given player.
-        The provider's own implementation should work out how to handle this request.
-
-            - player_id: player_id of the player to handle the command.
-            - queue_item: The QueueItem that needs to be played on the player.
+        - player_id: player_id of the player to handle the command.
+        - media: The Media that needs to be played on the player.
         """
         if player_id.startswith(SYNCGROUP_PREFIX):
             # redirect to syncgroup-leader if needed
             await self.cmd_group_power(player_id, True)
             group_player = self.get(player_id, True)
             if sync_leader := self.get_sync_leader(group_player):
-                await self.play_media(sync_leader.player_id, queue_item=queue_item)
+                await self.play_media(sync_leader.player_id, media=media)
                 group_player.state = PlayerState.PLAYING
             return
         player_prov = self.mass.players.get_player_provider(player_id)
         await player_prov.play_media(
             player_id=player_id,
-            queue_item=queue_item,
+            media=media,
         )
 
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
-        """
-        Handle enqueuing of the next queue item on the player.
-
-        Only called if the player supports PlayerFeature.ENQUE_NEXT.
-        Called about 1 second after a new track started playing.
-        Called about 15 seconds before the end of the current track.
-
-        A PlayerProvider implementation is in itself responsible for handling this
-        so that the queue items keep playing until its empty or the player stopped.
-
-        This will NOT be called if the end of the queue is reached (and repeat disabled).
-        This will NOT be called if the player is using flow mode to playback the queue.
-        """
+    @api_command("players/cmd/enqueue_next_media")
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+        """Handle enqueuing of a next media item on the player."""
         if player_id.startswith(SYNCGROUP_PREFIX):
             # redirect to syncgroup-leader if needed
             group_player = self.get(player_id, True)
             if sync_leader := self.get_sync_leader(group_player):
-                await self.enqueue_next_queue_item(
+                await self.enqueue_next_media(
                     sync_leader.player_id,
-                    queue_item=queue_item,
+                    media=media,
                 )
             return
         player_prov = self.mass.players.get_player_provider(player_id)
-        await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item)
+        await player_prov.enqueue_next_media(player_id=player_id, media=media)
 
     @api_command("players/cmd/sync")
     @handle_player_command
@@ -761,6 +751,8 @@ class PlayerController(CoreController):
         if PlayerFeature.SYNC not in parent_player.supported_features:
             msg = f"Player {parent_player.name} does not support (un)sync commands"
             raise UnsupportedFeaturedException(msg)
+        if player_id == target_player:
+            return
         if child_player.synced_to:
             if child_player.synced_to == parent_player.player_id:
                 # nothing to do: already synced to this parent
@@ -825,13 +817,10 @@ class PlayerController(CoreController):
             msg = f"Provider {provider} is not available!"
             raise ProviderUnavailableError(msg)
         if ProviderFeature.PLAYER_GROUP_CREATE in player_prov.supported_features:
-            # provider supports group create feature: forward request to provider
-            # the provider is itself responsible for
-            # checking if the members can be used for grouping
+            # Provider supports group create feature: forward request to provider.
+            # NOTE: The provider is itself responsible for
+            # checking if the members can be used for grouping.
             return await player_prov.create_group(name, members=members)
-        if ProviderFeature.SYNC_PLAYERS in player_prov.supported_features:
-            # default syncgroup implementation
-            return await self._create_syncgroup(player_prov.instance_id, name, members)
         msg = f"Provider {player_prov.name} does not support creating groups"
         raise UnsupportedFeaturedException(msg)
 
@@ -1011,29 +1000,6 @@ class PlayerController(CoreController):
 
     # Syncgroup specific functions/helpers
 
-    async def _create_syncgroup(self, provider: str, name: str, members: list[str]) -> Player:
-        """Create new (providers-specific) SyncGroup with given name and members."""
-        new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
-        # cleanup list, filter groups (should be handled by frontend, but just in case)
-        members = [
-            x.player_id
-            for x in self
-            if x.player_id in members
-            if not x.player_id.startswith(SYNCGROUP_PREFIX)
-            if x.provider == provider and PlayerFeature.SYNC in x.supported_features
-        ]
-        # create default config with the user chosen name
-        self.mass.config.create_default_player_config(
-            new_group_id,
-            provider,
-            name=name,
-            enabled=True,
-            values={CONF_GROUP_MEMBERS: members},
-        )
-        return self._register_syncgroup(
-            group_player_id=new_group_id, provider=provider, name=name, members=members
-        )
-
     def get_sync_leader(self, group_player: Player) -> Player | None:
         """Get the active sync leader player for a syncgroup or synced player."""
         if group_player.synced_to:
@@ -1061,7 +1027,7 @@ class PlayerController(CoreController):
             return child_player
         return None
 
-    async def _sync_syncgroup(self, player_id: str) -> None:
+    async def sync_syncgroup(self, player_id: str) -> None:
         """Sync all (possible) players of a syncgroup."""
         group_player = self.get(player_id, True)
         sync_leader = self.get_sync_leader(group_player)
@@ -1078,105 +1044,25 @@ class PlayerController(CoreController):
 
     async def _register_syncgroups(self) -> None:
         """Register all (virtual/fake) syncgroup players."""
-        player_configs = await self.mass.config.get_player_configs(include_values=True)
+        player_configs = await self.mass.config.get_player_configs()
         for player_config in player_configs:
             if not player_config.player_id.startswith(SYNCGROUP_PREFIX):
                 continue
-            members = player_config.get_value(CONF_GROUP_MEMBERS)
-            self._register_syncgroup(
+            if not (player_prov := self.mass.get_provider(player_config.provider)):
+                continue
+            members = self.mass.config.get_raw_player_config_value(
+                player_config.player_id, CONF_GROUP_MEMBERS
+            )
+            player_prov.register_syncgroup(
                 group_player_id=player_config.player_id,
-                provider=player_config.provider,
                 name=player_config.name or player_config.default_name,
                 members=members,
             )
 
-    def _register_syncgroup(
-        self, group_player_id: str, provider: str, name: str, members: Iterable[str]
-    ) -> Player:
-        """Register a (virtual/fake) syncgroup player."""
-        # extract player features from first/random player
-        for member in members:
-            if first_player := self.get(member):
-                break
-        else:
-            # edge case: no child player is (yet) available; postpone register
-            return None
-        player = Player(
-            player_id=group_player_id,
-            provider=provider,
-            type=PlayerType.SYNC_GROUP,
-            name=name,
-            available=True,
-            powered=False,
-            device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()),
-            supported_features=first_player.supported_features,
-            group_childs=set(members),
-            active_source=group_player_id,
-        )
-        self.mass.players.register_or_update(player)
-        return player
-
-    def _on_syncgroup_child_power(
-        self, player_id: str, child_player_id: str, new_power: bool
-    ) -> None:
-        """
-        Call when a power command was executed on one of the child player of a Player/Sync group.
-
-        This is used to handle special actions such as (re)syncing.
-        """
-        group_player = self.mass.players.get(player_id)
-        child_player = self.mass.players.get(child_player_id)
-
-        if not group_player.powered:
-            # guard, this should be caught in the player controller but just in case...
-            return
-
-        powered_childs = list(self.iter_group_members(group_player, True))
-        if not new_power and child_player in powered_childs:
-            powered_childs.remove(child_player)
-        if new_power and child_player not in powered_childs:
-            powered_childs.append(child_player)
-
-        # if the last player of a group turned off, turn off the group
-        if len(powered_childs) == 0:
-            self.logger.debug(
-                "Group %s has no more powered members, turning off group player",
-                group_player.display_name,
-            )
-            self.mass.create_task(self.cmd_power(player_id, False))
-            return
-
-        group_playing = group_player.state == PlayerState.PLAYING
-        is_sync_leader = (
-            len(child_player.group_childs) > 0
-            and child_player.active_source == group_player.player_id
-        )
-        if group_playing and not new_power and is_sync_leader:
-            # the current sync leader player turned OFF while the group player
-            # should still be playing - we need to select a new sync leader and resume
-            self.logger.warning(
-                "Syncleader %s turned off while syncgroup is playing, "
-                "a forced resume for syngroup %s will be attempted in 5 seconds...",
-                child_player.display_name,
-                group_player.display_name,
-            )
-
-            async def forced_resync() -> None:
-                # we need to wait a bit here to not run into massive race conditions
-                await asyncio.sleep(5)
-                await self._sync_syncgroup(group_player.player_id)
-                await self.mass.player_queues.resume(group_player.player_id)
-
-            self.mass.create_task(forced_resync())
-            return
-        if new_power:
-            # if a child player turned ON while the group player is on, we need to resync/resume
-            self.mass.create_task(self._sync_syncgroup(group_player.player_id))
-
     async def _play_announcement(
         self,
         player: Player,
-        announcement: QueueItem,
+        announcement: PlayerMedia,
         volume_level: int | None = None,
     ) -> None:
         """Handle (default/fallback) implementation of the play announcement feature.
@@ -1209,9 +1095,9 @@ class PlayerController(CoreController):
             # wait for the player to stop
             with suppress(TimeoutError):
                 await self.wait_for_state(player, PlayerState.IDLE, 10)
-        # a small amount of pause before the volume command
-        # prevents that the last piece of music is very loud
-        await asyncio.sleep(0.2)
+            # a small amount of pause before the volume command
+            # prevents that the last piece of music is very loud
+            await asyncio.sleep(0.2)
         # adjust volume if needed
         # in case of a (sync) group, we need to do this for all child players
         prev_volumes: dict[str, int] = {}
@@ -1219,7 +1105,12 @@ class PlayerController(CoreController):
             for volume_player_id in player.group_childs or (player.player_id,):
                 if not (volume_player := self.get(volume_player_id)):
                     continue
-                if volume_player.active_source != player.active_source:
+                # filter out players that have a different source active
+                if volume_player.active_source not in (
+                    player.active_source,
+                    volume_player.player_id,
+                    None,
+                ):
                     continue
                 prev_volume = volume_player.volume_level
                 announcement_volume = self.get_announcement_volume(volume_player_id, volume_level)
@@ -1239,8 +1130,8 @@ class PlayerController(CoreController):
             "Announcement to player %s - playing the announcement on the player...",
             player.display_name,
         )
-        await self.play_media(player_id=player.player_id, queue_item=announcement)
-        # wait for the player to play
+        await self.play_media(player_id=player.player_id, media=announcement)
+        # wait for the player(s) to play
         with suppress(TimeoutError):
             await self.wait_for_state(player, PlayerState.PLAYING, 10)
         self.logger.debug(
@@ -1248,10 +1139,11 @@ class PlayerController(CoreController):
             player.display_name,
         )
         # wait for the player to stop playing
+        if not announcement.duration:
+            media_info = await parse_tags(announcement.custom_data["url"])
+            announcement.duration = media_info.duration
         with suppress(TimeoutError):
-            await self.wait_for_state(
-                player, PlayerState.IDLE, (announcement.streamdetails.duration or 60) + 3
-            )
+            await self.wait_for_state(player, PlayerState.IDLE, (announcement.duration or 60) + 3)
         self.logger.debug(
             "Announcement to player %s - restore previous state...", player.display_name
         )
index 2cd0fa10bf5b5ba730b8fbfcde8d465b7c25e06a..f608609faeda1fe46c45601e9e1688ae2c055bc6 100644 (file)
@@ -14,10 +14,8 @@ import os
 import time
 import urllib.parse
 from collections.abc import AsyncGenerator
-from contextlib import suppress
 from typing import TYPE_CHECKING
 
-import shortuuid
 from aiofiles.os import wrap
 from aiohttp import web
 
@@ -40,7 +38,6 @@ from music_assistant.constants import (
     CONF_OUTPUT_CHANNELS,
     CONF_PUBLISH_IP,
     SILENCE_FILE,
-    UGP_PREFIX,
 )
 from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
@@ -84,191 +81,6 @@ FLOW_DEFAULT_BIT_DEPTH = 24
 isfile = wrap(os.path.isfile)
 
 
-class MultiClientStreamJob:
-    """
-    Representation of a (multiclient) Audio Queue stream job/task.
-
-    The whole idea here is that in case of a player (sync)group,
-    all client players receive the exact same (PCM) audio chunks from the source audio.
-    A StreamJob is tied to a Queue and streams the queue flow stream,
-    In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
-    """
-
-    _audio_task: asyncio.Task | None = None
-
-    def __init__(
-        self,
-        stream_controller: StreamsController,
-        queue_id: str,
-        pcm_format: AudioFormat,
-        start_queue_item: QueueItem,
-    ) -> None:
-        """Initialize MultiClientStreamJob instance."""
-        self.stream_controller = stream_controller
-        self.queue_id = queue_id
-        self.queue = self.stream_controller.mass.player_queues.get(queue_id)
-        assert self.queue  # just in case
-        self.pcm_format = pcm_format
-        self.start_queue_item = start_queue_item
-        self.job_id = shortuuid.uuid()
-        self.expected_players: set[str] = set()
-        self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
-        self.bytes_streamed: int = 0
-        self._all_clients_connected = asyncio.Event()
-        self.logger = stream_controller.logger.getChild("streamjob")
-        self._finished: bool = False
-        # start running the audio task in the background
-        self._audio_task = asyncio.create_task(self._stream_job_runner())
-
-    @property
-    def finished(self) -> bool:
-        """Return if this StreamJob is finished."""
-        return self._finished or self._audio_task and self._audio_task.done()
-
-    @property
-    def pending(self) -> bool:
-        """Return if this Job is pending start."""
-        return not self.finished and not self._all_clients_connected.is_set()
-
-    @property
-    def running(self) -> bool:
-        """Return if this Job is running."""
-        return not self.finished and not self.pending
-
-    def stop(self) -> None:
-        """Stop running this job."""
-        self._finished = True
-        if self._audio_task and self._audio_task.done():
-            return
-        if self._audio_task:
-            self._audio_task.cancel()
-        for sub_queue in self.subscribed_players.values():
-            with suppress(asyncio.QueueFull):
-                sub_queue.put_nowait(b"")
-
-    def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
-        """Resolve the childplayer specific stream URL to this streamjob."""
-        fmt = output_codec.value
-        # handle raw pcm
-        if output_codec.is_pcm():
-            player = self.stream_controller.mass.players.get(child_player_id)
-            player_max_bit_depth = 24 if player.supports_24bit else 16
-            output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
-            output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
-            output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
-                child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
-            )
-            channels = 1 if output_channels != "stereo" else 2
-            fmt += (
-                f";codec=pcm;rate={output_sample_rate};"
-                f"bitrate={output_bit_depth};channels={channels}"
-            )
-        url = f"{self.stream_controller._server.base_url}/multi/{self.queue_id}/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}"  # noqa: E501
-        self.expected_players.add(child_player_id)
-        return url
-
-    async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
-        """Subscribe consumer and iterate incoming chunks on the queue."""
-        try:
-            # some players (e.g. dlna, sonos) misbehave and do multiple GET requests
-            # to the stream in an attempt to get the audio details such as duration
-            # which is a bit pointless for our duration-less queue stream
-            # and it completely messes with the subscription logic
-            if player_id in self.subscribed_players:
-                self.logger.warning(
-                    "Player %s is making multiple requests "
-                    "to the same stream, playback may be disturbed!",
-                    player_id,
-                )
-                player_id = f"{player_id}_{shortuuid.random(4)}"
-            elif self._all_clients_connected.is_set():
-                # client subscribes while we're already started - that is going to be messy for sure
-                self.logger.warning(
-                    "Player %s is is joining while the stream is already started, "
-                    "playback may be disturbed!",
-                    player_id,
-                )
-
-            self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
-
-            if self._all_clients_connected.is_set():
-                # client subscribes while we're already started,
-                # that will most probably lead to a bad experience but support it anyways
-                self.logger.warning(
-                    "Client %s is joining while the stream is already started", player_id
-                )
-            self.logger.debug("Subscribed client %s", player_id)
-
-            if len(self.subscribed_players) == len(self.expected_players):
-                # we reached the number of expected subscribers, set event
-                # so that chunks can be pushed
-                await asyncio.sleep(0.2)
-                self._all_clients_connected.set()
-
-            # keep reading audio chunks from the queue until we receive an empty one
-            while True:
-                chunk = await sub_queue.get()
-                if chunk == b"":
-                    # EOF chunk received
-                    break
-                yield chunk
-        finally:
-            self.subscribed_players.pop(player_id, None)
-            self.logger.debug("Unsubscribed client %s", player_id)
-            # check if this was the last subscriber and we should cancel
-            await asyncio.sleep(2)
-            if len(self.subscribed_players) == 0 and self._audio_task and not self.finished:
-                self.logger.debug("Cleaning up, all clients disappeared...")
-                self._audio_task.cancel()
-
-    async def _put_chunk(self, chunk: bytes) -> None:
-        """Put chunk of data to all subscribers."""
-        async with asyncio.TaskGroup() as tg:
-            for sub_queue in list(self.subscribed_players.values()):
-                # put this chunk on the player's subqueue
-                tg.create_task(sub_queue.put(chunk))
-        self.bytes_streamed += len(chunk)
-
-    async def _stream_job_runner(self) -> None:
-        """Feed audio chunks to StreamJob subscribers."""
-        chunk_num = 0
-        async for chunk in self.stream_controller.get_flow_stream(
-            self.queue,
-            self.start_queue_item,
-            self.pcm_format,
-        ):
-            chunk_num += 1
-            if chunk_num == 1:
-                # wait until all expected clients are connected
-                try:
-                    async with asyncio.timeout(10):
-                        await self._all_clients_connected.wait()
-                except TimeoutError:
-                    if len(self.subscribed_players) == 0:
-                        self.stream_controller.logger.exception(
-                            "Abort multi client stream job for queue %s: "
-                            "clients did not connect within timeout",
-                            self.queue.display_name,
-                        )
-                        break
-                    # not all clients connected but timeout expired, set flag and move on
-                    # with all clients that did connect
-                    self._all_clients_connected.set()
-                else:
-                    self.stream_controller.logger.debug(
-                        "Starting multi client stream job for queue %s "
-                        "with %s out of %s connected clients",
-                        self.queue.display_name,
-                        len(self.subscribed_players),
-                        len(self.expected_players),
-                    )
-
-            await self._put_chunk(chunk)
-
-        # mark EOF with empty chunk
-        await self._put_chunk(b"")
-
-
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
     """Parse PCM info from a codec/content_type string."""
     params = (
@@ -289,7 +101,6 @@ class StreamsController(CoreController):
         """Initialize instance."""
         super().__init__(*args, **kwargs)
         self._server = Webserver(self.logger, enable_dynamic_routes=True)
-        self.multi_client_jobs: dict[str, MultiClientStreamJob] = {}
         self.register_dynamic_route = self._server.register_dynamic_route
         self.unregister_dynamic_route = self._server.unregister_dynamic_route
         self.manifest.name = "Streamserver"
@@ -387,11 +198,6 @@ class StreamsController(CoreController):
                     "/single/{queue_id}/{queue_item_id}.{fmt}",
                     self.serve_queue_item_stream,
                 ),
-                (
-                    "*",
-                    "/multi/{queue_id}/{job_id}/{player_id}/{queue_item_id}.{fmt}",
-                    self.serve_multi_subscriber_stream,
-                ),
                 (
                     "*",
                     "/command/{queue_id}/{command}.mp3",
@@ -411,26 +217,12 @@ class StreamsController(CoreController):
 
     def resolve_stream_url(
         self,
-        player_id: str,
         queue_item: QueueItem,
-        output_codec: ContentType,
         flow_mode: bool = False,
+        output_codec: ContentType = ContentType.FLAC,
     ) -> str:
         """Resolve the stream URL for the given QueueItem."""
         fmt = output_codec.value
-        # handle special stream created by UGP
-        if queue_item.queue_id.startswith(UGP_PREFIX):
-            return self.multi_client_jobs[queue_item.queue_id].resolve_stream_url(
-                player_id, output_codec
-            )
-        # handle announcement item
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
-            return self.get_announcement_url(
-                player_id=queue_item.queue_id,
-                announcement_url=queue_item.streamdetails.data["url"],
-                use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
-                content_type=output_codec,
-            )
         # handle raw pcm without exact format specifiers
         if output_codec.is_pcm() and ";" not in fmt:
             fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
@@ -444,40 +236,6 @@ class StreamsController(CoreController):
         url += "?" + urllib.parse.urlencode(query_params)
         return url
 
-    def create_multi_client_stream_job(
-        self,
-        queue_id: str,
-        start_queue_item: QueueItem,
-        pcm_bit_depth: int = FLOW_DEFAULT_BIT_DEPTH,
-        pcm_sample_rate: int = FLOW_DEFAULT_SAMPLE_RATE,
-    ) -> MultiClientStreamJob:
-        """Create a MultiClientStreamJob for the given queue..
-
-        This is called by player/sync group implementations to start streaming
-        the queue audio to multiple players at once.
-        """
-        if existing_job := self.multi_client_jobs.pop(queue_id, None):
-            if (
-                queue_id.startswith(UGP_PREFIX)
-                and existing_job.job_id == start_queue_item.queue_item_id
-            ):
-                return existing_job
-            # cleanup existing job first
-            if not existing_job.finished:
-                existing_job.stop()
-        self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
-            self,
-            queue_id=queue_id,
-            pcm_format=AudioFormat(
-                content_type=ContentType.from_bit_depth(pcm_bit_depth),
-                sample_rate=pcm_sample_rate,
-                bit_depth=pcm_bit_depth,
-                channels=2,
-            ),
-            start_queue_item=start_queue_item,
-        )
-        return stream_job
-
     async def serve_queue_item_stream(self, request: web.Request) -> web.Response:
         """Stream single queueitem audio to a player."""
         self._log_request(request)
@@ -642,64 +400,6 @@ class StreamsController(CoreController):
 
         return resp
 
-    async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response:
-        """Stream Queue Flow audio to a child player within a multi subscriber setup."""
-        self._log_request(request)
-        queue_id = request.match_info["queue_id"]
-        streamjob = self.multi_client_jobs.get(queue_id)
-        if not streamjob:
-            raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}")
-        job_id = request.match_info["job_id"]
-        if job_id != streamjob.job_id:
-            raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}")
-        child_player_id = request.match_info["player_id"]
-        child_player = self.mass.players.get(child_player_id)
-        if not child_player:
-            raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
-        # work out (childplayer specific!) output format/details
-        output_format = await self._get_output_format(
-            output_format_str=request.match_info["fmt"],
-            queue_player=child_player,
-            default_sample_rate=streamjob.pcm_format.sample_rate,
-            default_bit_depth=streamjob.pcm_format.bit_depth,
-        )
-        # prepare request, add some DLNA/UPNP compatible headers
-        headers = {
-            **DEFAULT_STREAM_HEADERS,
-            "Content-Type": f"audio/{output_format.output_format_str}",
-        }
-        resp = web.StreamResponse(
-            status=200,
-            reason="OK",
-            headers=headers,
-        )
-        await resp.prepare(request)
-
-        # return early if this is not a GET request
-        if request.method != "GET":
-            return resp
-
-        # all checks passed, start streaming!
-        self.logger.debug(
-            "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s",
-            streamjob.queue.display_name,
-            child_player.display_name,
-        )
-
-        async for chunk in get_ffmpeg_stream(
-            audio_input=streamjob.subscribe(child_player_id),
-            input_format=streamjob.pcm_format,
-            output_format=output_format,
-            filter_params=get_player_filter_params(self.mass, child_player_id),
-        ):
-            try:
-                await resp.write(chunk)
-            except (BrokenPipeError, ConnectionResetError):
-                # race condition
-                break
-
-        return resp
-
     async def serve_command_request(self, request: web.Request) -> web.Response:
         """Handle special 'command' request for a player."""
         self._log_request(request)
@@ -805,6 +505,7 @@ class StreamsController(CoreController):
             queue.display_name,
             use_crossfade,
         )
+        total_bytes_sent = 0
 
         while True:
             # get (next) queue item to stream
@@ -844,26 +545,22 @@ class StreamsController(CoreController):
                 queue_track.streamdetails,
                 pcm_format=pcm_format,
                 # strip silence from begin/end if track is being crossfaded
-                strip_silence_begin=use_crossfade and bytes_written > 0,
+                strip_silence_begin=use_crossfade and total_bytes_sent > 0,
                 strip_silence_end=use_crossfade,
             ):
-                # required buffer size is a bit dynamic,
-                # it needs to be small when the flow stream starts
-                seconds_streamed = int(bytes_written / pcm_sample_size)
-                if not use_crossfade or seconds_streamed < 5:
-                    buffer_size = pcm_sample_size
-                elif seconds_streamed < 10:
-                    buffer_size = pcm_sample_size * 2
-                elif use_crossfade and seconds_streamed < 20:
-                    buffer_size = pcm_sample_size * 5
+                # buffer size needs to be big enough to include the crossfade part
+                # allow it to be a bit smaller when playback just starts
+                if not use_crossfade:
+                    req_buffer_size = pcm_sample_size
+                elif (total_bytes_sent + bytes_written) < crossfade_size:
+                    req_buffer_size = int(crossfade_size / 2)
                 else:
-                    buffer_size = crossfade_size + pcm_sample_size * 2
-                    # buffer size needs to be big enough to include the crossfade part
+                    req_buffer_size = crossfade_size
 
                 # ALWAYS APPEND CHUNK TO BUFFER
                 buffer += chunk
                 del chunk
-                if len(buffer) < buffer_size:
+                if len(buffer) < req_buffer_size:
                     # buffer is not full enough, move on
                     continue
 
@@ -892,12 +589,10 @@ class StreamsController(CoreController):
                     buffer = b""
 
                 #### OTHER: enough data in buffer, feed to output
-                while len(buffer) > buffer_size:
-                    subchunk = buffer[:pcm_sample_size]
+                while len(buffer) > req_buffer_size:
+                    yield buffer[:pcm_sample_size]
+                    bytes_written += pcm_sample_size
                     buffer = buffer[pcm_sample_size:]
-                    bytes_written += len(subchunk)
-                    yield subchunk
-                    del subchunk
 
             #### HANDLE END OF TRACK
             if last_fadeout_part:
@@ -909,10 +604,11 @@ class StreamsController(CoreController):
                 # if crossfade is enabled, save fadeout part to pickup for next track
                 last_fadeout_part = buffer[-crossfade_size:]
                 remaining_bytes = buffer[:-crossfade_size]
-                yield remaining_bytes
-                bytes_written += len(remaining_bytes)
+                if remaining_bytes:
+                    yield remaining_bytes
+                    bytes_written += len(remaining_bytes)
                 del remaining_bytes
-            else:
+            elif buffer:
                 # no crossfade enabled, just yield the buffer last part
                 bytes_written += len(buffer)
                 yield buffer
@@ -926,6 +622,7 @@ class StreamsController(CoreController):
             queue_track.streamdetails.duration = (
                 queue_track.streamdetails.seek_position + seconds_streamed
             )
+            total_bytes_sent += bytes_written
             self.logger.debug(
                 "Finished Streaming queue track: %s (%s) on queue %s",
                 queue_track.streamdetails.uri,
@@ -941,7 +638,7 @@ class StreamsController(CoreController):
             queue_track.streamdetails.seconds_streamed += last_part_seconds
             queue_track.streamdetails.duration += last_part_seconds
             del last_fadeout_part
-
+        total_bytes_sent += bytes_written
         self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
 
     async def get_announcement_stream(
@@ -991,8 +688,9 @@ class StreamsController(CoreController):
             strip_silence_end = False
         # pcm_sample_size = chunk size = 1 second of pcm audio
         pcm_sample_size = pcm_format.pcm_sample_size
-        buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size * 1
-        buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size * 1
+        buffer_size = (
+            pcm_sample_size * 5 if (strip_silence_begin or strip_silence_end) else pcm_sample_size
+        )
 
         # collect all arguments for ffmpeg
         filter_params = []
@@ -1037,9 +735,13 @@ class StreamsController(CoreController):
             input_format=streamdetails.audio_format,
             output_format=pcm_format,
             filter_params=filter_params,
-            # we criple ffmpeg a bit on purpose with the filter_threads
-            # option so it doesn't consume all cpu when calculating loudnorm
-            extra_input_args=[*extra_input_args, "-filter_threads", "1"],
+            extra_input_args=[
+                *extra_input_args,
+                # we criple ffmpeg a bit on purpose with the filter_threads
+                # option so it doesn't consume all cpu when calculating loudnorm
+                "-filter_threads",
+                "1",
+            ],
             name="ffmpeg_media_stream",
             stderr_enabled=True,
         ) as ffmpeg_proc:
@@ -1121,11 +823,10 @@ class StreamsController(CoreController):
             chunk_num = 0
             async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size):
                 chunk_num += 1
-                required_buffer = buffer_size_begin if chunk_num < 60 else buffer_size_end
                 buffer += chunk
                 del chunk
 
-                if len(buffer) < required_buffer:
+                if len(buffer) < buffer_size:
                     # buffer is not full enough, move on
                     continue
 
@@ -1144,12 +845,10 @@ class StreamsController(CoreController):
                     continue
 
                 #### OTHER: enough data in buffer, feed to output
-                while len(buffer) > required_buffer:
-                    subchunk = buffer[:pcm_sample_size]
+                while len(buffer) > buffer_size:
+                    yield buffer[:pcm_sample_size]
+                    state_data["bytes_sent"] += pcm_sample_size
                     buffer = buffer[pcm_sample_size:]
-                    state_data["bytes_sent"] += len(subchunk)
-                    yield subchunk
-                    del subchunk
 
             # all chunks received, strip silence of last part if needed and yield remaining bytes
             if strip_silence_end:
index c07e609534f36f58096afc2dd1c4f883f0c8b634..02397131936fe5dda136c9c9749b9dc5608c0883 100644 (file)
@@ -65,7 +65,7 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
 class FFMpeg(AsyncProcess):
     """FFMpeg wrapped as AsyncProcess."""
 
-    def __init__(
+    def __init__(  # noqa: PLR0913
         self,
         audio_input: AsyncGenerator[bytes, None] | str | int,
         input_format: AudioFormat,
@@ -76,6 +76,7 @@ class FFMpeg(AsyncProcess):
         name: str = "ffmpeg",
         stderr_enabled: bool = False,
         audio_output: str | int = "-",
+        loglevel: str | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
         ffmpeg_args = get_ffmpeg_args(
@@ -86,7 +87,7 @@ class FFMpeg(AsyncProcess):
             input_path=audio_input if isinstance(audio_input, str) else "-",
             output_path=audio_output if isinstance(audio_output, str) else "-",
             extra_input_args=extra_input_args or [],
-            loglevel="info"
+            loglevel=loglevel or "info"
             if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL)
             else "error",
         )
@@ -807,7 +808,7 @@ def get_ffmpeg_args(
         "-nostats",
         "-ignore_unknown",
         "-protocol_whitelist",
-        "file,http,https,tcp,tls,crypto,pipe,data,fd",
+        "file,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
     ]
     # collect input args
     input_args = []
index 69845357455e8e96c2b8a7546b082c434658febe..59b02c65717f3b994f70cb88c78c4539510f4ff4 100644 (file)
@@ -6,72 +6,47 @@ import datetime
 from typing import TYPE_CHECKING
 
 from music_assistant.common.models.enums import MediaType
-from music_assistant.constants import MASS_LOGO_ONLINE, UGP_PREFIX
+from music_assistant.constants import MASS_LOGO_ONLINE
 
 if TYPE_CHECKING:
-    from music_assistant.common.models.queue_item import QueueItem
-    from music_assistant.server import MusicAssistant
+    from music_assistant.common.models.player import PlayerMedia
 
 # ruff: noqa: E501
 
 
-def create_didl_metadata(mass: MusicAssistant, url: str, queue_item: QueueItem) -> str:
-    """Create DIDL metadata string from url and (optional) QueueItem."""
-    ext = url.split(".")[-1].split("?")[0]
-    if "flow" in url or queue_item.queue_id.startswith(UGP_PREFIX):
-        # flow stream
+def create_didl_metadata(media: PlayerMedia) -> str:
+    """Create DIDL metadata string from url and PlayerMedia."""
+    ext = media.uri.split(".")[-1].split("?")[0]
+    image_url = media.image_url or MASS_LOGO_ONLINE
+    if media.media_type in (MediaType.FLOW_STREAM, MediaType.RADIO) or not media.duration:
+        # flow stream, radio or other duration-less stream
+        title = media.title or media.uri
         return (
             '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
             f'<item id="flowmode" parentID="0" restricted="1">'
-            f"<dc:title>Music Assistant</dc:title>"
-            f"<upnp:albumArtURI>{escape_string(MASS_LOGO_ONLINE)}</upnp:albumArtURI>"
-            f"<dc:queueItemId>{queue_item.queue_id}</dc:queueItemId>"
-            "<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
-            f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
-            f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
-            "</item>"
-            "</DIDL-Lite>"
-        )
-    image_url = (
-        mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE
-    )
-    if queue_item.media_type != MediaType.TRACK or not queue_item.duration:
-        # radio or other non-track item
-        return (
-            '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
-            f'<item id="flowmode" parentID="0" restricted="1">'
-            f"<dc:title>{escape_string(queue_item.name)}</dc:title>"
+            f"<dc:title>{escape_string(title)}</dc:title>"
             f"<upnp:albumArtURI>{escape_string(image_url)}</upnp:albumArtURI>"
-            f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
+            f"<dc:queueItemId>{media.uri}</dc:queueItemId>"
             "<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
             f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
-            f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
+            f'<res duration="23:59:59.000" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(media.uri)}</res>'
             "</item>"
             "</DIDL-Lite>"
         )
-    title = escape_string(queue_item.media_item.name)
-    if queue_item.media_item.artists and queue_item.media_item.artists[0].name:
-        artist = escape_string(queue_item.media_item.artists[0].name)
-    else:
-        artist = ""
-    if queue_item.media_item.album and queue_item.media_item.album.name:
-        album = escape_string(queue_item.media_item.album.name)
-    else:
-        album = ""
-    duration_str = str(datetime.timedelta(seconds=queue_item.duration)) + ".000"
+    duration_str = str(datetime.timedelta(seconds=media.duration or 0)) + ".000"
     return (
         '<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
         '<item id="1" parentID="0" restricted="1">'
-        f"<dc:title>{title}</dc:title>"
-        f"<dc:creator>{artist}</dc:creator>"
-        f"<upnp:album>{album}</upnp:album>"
-        f"<upnp:artist>{artist}</upnp:artist>"
-        f"<upnp:duration>{int(queue_item.duration)}</upnp:duration>"
-        f"<dc:queueItemId>{queue_item.queue_item_id}</dc:queueItemId>"
+        f"<dc:title>{escape_string(media.title or media.uri)}</dc:title>"
+        f"<dc:creator>{escape_string(media.artist or '')}</dc:creator>"
+        f"<upnp:album>{escape_string(media.album or '')}</upnp:album>"
+        f"<upnp:artist>{escape_string(media.artist or '')}</upnp:artist>"
+        f"<upnp:duration>{int(media.duration or 0)}</upnp:duration>"
+        f"<dc:queueItemId>{media.uri}</dc:queueItemId>"
         f"<upnp:albumArtURI>{escape_string(image_url)}</upnp:albumArtURI>"
         "<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
         f"<upnp:mimeType>audio/{ext}</upnp:mimeType>"
-        f'<res duration="{duration_str}" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(url)}</res>'
+        f'<res duration="{duration_str}" protocolInfo="http-get:*:audio/{ext}:DLNA.ORG_PN={ext.upper()};DLNA.ORG_OP=01;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{escape_string(media.uri)}</res>'
         "</item>"
         "</DIDL-Lite>"
     )
diff --git a/music_assistant/server/helpers/multi_client_stream.py b/music_assistant/server/helpers/multi_client_stream.py
new file mode 100644 (file)
index 0000000..3dea07e
--- /dev/null
@@ -0,0 +1,98 @@
+"""Implementation of a simple multi-client stream task/job."""
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+
+from music_assistant.common.helpers.util import empty_queue
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+
+class MultiClientStream:
+    """Implementation of a simple multi-client (audio) stream task/job."""
+
+    def __init__(
+        self,
+        audio_source: AsyncGenerator[bytes, None],
+        audio_format: AudioFormat,
+        expected_clients: int = 0,
+    ) -> None:
+        """Initialize MultiClientStream."""
+        self.audio_source = audio_source
+        self.audio_format = audio_format
+        self.subscribers: list[asyncio.Queue] = []
+        self.expected_clients = expected_clients
+        self.task = asyncio.create_task(self._runner())
+
+    @property
+    def done(self) -> bool:
+        """Return if this stream is already done."""
+        return self.task.done()
+
+    async def stop(self) -> None:
+        """Stop/cancel the stream."""
+        if self.done:
+            return
+        self.task.cancel()
+        with suppress(asyncio.CancelledError):
+            await self.task
+        for sub_queue in list(self.subscribers):
+            empty_queue(sub_queue)
+
+    async def get_stream(
+        self, output_format: AudioFormat, filter_params: list[str] | None = None
+    ) -> AsyncGenerator[bytes, None]:
+        """Get (client specific encoded) ffmpeg stream."""
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.subscribe_raw(),
+            input_format=self.audio_format,
+            output_format=output_format,
+            filter_params=filter_params,
+        ):
+            yield chunk
+
+    async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+        """Subscribe to the raw/unaltered audio stream."""
+        try:
+            queue = asyncio.Queue(1)
+            self.subscribers.append(queue)
+            while True:
+                chunk = await queue.get()
+                if chunk == b"":
+                    break
+                yield chunk
+        finally:
+            with suppress(ValueError):
+                self.subscribers.remove(queue)
+
+    async def _runner(self) -> None:
+        """Run the stream for the given audio source."""
+        expected_clients = self.expected_clients or 1
+        # wait for first/all subscriber
+        count = 0
+        while count < 50:
+            await asyncio.sleep(0.5)
+            count += 1
+            if len(self.subscribers) >= expected_clients:
+                break
+            if count == 50:
+                return
+        LOGGER.debug(
+            "Starting multi-client stream with %s/%s clients",
+            len(self.subscribers),
+            self.expected_clients,
+        )
+        async for chunk in self.audio_source:
+            if len(self.subscribers) == 0:
+                return
+            async with asyncio.TaskGroup() as tg:
+                for sub in list(self.subscribers):
+                    tg.create_task(sub.put(chunk))
+        # EOF: send empty chunk
+        async with asyncio.TaskGroup() as tg:
+            for sub in list(self.subscribers):
+                tg.create_task(sub.put(b""))
index 29dcc23535dba6bdef4b622c33b34f10a257e4a5..9ea282bd923a7eb91bf922de32cc76ff5e1d6a8c 100644 (file)
@@ -53,7 +53,7 @@ class Webserver:
             },
         )
         self.logger.info("Starting server on  %s:%s - base url: %s", bind_ip, bind_port, base_url)
-        self._apprunner = web.AppRunner(self._webapp, access_log=None)
+        self._apprunner = web.AppRunner(self._webapp, access_log=None, shutdown_timeout=10)
         # add static routes
         if self._static_routes:
             for method, path, handler in self._static_routes:
@@ -68,9 +68,7 @@ class Webserver:
         await self._apprunner.setup()
         # set host to None to bind to all addresses on both IPv4 and IPv6
         host = None if bind_ip == "0.0.0.0" else bind_ip
-        self._tcp_site = web.TCPSite(
-            self._apprunner, host=host, port=bind_port, shutdown_timeout=10
-        )
+        self._tcp_site = web.TCPSite(self._apprunner, host=host, port=bind_port)
         await self._tcp_site.start()
 
     async def close(self) -> None:
index d7f3db8ae54a27e941b125361628c61fe4bf8b10..c85d0e3527a8fcd9384fc3bed7ba36a0138d7cb1 100644 (file)
@@ -3,7 +3,9 @@
 from __future__ import annotations
 
 from abc import abstractmethod
-from typing import TYPE_CHECKING
+from collections.abc import Iterable
+
+import shortuuid
 
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_ANNOUNCE_VOLUME,
@@ -21,21 +23,18 @@ from music_assistant.common.models.config_entries import (
     ConfigValueOption,
     PlayerConfig,
 )
-from music_assistant.common.models.enums import ConfigEntryType
-from music_assistant.constants import (
-    CONF_GROUP_MEMBERS,
-    CONF_GROUP_PLAYERS,
-    SYNCGROUP_PREFIX,
-    UGP_PREFIX,
+from music_assistant.common.models.enums import (
+    ConfigEntryType,
+    PlayerFeature,
+    PlayerState,
+    PlayerType,
+    ProviderFeature,
 )
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_GROUP_PLAYERS, SYNCGROUP_PREFIX
 
 from .provider import Provider
 
-if TYPE_CHECKING:
-    from music_assistant.common.models.player import Player
-    from music_assistant.common.models.queue_item import QueueItem
-
-
 # ruff: noqa: ARG001, ARG002
 
 
@@ -75,7 +74,7 @@ class PlayerProvider(Provider):
                 ),
                 CONF_ENTRY_PLAYER_ICON_GROUP,
             )
-        if not player_id.startswith((SYNCGROUP_PREFIX, UGP_PREFIX)):
+        if not player_id.startswith(SYNCGROUP_PREFIX):
             # add default entries for announce feature
             entries = (
                 *entries,
@@ -130,21 +129,21 @@ class PlayerProvider(Provider):
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player.
 
-        This is called by the Queue controller to start playing a queue item on the given player.
+        This is called by the Players controller to start playing a mediaitem on the given player.
         The provider's own implementation should work out how to handle this request.
 
             - player_id: player_id of the player to handle the command.
-            - queue_item: The QueueItem that needs to be played on the player.
+            - media: Details of the item that needs to be played on the player.
         """
         raise NotImplementedError
 
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
         """
-        Handle enqueuing of the next queue item on the player.
+        Handle enqueuing of the next (queue) item on the player.
 
         Only called if the player supports PlayerFeature.ENQUE_NEXT.
         Called about 1 second after a new track started playing.
@@ -158,7 +157,7 @@ class PlayerProvider(Provider):
         """
 
     async def play_announcement(
-        self, player_id: str, announcement: QueueItem, volume_level: int | None = None
+        self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
     ) -> None:
         """Handle (provider native) playback of an announcement on given player."""
         # will only be called for players with PLAY_ANNOUNCEMENT feature set.
@@ -229,8 +228,28 @@ class PlayerProvider(Provider):
             - name: Name for the new group to create.
             - members: A list of player_id's that should be part of this group.
         """
-        # will only be called for players with PLAYER_GROUP_CREATE feature set.
-        raise NotImplementedError
+        # should only be called for providers with PLAYER_GROUP_CREATE feature set.
+        if ProviderFeature.PLAYER_GROUP_CREATE not in self.supported_features:
+            raise NotImplementedError
+        # default implementation: create syncgroup
+        new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
+        # cleanup list, filter groups (should be handled by frontend, but just in case)
+        members = [
+            x.player_id
+            for x in self.players
+            if x.player_id in members
+            if not x.player_id.startswith(SYNCGROUP_PREFIX)
+            if x.provider == self.instance_id and PlayerFeature.SYNC in x.supported_features
+        ]
+        # create default config with the user chosen name
+        self.mass.config.create_default_player_config(
+            new_group_id,
+            self.instance_id,
+            name=name,
+            enabled=True,
+            values={CONF_GROUP_MEMBERS: members},
+        )
+        return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members)
 
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates.
@@ -250,10 +269,88 @@ class PlayerProvider(Provider):
 
     def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
         """
-        Call when a power command was executed on one of the child player of a Player/Sync group.
+        Call when a power command was executed on one of the child players of a Sync group.
 
         This is used to handle special actions such as (re)syncing.
         """
+        group_player = self.mass.players.get(player_id)
+        child_player = self.mass.players.get(child_player_id)
+
+        if not group_player.powered:
+            # guard, this should be caught in the player controller but just in case...
+            return
+
+        powered_childs = list(self.mass.players.iter_group_members(group_player, True))
+        if not new_power and child_player in powered_childs:
+            powered_childs.remove(child_player)
+        if new_power and child_player not in powered_childs:
+            powered_childs.append(child_player)
+
+        # if the last player of a group turned off, turn off the group
+        if len(powered_childs) == 0:
+            self.logger.debug(
+                "Group %s has no more powered members, turning off group player",
+                group_player.display_name,
+            )
+            self.mass.create_task(self.mass.players.cmd_power(player_id, False))
+            return
+
+        # the below actions are only suitable for syncgroups
+        if ProviderFeature.SYNC_PLAYERS not in self.supported_features:
+            return
+
+        group_playing = group_player.state == PlayerState.PLAYING
+        is_sync_leader = (
+            len(child_player.group_childs) > 0
+            and child_player.active_source == group_player.player_id
+        )
+        if group_playing and not new_power and is_sync_leader:
+            # the current sync leader player turned OFF while the group player
+            # should still be playing - we need to select a new sync leader and resume
+            self.logger.warning(
+                "Syncleader %s turned off while syncgroup is playing, "
+                "a forced resume for syngroup %s will be attempted in 5 seconds...",
+                child_player.display_name,
+                group_player.display_name,
+            )
+
+            async def full_resync() -> None:
+                await self.mass.players.sync_syncgroup(group_player.player_id)
+                await self.mass.player_queues.resume(group_player.player_id)
+
+            self.mass.call_later(5, full_resync, task_id=f"forced_resync_{player_id}")
+            return
+        elif new_power:
+            # if a child player turned ON while the group is already active, we need to resync
+            sync_leader = self.mass.players.get_sync_leader(group_player)
+            if sync_leader.player_id != child_player_id:
+                self.mass.create_task(
+                    self.cmd_sync(child_player_id, sync_leader.player_id),
+                )
+
+    def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player:
+        """Register a (virtual/fake) syncgroup player."""
+        # extract player features from first/random player
+        for member in members:
+            if first_player := self.mass.players.get(member):
+                break
+        else:
+            # edge case: no child player is (yet) available; postpone register
+            return None
+        player = Player(
+            player_id=group_player_id,
+            provider=self.instance_id,
+            type=PlayerType.SYNC_GROUP,
+            name=name,
+            available=True,
+            powered=False,
+            device_info=DeviceInfo(model="SyncGroup", manufacturer=self.name),
+            supported_features=first_player.supported_features,
+            group_childs=set(members),
+            active_source=group_player_id,
+        )
+        self.mass.players.register_or_update(player)
+        return player
 
     # DO NOT OVERRIDE BELOW
 
index 5c5fe3a340b207d16e8e250323ea4bfb7287deda..96cc8c3dd025ff37068385584870684cba63dd9d 100644 (file)
@@ -40,19 +40,23 @@ from music_assistant.common.models.enums import (
     ProviderFeature,
 )
 from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.constants import CONF_SYNC_ADJUST, UGP_PREFIX, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.server.helpers.audio import (
+    get_ffmpeg_args,
+    get_ffmpeg_stream,
+    get_player_filter_params,
+)
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
+    from music_assistant.server.providers.ugp import UniversalGroupProvider
 
 DOMAIN = "airplay"
 
@@ -463,7 +467,7 @@ class AirplayProvider(PlayerProvider):
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
         """Return the features supported by this Provider."""
-        return (ProviderFeature.SYNC_PLAYERS,)
+        return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
@@ -491,7 +495,6 @@ class AirplayProvider(PlayerProvider):
             server=f"{socket.gethostname()}.local",
         )
         await self.mass.aiozc.async_register_service(self._dacp_info)
-        self._resync_handle: asyncio.TimerHandle | None = None
 
     async def on_mdns_service_state_change(
         self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
@@ -590,44 +593,51 @@ class AirplayProvider(PlayerProvider):
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
         if player.synced_to:
             # should not happen, but just in case
             raise RuntimeError("Player is synced")
-        # fix race condition where resync and play media are called at more or less the same time
-        if self._resync_handle:
-            self._resync_handle.cancel()
-            self._resync_handle = None
         # always stop existing stream first
-        wait_stopped = not queue_item.streamdetails or queue_item.streamdetails.seek_position == 0
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
                 if airplay_player.active_stream and airplay_player.active_stream.running:
-                    tg.create_task(airplay_player.active_stream.stop(wait=wait_stopped))
+                    tg.create_task(airplay_player.active_stream.stop(wait=False))
         # select audio source
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
+        if media.media_type == MediaType.ANNOUNCEMENT:
             # special case: stream announcement
             input_format = AIRPLAY_PCM_FORMAT
             audio_source = self.mass.streams.get_announcement_stream(
-                queue_item.streamdetails.data["url"],
+                media.custom_data["url"],
                 output_format=AIRPLAY_PCM_FORMAT,
-                use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+                use_pre_announce=media.custom_data["use_pre_announce"],
             )
-        elif queue_item.queue_id.startswith(UGP_PREFIX):
-            # special case: we got forwarded a request from the UGP
-            # use the existing stream job that was already created by UGP
-            stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
-            stream_job.expected_players.add(player_id)
-            input_format = stream_job.pcm_format
-            audio_source = stream_job.subscribe(player_id)
-        else:
-            queue = self.mass.player_queues.get(queue_item.queue_id)
+        elif media.queue_id.startswith("ugp_"):
+            # special case: UGP stream
+            ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+            ugp_stream = ugp_provider.streams[media.queue_id]
+            input_format = ugp_stream.audio_format
+            audio_source = ugp_stream.subscribe_raw()
+        elif media.queue_id and media.queue_item_id:
+            # regular queue stream request
             input_format = AIRPLAY_PCM_FORMAT
             audio_source = self.mass.streams.get_flow_stream(
-                queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
+                queue=self.mass.player_queues.get(media.queue_id),
+                start_queue_item=self.mass.player_queues.get_item(
+                    media.queue_id, media.queue_item_id
+                ),
+                pcm_format=AIRPLAY_PCM_FORMAT,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            input_format = AIRPLAY_PCM_FORMAT
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                output_format=AIRPLAY_PCM_FORMAT,
             )
         self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format)
 
@@ -714,17 +724,13 @@ class AirplayProvider(PlayerProvider):
         active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
         if active_queue.state == PlayerState.PLAYING:
             # playback needs to be restarted to form a new multi client stream session
-            def resync() -> None:
-                self._resync_handle = None
-                self.mass.create_task(
-                    self.mass.player_queues.resume(active_queue.queue_id, fade_in=False)
-                )
-
             # this could potentially be called by multiple players at the exact same time
             # so we debounce the resync a bit here with a timer
-            if self._resync_handle:
-                self._resync_handle.cancel()
-            self._resync_handle = self.mass.loop.call_later(0.5, resync)
+            self.mass.call_later(
+                1,
+                self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+                task_id=f"resume_{active_queue.queue_id}",
+            )
         else:
             # make sure that the player manager gets an update
             self.mass.players.update(child_player.player_id, skip_forward=True)
index 361733a40702a145ba1e70b0825933164eacb52e..7bb06958b4c051fdf47575f8dd94fe2cea18f75b 100644 (file)
@@ -8,7 +8,6 @@ import logging
 import threading
 import time
 from dataclasses import dataclass
-from logging import Logger
 from typing import TYPE_CHECKING, Any
 from uuid import UUID
 
@@ -26,14 +25,13 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
-    ContentType,
     MediaType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
 from music_assistant.common.models.errors import PlayerUnavailableError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import (
     CONF_CROSSFADE,
     CONF_FLOW_MODE,
@@ -52,7 +50,6 @@ if TYPE_CHECKING:
 
     from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
@@ -125,7 +122,6 @@ class CastPlayer:
     cast_info: ChromecastInfo
     cc: pychromecast.Chromecast
     player: Player
-    logger: Logger
     status_listener: CastStatusListener | None = None
     mz_controller: MultizoneController | None = None
     active_group: str | None = None
@@ -237,41 +233,25 @@ class ChromecastProvider(PlayerProvider):
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
         castplayer = self.castplayers[player_id]
-        use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-            flow_mode=use_flow_mode,
-        )
+        is_flow_mode = "/flow/" in media.uri
         queuedata = {
             "type": "LOAD",
-            "media": self._create_cc_media_item(queue_item, url),
+            "media": self._create_cc_media_item(media),
         }
-
         # make sure that the media controller app is launched
-        app_id = ALT_APP_ID if use_flow_mode else DEFAULT_APP_ID
+        app_id = ALT_APP_ID if is_flow_mode else DEFAULT_APP_ID
         await self._launch_app(castplayer, app_id)
         # send queue info to the CC
         media_controller = castplayer.cc.media_controller
         await asyncio.to_thread(media_controller.send_message, data=queuedata, inc_session_id=True)
 
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
-        """Handle enqueuing of the next queue item on the player."""
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+        """Handle enqueuing of the next item on the player."""
         castplayer = self.castplayers[player_id]
-        if isinstance(queue_item, str):
-            url = self.mass.streams.get_command_url(queue_item, "next")
-            queue_item = None
-        else:
-            url = self.mass.streams.resolve_stream_url(
-                player_id,
-                queue_item=queue_item,
-                output_codec=ContentType.FLAC,
-            )
         next_item_id = None
         status = castplayer.cc.media_controller.status
         # lookup position of current track in cast queue
@@ -286,7 +266,7 @@ class ChromecastProvider(PlayerProvider):
                 continue
             next_item_id = item["itemId"]
             # check if the next queue item isn't already queued
-            if item.get("media", {}).get("customData", {}).get("uri") == url:
+            if item.get("media", {}).get("customData", {}).get("uri") == media.uri:
                 return
         queuedata = {
             "type": "QUEUE_INSERT",
@@ -296,7 +276,7 @@ class ChromecastProvider(PlayerProvider):
                     "autoplay": True,
                     "startTime": 0,
                     "preloadTime": 0,
-                    "media": self._create_cc_media_item(queue_item, url),
+                    "media": self._create_cc_media_item(media),
                 }
             ],
         }
@@ -305,7 +285,7 @@ class ChromecastProvider(PlayerProvider):
         self.mass.create_task(media_controller.send_message, data=queuedata, inc_session_id=True)
         self.logger.debug(
             "Enqued next track (%s) to player %s",
-            queue_item.name if queue_item else url,
+            media.title or media.uri,
             castplayer.player.display_name,
         )
 
@@ -424,7 +404,6 @@ class ChromecastProvider(PlayerProvider):
                     supports_24bit=not cast_info.is_audio_group,
                     enabled_by_default=enabled_by_default,
                 ),
-                logger=self.logger.getChild(cast_info.friendly_name),
             )
             self.castplayers[player_id] = castplayer
 
@@ -452,8 +431,10 @@ class ChromecastProvider(PlayerProvider):
         """Handle updated CastStatus."""
         if status is None:
             return  # guard
-        castplayer.logger.debug(
-            "Received cast status - app_id: %s - volume: %s",
+        self.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "Received cast status for %s - app_id: %s - volume: %s",
+            castplayer.player.display_name,
             status.app_id,
             status.volume_level,
         )
@@ -485,7 +466,12 @@ class ChromecastProvider(PlayerProvider):
 
     def on_new_media_status(self, castplayer: CastPlayer, status: MediaStatus) -> None:
         """Handle updated MediaStatus."""
-        castplayer.logger.debug("Received media status update: %s", status.player_state)
+        self.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "Received media status for %s update: %s",
+            castplayer.player.display_name,
+            status.player_state,
+        )
         # player state
         castplayer.player.elapsed_time_last_updated = time.time()
         if status.player_is_playing:
@@ -521,7 +507,12 @@ class ChromecastProvider(PlayerProvider):
 
     def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None:
         """Handle updated ConnectionStatus."""
-        castplayer.logger.debug("Received connection status update - status: %s", status.status)
+        self.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "Received connection status update for %s - status: %s",
+            castplayer.player.display_name,
+            status.status,
+        )
 
         if status.status == CONNECTION_STATUS_DISCONNECTED:
             castplayer.player.available = False
@@ -565,7 +556,7 @@ class ChromecastProvider(PlayerProvider):
             # Quit the previous app before starting splash screen or media player
             if castplayer.cc.app_id is not None:
                 castplayer.cc.quit_app()
-            castplayer.logger.debug("Launching App %s.", app_id)
+            self.logger.debug("Launching App %s.", app_id)
             castplayer.cc.socket_client.receiver_controller.launch_app(
                 app_id,
                 force_launch=True,
@@ -577,59 +568,38 @@ class ChromecastProvider(PlayerProvider):
 
     async def _disconnect_chromecast(self, castplayer: CastPlayer) -> None:
         """Disconnect Chromecast object if it is set."""
-        castplayer.logger.debug("Disconnecting from chromecast socket")
+        self.logger.debug("Disconnecting from chromecast socket %s", castplayer.player.display_name)
         await self.mass.loop.run_in_executor(None, castplayer.cc.disconnect, 10)
         castplayer.mz_controller = None
         castplayer.status_listener.invalidate()
         castplayer.status_listener = None
         self.castplayers.pop(castplayer.player_id, None)
 
-    def _create_cc_media_item(self, queue_item: QueueItem, stream_url: str) -> dict[str, Any]:
-        """Create CC media item from MA QueueItem."""
-        duration = int(queue_item.duration) if queue_item.duration else None
-        image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
-        if queue_item.media_type == MediaType.TRACK and queue_item.media_item:
+    def _create_cc_media_item(self, media: PlayerMedia) -> dict[str, Any]:
+        """Create CC media item from MA PlayerMedia."""
+        if media.media_type == MediaType.TRACK:
             stream_type = STREAM_TYPE_BUFFERED
-            metadata = {
-                "metadataType": 3,
-                "albumName": (
-                    queue_item.media_item.album.name if queue_item.media_item.album else ""
-                ),
-                "songName": queue_item.media_item.name,
-                "artist": (
-                    queue_item.media_item.artists[0].name if queue_item.media_item.artists else ""
-                ),
-                "title": queue_item.media_item.name,
-                "images": [{"url": image_url}] if image_url else None,
-            }
-        elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
-            stream_type = STREAM_TYPE_LIVE
-            metadata = {
-                "metadataType": 3,
-                "songName": queue_item.streamdetails.stream_title.split(" - ")[-1],
-                "artist": queue_item.streamdetails.stream_title.split(" - ")[0],
-                "albumName": queue_item.name,
-                "images": [{"url": image_url}] if image_url else None,
-                "title": queue_item.streamdetails.stream_title.split(" - ")[-1],
-            }
         else:
             stream_type = STREAM_TYPE_LIVE
-            metadata = {
-                "metadataType": 0,
-                "title": queue_item.name,
-                "images": [{"url": image_url}] if image_url else None,
-            }
+        metadata = {
+            "metadataType": 3,
+            "albumName": media.album or "",
+            "songName": media.title or "",
+            "artist": media.title or "",
+            "title": media.title or "",
+            "images": [{"url": media.image_url}] if media.image_url else None,
+        }
         return {
-            "contentId": stream_url,
+            "contentId": media.uri,
             "customData": {
-                "uri": queue_item.uri,
-                "queue_item_id": queue_item.queue_item_id,
+                "uri": media.uri,
+                "queue_item_id": media.uri,
                 "deviceName": "Music Assistant",
             },
             "contentType": "audio/flac",
             "streamType": stream_type,
             "metadata": metadata,
-            "duration": duration,
+            "duration": media.duration,
         }
 
     async def update_flow_metadata(self, castplayer: CastPlayer) -> None:
@@ -648,12 +618,21 @@ class ChromecastProvider(PlayerProvider):
         media_controller = castplayer.cc.media_controller
         # update metadata of current item chromecast
         if media_controller.status.media_custom_data["queue_item_id"] != current_item.queue_item_id:
-            cc_item = self._create_cc_media_item(current_item, "")
+            image_url = self.mass.metadata.get_image_url(current_item.image)
             queuedata = {
                 "type": "PLAY",
                 "mediaSessionId": media_controller.status.media_session_id,
                 "customData": {
-                    "metadata": cc_item["metadata"],
+                    "metadata": {
+                        "metadataType": 3,
+                        "albumName": album.name
+                        if (album := getattr(current_item.media_item, "album", None))
+                        else "",
+                        "songName": current_item.media_item.name,
+                        "artist": getattr(current_item.media_item, "artist_str", ""),
+                        "title": current_item.media_item.name,
+                        "images": [{"url": image_url}] if image_url else None,
+                    }
                 },
             }
             self.mass.create_task(
index 96bdb8d928ef849f3cf73f1a7d614fcef4cd679e..592d42e23b41df7f3464a2eb13ce566998b8ca07 100644 (file)
@@ -32,13 +32,12 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
-    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
 from music_assistant.common.models.errors import PlayerUnavailableError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import (
     CONF_CROSSFADE,
     CONF_ENFORCE_MP3,
@@ -59,7 +58,6 @@ if TYPE_CHECKING:
 
     from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
@@ -342,27 +340,17 @@ class DLNAPlayerProvider(PlayerProvider):
         await dlna_player.device.async_play()
 
     @catch_request_errors
-    async def play_media(
-        self,
-        player_id: str,
-        queue_item: QueueItem,
-    ) -> None:
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
-        use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-            flow_mode=use_flow_mode,
-        )
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+            media.uri = media.uri.replace(".flac", ".mp3")
         dlna_player = self.dlnaplayers[player_id]
         # always clear queue (by sending stop) first
         if dlna_player.device.can_stop:
             await self.cmd_stop(player_id)
-        didl_metadata = create_didl_metadata(self.mass, url, queue_item)
-        title = queue_item.name if queue_item else "Music Assistant"
-        await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
+        didl_metadata = create_didl_metadata(media)
+        title = media.title or media.uri
+        await dlna_player.device.async_set_transport_uri(media.uri, title, didl_metadata)
         # Play it
         await dlna_player.device.async_wait_for_can_play(10)
         # optimistically set this timestamp to help in case of a player
@@ -378,17 +366,14 @@ class DLNAPlayerProvider(PlayerProvider):
             await self.poll_player(dlna_player.udn)
 
     @catch_request_errors
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle enqueuing of the next queue item on the player."""
         dlna_player = self.dlnaplayers[player_id]
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-        )
-        didl_metadata = create_didl_metadata(self.mass, url, queue_item)
-        title = queue_item.name
-        await dlna_player.device.async_set_next_transport_uri(url, title, didl_metadata)
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+            media.uri = media.uri.replace(".flac", ".mp3")
+        didl_metadata = create_didl_metadata(media)
+        title = media.title or media.uri
+        await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
         self.logger.debug(
             "Enqued next track (%s) to player %s",
             title,
index 54bffd9171caeba9e4c6e21be010e6a0e7aa781a..bb5056411358304e897f420f6d10dac3fdcc7c16 100644 (file)
@@ -16,20 +16,18 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
-    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
 from music_assistant.common.models.errors import PlayerUnavailableError, SetupFailedError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import CONF_IP_ADDRESS, CONF_PASSWORD, CONF_PORT
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
@@ -181,19 +179,14 @@ class FullyKioskProvider(PlayerProvider):
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-            flow_mode=True,
-        )
-        await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC)
-        player.current_item_id = queue_item.queue_id
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+            media.uri = media.uri.replace(".flac", ".mp3")
+        await self._fully.playSound(media.uri, AUDIOMANAGER_STREAM_MUSIC)
+        player.current_media = media
         player.elapsed_time = 0
         player.elapsed_time_last_updated = time.time()
         player.state = PlayerState.PLAYING
index f8ef8e21b3daa78b3eebca93fc92c542e9cd1805..89ad8488f325211ec62f90e1b4b89b758260d0a6 100644 (file)
@@ -21,13 +21,12 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
-    ContentType,
     PlayerFeature,
     PlayerState,
     PlayerType,
 )
 from music_assistant.common.models.errors import SetupFailedError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE
 from music_assistant.server.models.player_provider import PlayerProvider
 from music_assistant.server.providers.hass import DOMAIN as HASS_DOMAIN
@@ -42,7 +41,6 @@ if TYPE_CHECKING:
 
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
     from music_assistant.server.providers.hass import HomeAssistant as HomeAssistantProvider
@@ -245,25 +243,15 @@ class HomeAssistantPlayers(PlayerProvider):
             target={"entity_id": player_id},
         )
 
-    async def play_media(
-        self,
-        player_id: str,
-        queue_item: QueueItem,
-    ) -> None:
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
-        use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-            flow_mode=use_flow_mode,
-        )
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+            media.uri = media.uri.replace(".flac", ".mp3")
         await self.hass_prov.hass.call_service(
             domain="media_player",
             service="play_media",
             service_data={
-                "media_content_id": url,
+                "media_content_id": media.uri,
                 "media_content_type": "music",
                 "enqueue": "replace",
             },
@@ -274,30 +262,15 @@ class HomeAssistantPlayers(PlayerProvider):
             player.elapsed_time = 0
             player.elapsed_time_last_updated = time.time()
 
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
-        """
-        Handle enqueuing of the next queue item on the player.
-
-        Only called if the player supports PlayerFeature.ENQUE_NEXT.
-        Called about 1 second after a new track started playing.
-        Called about 15 seconds before the end of the current track.
-
-        A PlayerProvider implementation is in itself responsible for handling this
-        so that the queue items keep playing until its empty or the player stopped.
-
-        This will NOT be called if the end of the queue is reached (and repeat disabled).
-        This will NOT be called if the player is using flow mode to playback the queue.
-        """
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-        )
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+        """Handle enqueuing of the next queue item on the player."""
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False):
+            media.uri = media.uri.replace(".flac", ".mp3")
         await self.hass_prov.hass.call_service(
             domain="media_player",
             service="play_media",
             service_data={
-                "media_content_id": url,
+                "media_content_id": media.uri,
                 "media_content_type": "music",
                 "enqueue": "next",
             },
index 36dcd28cc686219a7b3b90f341acd251f2c25e65..c4d3dacfc9d593f65d220750e7732196cb32cdf6 100644 (file)
@@ -12,6 +12,7 @@ from contextlib import suppress
 from dataclasses import dataclass
 from typing import TYPE_CHECKING
 
+from aiohttp import web
 from aioslimproto.client import PlayerState as SlimPlayerState
 from aioslimproto.client import SlimClient
 from aioslimproto.client import TransitionType as SlimTransition
@@ -45,24 +46,26 @@ from music_assistant.common.models.enums import (
     RepeatMode,
 )
 from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import (
     CONF_CROSSFADE,
     CONF_CROSSFADE_DURATION,
     CONF_ENFORCE_MP3,
     CONF_PORT,
     CONF_SYNC_ADJUST,
-    MASS_LOGO_ONLINE,
     VERBOSE_LOG_LEVEL,
 )
+from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
+from music_assistant.server.helpers.multi_client_stream import MultiClientStream
 from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.ugp import UniversalGroupProvider
 
 if TYPE_CHECKING:
     from aioslimproto.models import SlimEvent
 
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
@@ -82,7 +85,7 @@ REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
 # sync constants
 MIN_DEVIATION_ADJUST = 8  # 5 milliseconds
 MIN_REQ_PLAYPOINTS = 8  # we need at least 8 measurements
-DEVIATION_JUMP_IGNORE = 5000  # ignore a sudden unrealistic jump
+DEVIATION_JUMP_IGNORE = 500  # ignore a sudden unrealistic jump
 MAX_SKIP_AHEAD_MS = 800  # 0.8 seconds
 
 
@@ -91,7 +94,7 @@ class SyncPlayPoint:
     """Simple structure to describe a Sync Playpoint."""
 
     timestamp: float
-    sync_job_id: str
+    sync_master: str
     diff: int
 
 
@@ -218,17 +221,18 @@ class SlimprotoProvider(PlayerProvider):
     slimproto: SlimServer
     _sync_playpoints: dict[str, deque[SyncPlayPoint]]
     _do_not_resync_before: dict[str, float]
+    _multi_streams: dict[str, MultiClientStream]
 
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
         """Return the features supported by this Provider."""
-        return (ProviderFeature.SYNC_PLAYERS,)
+        return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
         self._sync_playpoints = {}
         self._do_not_resync_before = {}
-        self._resync_handle: asyncio.TimerHandle | None = None
+        self._multi_streams = {}
         control_port = self.config.get_value(CONF_PORT)
         telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
         json_port = self.config.get_value(CONF_CLI_JSON_PORT)
@@ -248,15 +252,21 @@ class SlimprotoProvider(PlayerProvider):
         try:
             await self.slimproto.start()
         except OSError as err:
-            msg = f"Unable to start the Slimproto server - is port {control_port} already taken ?"
-            raise SetupFailedError(msg) from err
+            raise SetupFailedError(
+                "Unable to start the Slimproto server - "
+                "is one of the required TCP ports already taken ?"
+            ) from err
 
     async def loaded_in_mass(self) -> None:
         """Call after the provider has been loaded."""
         self.slimproto.subscribe(self._client_callback)
+        self.mass.streams.register_dynamic_route(
+            "/slimproto/multi", self._serve_multi_client_stream
+        )
 
     async def unload(self) -> None:
         """Handle close/cleanup of the provider."""
+        self.mass.streams.unregister_dynamic_route("/slimproto/multi")
         await self.slimproto.stop()
 
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
@@ -330,67 +340,98 @@ class SlimprotoProvider(PlayerProvider):
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
-        # fix race condition where resync and play media are called at more or less the same time
-        if self._resync_handle:
-            self._resync_handle.cancel()
-            self._resync_handle = None
         player = self.mass.players.get(player_id)
         if player.synced_to:
             msg = "A synced player cannot receive play commands directly"
             raise RuntimeError(msg)
 
-        if player.group_childs and queue_item.media_type != MediaType.ANNOUNCEMENT:
-            # player has sync members, we need to start a (multi-player) stream job
-            # to make sure that all clients receive the exact same audio
-            stream_job = self.mass.streams.create_multi_client_stream_job(
-                queue_id=queue_item.queue_id,
-                start_queue_item=queue_item,
+        if not player.group_childs:
+            slimplayer = self.slimproto.get_player(player_id)
+            # simple, single-player playback
+            await self._handle_play_url(
+                slimplayer,
+                url=media.uri,
+                media=media,
+                send_flush=True,
+                auto_play=False,
+            )
+            return
+
+        # this is a syncgroup, we need to handle this with a multi client stream
+        master_audio_format = AudioFormat(
+            content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+        )
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            audio_source = self.mass.streams.get_announcement_stream(
+                media.custom_data["url"],
+                output_format=master_audio_format,
+                use_pre_announce=media.custom_data["use_pre_announce"],
+            )
+        elif media.queue_id.startswith("ugp_"):
+            # special case: UGP stream
+            ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+            ugp_stream = ugp_provider.streams[media.queue_id]
+            audio_source = ugp_stream.subscribe_raw()
+        elif media.queue_id and media.queue_item_id:
+            # regular queue stream request
+            audio_source = self.mass.streams.get_flow_stream(
+                queue=self.mass.player_queues.get(media.queue_id),
+                start_queue_item=self.mass.player_queues.get_item(
+                    media.queue_id, media.queue_item_id
+                ),
+                pcm_format=master_audio_format,
             )
         else:
-            stream_job = None
-        # forward command to player and any connected sync members
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                output_format=master_audio_format,
+            )
+        # start the stream task
+        self._multi_streams[player_id] = stream = MultiClientStream(
+            audio_source=audio_source, audio_format=master_audio_format
+        )
+        base_url = f"{self.mass.streams.base_url}/slimproto/multi?player_id={player_id}&fmt=flac"
+
+        # forward to downstream play_media commands
         async with asyncio.TaskGroup() as tg:
             for slimplayer in self._get_sync_clients(player_id):
-                enforce_mp3 = await self.mass.config.get_player_config_value(
-                    slimplayer.player_id, CONF_ENFORCE_MP3
-                )
+                url = f"{base_url}&child_player_id={slimplayer.player_id}"
+                if self.mass.config.get_raw_player_config_value(
+                    slimplayer.player_id, CONF_ENFORCE_MP3, False
+                ):
+                    url = url.replace("flac", "mp3")
+                stream.expected_clients += 1
                 tg.create_task(
                     self._handle_play_url(
                         slimplayer,
-                        url=stream_job.resolve_stream_url(
-                            slimplayer.player_id,
-                            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-                        )
-                        if stream_job
-                        else self.mass.streams.resolve_stream_url(
-                            slimplayer.player_id,
-                            queue_item=queue_item,
-                            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-                        ),
-                        queue_item=None,
+                        url=url,
+                        media=media,
                         send_flush=True,
-                        auto_play=stream_job is None,
+                        auto_play=False,
                     )
                 )
 
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle enqueuing of the next queue item on the player."""
         if not (slimplayer := self.slimproto.get_player(player_id)):
             return
-        enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
-            flow_mode=False,
-        )
+        url = media.uri
+        if self.mass.config.get_raw_player_config_value(
+            slimplayer.player_id, CONF_ENFORCE_MP3, False
+        ):
+            url = url.replace("flac", "mp3")
+
         await self._handle_play_url(
             slimplayer,
             url=url,
-            queue_item=queue_item,
+            media=media,
             enqueue=True,
             send_flush=False,
             auto_play=True,
@@ -400,7 +441,7 @@ class SlimprotoProvider(PlayerProvider):
         self,
         slimplayer: SlimClient,
         url: str,
-        queue_item: QueueItem | None,
+        media: PlayerMedia,
         enqueue: bool = False,
         send_flush: bool = True,
         auto_play: bool = False,
@@ -414,43 +455,15 @@ class SlimprotoProvider(PlayerProvider):
         else:
             transition_duration = 0
 
-        if queue_item and queue_item.media_item:
-            album = getattr(queue_item.media_item, "album", None)
-            metadata = {
-                "item_id": queue_item.queue_item_id,
-                "title": queue_item.media_item.name,
-                "album": album.name if album else "",
-                "artist": getattr(queue_item.media_item, "artist_str", "Music Assistant"),
-                "image_url": self.mass.metadata.get_image_url(
-                    queue_item.image,
-                    size=512,
-                    prefer_proxy=True,
-                )
-                if queue_item.image
-                else MASS_LOGO_ONLINE,
-                "duration": queue_item.duration,
-            }
-        elif queue_item:
-            metadata = {
-                "item_id": queue_item.queue_item_id,
-                "title": queue_item.name,
-                "artist": "Music Assistant",
-                "image_url": self.mass.metadata.get_image_url(
-                    queue_item.image,
-                    size=512,
-                    prefer_proxy=True,
-                )
-                if queue_item.image
-                else MASS_LOGO_ONLINE,
-                "duration": queue_item.duration,
-            }
-        else:
-            metadata = {
-                "item_id": "flow",
-                "title": "Music Assistant",
-                "image_url": MASS_LOGO_ONLINE,
-            }
-        queue = self.mass.player_queues.get(queue_item.queue_id if queue_item else player_id)
+        metadata = {
+            "item_id": media.queue_item_id or media.uri,
+            "title": media.title,
+            "album": media.album,
+            "artist": media.artist,
+            "image_url": media.image_url,
+            "duration": media.duration,
+        }
+        queue = self.mass.player_queues.get(media.queue_id or player_id)
         slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
         slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
         # slimplayer.extra_data["can_seek"] = 1 if queue_item else 0
@@ -505,7 +518,7 @@ class SlimprotoProvider(PlayerProvider):
         parent_player = self.mass.players.get(target_player)
         assert parent_player  # guard
         if parent_player.synced_to:
-            raise RuntimeError("Player is already synced")
+            raise RuntimeError("Parent player is already synced!")
         if child_player.synced_to and child_player.synced_to != target_player:
             raise RuntimeError("Player is already synced to another player")
         # always make sure that the parent player is part of the sync group
@@ -515,18 +528,14 @@ class SlimprotoProvider(PlayerProvider):
         # check if we should (re)start or join a stream session
         active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
         if active_queue.state == PlayerState.PLAYING:
-            # playback needs to be restarted to form a new multi slimplayer stream session
-            def resync() -> None:
-                self._resync_handle = None
-                self.mass.create_task(
-                    self.mass.player_queues.resume(active_queue.queue_id, fade_in=False)
-                )
-
+            # playback needs to be restarted to form a new multi client stream session
             # this could potentially be called by multiple players at the exact same time
             # so we debounce the resync a bit here with a timer
-            if self._resync_handle:
-                self._resync_handle.cancel()
-            self._resync_handle = self.mass.loop.call_later(0.5, resync)
+            self.mass.call_later(
+                1,
+                self.mass.player_queues.resume(active_queue.queue_id, fade_in=False),
+                task_id=f"resume_{active_queue.queue_id}",
+            )
         else:
             # make sure that the player manager gets an update
             self.mass.players.update(child_player.player_id, skip_forward=True)
@@ -709,12 +718,6 @@ class SlimprotoProvider(PlayerProvider):
         # average lag/drift so we can adjust accordingly
         sync_playpoints = self._sync_playpoints[slimplayer.player_id]
 
-        active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
-        stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
-        if not stream_job:
-            # should not happen, but just in case
-            return
-
         now = time.time()
         if now < self._do_not_resync_before[slimplayer.player_id]:
             return
@@ -723,8 +726,8 @@ class SlimprotoProvider(PlayerProvider):
         if last_playpoint and (now - last_playpoint.timestamp) > 10:
             # last playpoint is too old, invalidate
             sync_playpoints.clear()
-        if last_playpoint and last_playpoint.sync_job_id != stream_job.job_id:
-            # streamjob has changed, invalidate
+        if last_playpoint and last_playpoint.sync_master != sync_master.player_id:
+            # this should not happen, but just in case
             sync_playpoints.clear()
 
         diff = int(
@@ -732,12 +735,15 @@ class SlimprotoProvider(PlayerProvider):
             - self._get_corrected_elapsed_milliseconds(slimplayer)
         )
 
-        if last_playpoint and abs(last_playpoint.diff - diff) > DEVIATION_JUMP_IGNORE:
-            # ignore unexpected spikes
+        # ignore unexpected spikes
+        if (
+            sync_playpoints
+            and abs(statistics.fmean(x.diff for x in sync_playpoints)) > DEVIATION_JUMP_IGNORE
+        ):
             return
 
         # we can now append the current playpoint to our list
-        sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff))
+        sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff))
 
         min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
         if len(sync_playpoints) < min_req_playpoints:
@@ -803,7 +809,7 @@ class SlimprotoProvider(PlayerProvider):
                 # Instead just start playback on all players and let the sync logic work out
                 # the delays etc.
                 self._do_not_resync_before[_client.player_id] = time.time() + 1
-                tg.create_task(_client.unpause_at(0))
+                tg.create_task(_client.pause_for(200))
 
     async def _handle_connected(self, slimplayer: SlimClient) -> None:
         """Handle a slimplayer connected event."""
@@ -888,3 +894,52 @@ class SlimprotoProvider(PlayerProvider):
         await slimplayer.configure_display(
             visualisation=SlimVisualisationType(visualization), disabled=not display_enabled
         )
+
+    async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
+        """Serve the multi-client flow stream audio to a player."""
+        player_id = request.query.get("player_id")
+        fmt = request.query.get("fmt")
+        child_player_id = request.query.get("child_player_id")
+
+        if not (player := self.mass.players.get(player_id)):
+            raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
+
+        if not (child_player := self.mass.players.get(child_player_id)):
+            raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+
+        if not (stream := self._multi_streams.get(player_id, None)) or stream.done:
+            raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
+
+        resp = web.StreamResponse(
+            status=200,
+            reason="OK",
+            headers={
+                "Content-Type": f"audio/{fmt}",
+            },
+        )
+        await resp.prepare(request)
+
+        # return early if this is not a GET request
+        if request.method != "GET":
+            return resp
+
+        # all checks passed, start streaming!
+        self.logger.debug(
+            "Start serving multi-client flow audio stream for player %s to %s",
+            player.display_name,
+            child_player.display_name,
+        )
+
+        async for chunk in stream.get_stream(
+            output_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
+            filter_params=get_player_filter_params(self.mass, child_player_id)
+            if child_player_id
+            else None,
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError):
+                # race condition
+                break
+
+        return resp
index 88106e25df7d1187a2f3947aec08d001de986ee6..a180dc9696e1b55d4ad57046acde9921cd331f05 100644 (file)
@@ -34,9 +34,8 @@ from music_assistant.common.models.enums import (
 )
 from music_assistant.common.models.errors import SetupFailedError
 from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import UGP_PREFIX
-from music_assistant.server.helpers.audio import FFMpeg, get_player_filter_params
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -47,9 +46,9 @@ if TYPE_CHECKING:
 
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
+    from music_assistant.server.providers.ugp import UniversalGroupProvider
 
 CONF_SERVER_HOST = "snapcast_server_host"
 CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port"
@@ -145,7 +144,7 @@ class SnapCastProvider(PlayerProvider):
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
         """Return the features supported by this Provider."""
-        return (ProviderFeature.SYNC_PLAYERS,)
+        return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
@@ -262,10 +261,14 @@ class SnapCastProvider(PlayerProvider):
         )
         player.synced_to = self._synced_to(player_id)
         player.group_childs = self._group_childs(player_id)
-        if player.current_item_id and player_id in player.current_item_id:
-            player.active_source = player_id
-        elif stream := self._get_snapstream(player_id):
-            player.active_source = stream.name
+        if player.active_group is None:
+            if stream := self._get_snapstream(player_id):
+                if stream.name.startswith(("MusicAssistant", "default")):
+                    player.active_source = player_id
+                else:
+                    player.active_source = stream.name
+            else:
+                player.active_source = player_id
         self.mass.players.register_or_update(player)
 
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
@@ -308,7 +311,7 @@ class SnapCastProvider(PlayerProvider):
         await self._get_snapgroup(player_id).set_stream("default")
         self._handle_update()
 
-    async def play_media(self, player_id: str, queue_item: QueueItem) -> None:
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
         if player.synced_to:
@@ -316,31 +319,43 @@ class SnapCastProvider(PlayerProvider):
             raise RuntimeError(msg)
         # stop any existing streams first
         await self.cmd_stop(player_id)
-        queue = self.mass.player_queues.get(queue_item.queue_id)
         stream, port = await self._create_stream()
         snap_group = self._get_snapgroup(player_id)
         await snap_group.set_stream(stream.identifier)
 
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
+        # select audio source
+        if media.media_type == MediaType.ANNOUNCEMENT:
             # special case: stream announcement
             input_format = DEFAULT_SNAPCAST_FORMAT
             audio_source = self.mass.streams.get_announcement_stream(
-                queue_item.streamdetails.data["url"],
+                media.custom_data["url"],
                 output_format=DEFAULT_SNAPCAST_FORMAT,
-                use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
+                use_pre_announce=media.custom_data["use_pre_announce"],
             )
-        elif queue_item.queue_id.startswith(UGP_PREFIX):
-            # special case: we got forwarded a request from the UGP
-            # use the existing stream job that was already created by UGP
-            stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id]
-            stream_job.expected_players.add(player_id)
-            input_format = stream_job.pcm_format
-            audio_source = stream_job.subscribe(player_id)
-        else:
-            queue = self.mass.player_queues.get(queue_item.queue_id)
+        elif media.queue_id.startswith("ugp_"):
+            # special case: UGP stream
+            ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
+            ugp_stream = ugp_provider.streams[media.queue_id]
+            input_format = ugp_stream.audio_format
+            audio_source = ugp_stream.subscribe_raw()
+        elif media.queue_id and media.queue_item_id:
+            # regular queue stream request
             input_format = DEFAULT_SNAPCAST_FORMAT
             audio_source = self.mass.streams.get_flow_stream(
-                queue, start_queue_item=queue_item, pcm_format=DEFAULT_SNAPCAST_FORMAT
+                queue=self.mass.player_queues.get(media.queue_id),
+                start_queue_item=self.mass.player_queues.get_item(
+                    media.queue_id, media.queue_item_id
+                ),
+                pcm_format=DEFAULT_SNAPCAST_FORMAT,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            input_format = DEFAULT_SNAPCAST_FORMAT
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                output_format=DEFAULT_SNAPCAST_FORMAT,
             )
 
         async def _streamer() -> None:
@@ -350,7 +365,7 @@ class SnapCastProvider(PlayerProvider):
             def stream_callback(_stream) -> None:
                 player.state = PlayerState(_stream.status)
                 if player.state == PlayerState.PLAYING:
-                    player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+                    player.current_media = media
                     player.elapsed_time = 0
                     player.elapsed_time_last_updated = time.time()
                 self.mass.players.update(player_id)
index 5027dfff3f0ea171c4968da04f6f742effe70d5a..274e9d4487a75558f6ba5204697951b14d7450eb 100644 (file)
@@ -26,13 +26,12 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
-    ContentType,
     PlayerFeature,
     PlayerType,
     ProviderFeature,
 )
 from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
-from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.constants import CONF_CROSSFADE, SYNCGROUP_PREFIX, VERBOSE_LOG_LEVEL
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
@@ -44,7 +43,6 @@ if TYPE_CHECKING:
 
     from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
-    from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
 
@@ -144,7 +142,7 @@ class SonosPlayerProvider(PlayerProvider):
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
         """Return the features supported by this Provider."""
-        return (ProviderFeature.SYNC_PLAYERS,)
+        return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
@@ -336,7 +334,7 @@ class SonosPlayerProvider(PlayerProvider):
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
         sonos_player = self.sonosplayers[player_id]
@@ -349,36 +347,13 @@ class SonosPlayerProvider(PlayerProvider):
             )
             raise PlayerCommandFailed(msg)
 
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-        )
-        self.mass.create_task(
-            sonos_player.soco.play_uri, url, meta=create_didl_metadata(self.mass, url, queue_item)
-        )
-
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
-        """
-        Handle enqueuing of the next queue item on the player.
-
-        If the player supports PlayerFeature.ENQUE_NEXT:
-          This will be called about 10 seconds before the end of the track.
-        If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
-          This will be called when the end of the track is reached.
+        didl_metadata = create_didl_metadata(media)
+        self.mass.create_task(sonos_player.soco.play_uri, media.uri, meta=didl_metadata)
 
-        A PlayerProvider implementation is in itself responsible for handling this
-        so that the queue items keep playing until its empty or the player stopped.
-
-        This will NOT be called if the end of the queue is reached (and repeat disabled).
-        This will NOT be called if flow mode is enabled on the queue.
-        """
+    async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+        """Handle enqueuing of the next queue item on the player."""
         sonos_player = self.sonosplayers[player_id]
-        url = self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-        )
+        didl_metadata = create_didl_metadata(media)
         # set crossfade according to player setting
         crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
         if sonos_player.crossfade != crossfade:
@@ -394,10 +369,25 @@ class SonosPlayerProvider(PlayerProvider):
 
             await asyncio.to_thread(set_crossfade)
 
-        await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+        try:
+            await asyncio.to_thread(
+                sonos_player.soco.avTransport.SetNextAVTransportURI,
+                [("InstanceID", 0), ("NextURI", media.uri), ("NextURIMetaData", didl_metadata)],
+                timeout=60,
+            )
+        except Exception as err:
+            self.logger.warning(
+                "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err
+            )
+        else:
+            self.logger.debug(
+                "Enqued next track (%s) to player %s",
+                media.title or media.uri,
+                sonos_player.soco.player_name,
+            )
 
     async def play_announcement(
-        self, player_id: str, announcement: QueueItem, volume_level: int | None = None
+        self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
     ) -> None:
         """Handle (provider native) playback of an announcement on given player."""
         if player_id.startswith(SYNCGROUP_PREFIX):
@@ -410,19 +400,16 @@ class SonosPlayerProvider(PlayerProvider):
                             self.play_announcement(child_player_id, announcement, volume_level)
                         )
             return
-        announcement_url = self.mass.streams.resolve_stream_url(
-            player_id, announcement, ContentType.MP3
-        )
         sonos_player = self.sonosplayers[player_id]
         self.logger.debug(
             "Playing announcement %s using websocket audioclip on %s",
-            announcement_url,
+            announcement.uri,
             sonos_player.zone_name,
         )
         volume_level = self.mass.players.get_announcement_volume(player_id, volume_level)
         try:
             response, _ = await sonos_player.websocket.play_clip(
-                announcement_url,
+                announcement.uri,
                 volume=volume_level,
             )
         except SonosWebsocketError as exc:
@@ -544,28 +531,3 @@ class SonosPlayerProvider(PlayerProvider):
         self.mass.loop.call_soon_threadsafe(
             self.mass.players.register_or_update, sonos_player.mass_player
         )
-
-    async def _enqueue_item(
-        self,
-        sonos_player: SonosPlayer,
-        url: str,
-        queue_item: QueueItem | None,
-    ) -> None:
-        """Enqueue a queue item to the Sonos player Queue."""
-        metadata = create_didl_metadata(self.mass, url, queue_item)
-        try:
-            await asyncio.to_thread(
-                sonos_player.soco.avTransport.SetNextAVTransportURI,
-                [("InstanceID", 0), ("NextURI", url), ("NextURIMetaData", metadata)],
-                timeout=60,
-            )
-        except Exception as err:
-            self.logger.warning(
-                "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err
-            )
-        else:
-            self.logger.debug(
-                "Enqued next track (%s) to player %s",
-                queue_item.name if queue_item else url,
-                sonos_player.soco.player_name,
-            )
index 20388f4229cdcc223f6728f9c8c84f8ca97005b3..711a1485c9ce96da74c7bc1a35dd27cc2f284198 100644 (file)
@@ -535,17 +535,17 @@ class SonosPlayer:
         """Handle callback for topology change event."""
         if xml := event.variables.get("zone_group_state"):
             zgs = ET.fromstring(xml)
-            vanished_devices = zgs.find("VanishedDevices") or []
-            for vanished_device in vanished_devices:
-                if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
-                    self.logger.debug(
-                        "Ignoring %s marked %s as vanished with reason: %s",
-                        self.zone_name,
-                        vanished_device.get("ZoneName"),
-                        reason,
-                    )
-                    continue
-                self.mass.create_task(self._vanished(reason))
+            if vanished_devices := zgs.find("VanishedDevices"):
+                for vanished_device in vanished_devices:
+                    if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
+                        self.logger.debug(
+                            "Ignoring %s marked %s as vanished with reason: %s",
+                            self.zone_name,
+                            vanished_device.get("ZoneName"),
+                            reason,
+                        )
+                        continue
+                    self.mass.create_task(self._vanished(reason))
 
         if "zone_player_uui_ds_in_group" not in event.variables:
             return
index 900f5424bfa3043a04acb805951fc919f576caa3..cb764060bf7bcde79a7e879daa81107f28993132 100644 (file)
@@ -8,9 +8,11 @@ allowing the user to create player groups from all players known in the system.
 from __future__ import annotations
 
 import asyncio
+from time import time
 from typing import TYPE_CHECKING
 
 import shortuuid
+from aiohttp import web
 
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -20,20 +22,19 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     MediaType,
     PlayerFeature,
     PlayerState,
     PlayerType,
     ProviderFeature,
 )
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import (
-    CONF_CROSSFADE,
-    CONF_GROUP_MEMBERS,
-    SYNCGROUP_PREFIX,
-    UGP_PREFIX,
-)
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
+from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
+from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
+from music_assistant.server.helpers.multi_client_stream import MultiClientStream
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
@@ -47,6 +48,10 @@ if TYPE_CHECKING:
 
 # ruff: noqa: ARG002
 
+UGP_FORMAT = AudioFormat(
+    content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+)
+
 
 async def setup(
     mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
@@ -74,9 +79,6 @@ async def get_config_entries(
 class UniversalGroupProvider(PlayerProvider):
     """Base/builtin provider for universally grouping players."""
 
-    prev_sync_leaders: dict[str, tuple[str]] | None = None
-    debounce_id: str | None = None
-
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
         """Return the features supported by this Provider."""
@@ -87,12 +89,23 @@ class UniversalGroupProvider(PlayerProvider):
     ) -> None:
         """Initialize MusicProvider."""
         super().__init__(mass, manifest, config)
-        self.prev_sync_leaders = {}
+        self._registered_routes: set[str] = set()
+        self.streams: dict[str, MultiClientStream] = {}
 
     async def loaded_in_mass(self) -> None:
         """Call after the provider has been loaded."""
         await self._register_all_players()
 
+    async def unload(self) -> None:
+        """
+        Handle unload/close of the provider.
+
+        Called when provider is deregistered (e.g. MA exiting or config reloading).
+        """
+        for route_path in list(self._registered_routes):
+            self._registered_routes.remove(route_path)
+            self.mass.streams.unregister_dynamic_route(route_path)
+
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
         base_entries = await super().get_player_config_entries(player_id)
@@ -136,21 +149,19 @@ class UniversalGroupProvider(PlayerProvider):
         """Send STOP command to given player."""
         group_player = self.mass.players.get(player_id)
         group_player.state = PlayerState.IDLE
+        self.mass.players.update(player_id)
         # forward command to player and any connected sync child's
         async with asyncio.TaskGroup() as tg:
             for member in self.mass.players.iter_group_members(group_player, only_powered=True):
                 if member.state == PlayerState.IDLE:
                     continue
                 tg.create_task(self.mass.players.cmd_stop(member.player_id))
-        if existing := self.mass.streams.multi_client_jobs.pop(player_id, None):
-            existing.stop()
+        if (stream := self.streams.pop(player_id, None)) and not stream.done:
+            await stream.stop()
 
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY command to given player."""
 
-    async def cmd_pause(self, player_id: str) -> None:
-        """Send PAUSE command to given player."""
-
     async def cmd_power(self, player_id: str, powered: bool) -> None:
         """Send POWER command to given player."""
         await self.mass.players.cmd_group_power(player_id, powered)
@@ -159,52 +170,74 @@ class UniversalGroupProvider(PlayerProvider):
         """Send VOLUME_SET command to given player."""
         # group volume is already handled in the player manager
 
-    async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
-        """Send VOLUME MUTE command to given player."""
-
     async def play_media(
         self,
         player_id: str,
-        queue_item: QueueItem,
+        media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
         # power ON
         await self.cmd_power(player_id, True)
         group_player = self.mass.players.get(player_id)
+        # stop any existing stream first
+        if (existing := self.streams.pop(player_id, None)) and not existing.done:
+            existing.task.cancel()
+
+        # select audio source
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            audio_source = self.mass.streams.get_announcement_stream(
+                media.custom_data["url"],
+                output_format=UGP_FORMAT,
+                use_pre_announce=media.custom_data["use_pre_announce"],
+            )
+        elif media.queue_id and media.queue_item_id:
+            # regular queue stream request
+            audio_source = self.mass.streams.get_flow_stream(
+                queue=self.mass.player_queues.get(media.queue_id),
+                start_queue_item=self.mass.player_queues.get_item(
+                    media.queue_id, media.queue_item_id
+                ),
+                pcm_format=UGP_FORMAT,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                output_format=UGP_FORMAT,
+            )
 
-        # create a multi-client stream job - all (direct) child's of this UGP group
-        # will subscribe to this multi client queue stream
-        queue = self.mass.player_queues.get(player_id)
-        stream_job = self.mass.streams.create_multi_client_stream_job(
-            queue.queue_id,
-            start_queue_item=queue_item,
+        # start the stream task
+        self.streams[player_id] = MultiClientStream(
+            audio_source=audio_source, audio_format=UGP_FORMAT
         )
-        # create a fake queue item to forward to downstream play_media commands
-        ugp_queue_item = QueueItem(
-            player_id,
-            queue_item_id=stream_job.job_id,
-            name="Music Assistant",
-            duration=None,
-        )
-        # special case: handle announcement sent to this UGP
-        # we just forward this as-is downstream and let all child players handle this themselves
-        if queue_item.media_type == MediaType.ANNOUNCEMENT:
-            ugp_queue_item = queue_item
+        base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
 
-        # forward the stream job to all group members
+        # forward to downstream play_media commands
         async with asyncio.TaskGroup() as tg:
             for member in self.mass.players.iter_group_members(group_player, only_powered=True):
-                player_prov = self.mass.players.get_player_provider(member.player_id)
                 if member.player_id.startswith(SYNCGROUP_PREFIX):
                     member = self.mass.players.get_sync_leader(member)  # noqa: PLW2901
                     if member is None:
                         continue
-                tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item))
-
-    async def poll_player(self, player_id: str) -> None:
-        """Poll player for state updates."""
-        self.update_attributes(player_id)
-        self.mass.players.update(player_id, skip_forward=True)
+                tg.create_task(
+                    self.mass.players.play_media(
+                        member.player_id,
+                        media=PlayerMedia(
+                            uri=f"{base_url}?player_id={member.player_id}",
+                            media_type=MediaType.FLOW_STREAM,
+                            title=group_player.display_name,
+                            queue_id=group_player.player_id,
+                        ),
+                    )
+                )
+        # set the state optimistically
+        group_player.elapsed_time = 0
+        group_player.elapsed_time_last_updated = time() - 1
+        group_player.state = PlayerState.PLAYING
+        self.mass.players.update(player_id)
 
     async def create_group(self, name: str, members: list[str]) -> Player:
         """Create new PlayerGroup on this provider.
@@ -214,7 +247,7 @@ class UniversalGroupProvider(PlayerProvider):
             - name: Name for the new group to create.
             - members: A list of player_id's that should be part of this group.
         """
-        new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}"
+        new_group_id = f"{self.domain}_{shortuuid.random(8).lower()}"
         # cleanup list, filter groups (should be handled by frontend, but just in case)
         members = [
             x.player_id
@@ -261,22 +294,13 @@ class UniversalGroupProvider(PlayerProvider):
             group_childs=set(members),
         )
         self.mass.players.register_or_update(player)
-        return player
-
-    def update_attributes(self, player_id: str) -> None:
-        """Update player attributes."""
-        group_player = self.mass.players.get(player_id)
-        if not group_player.powered:
-            group_player.state = PlayerState.IDLE
-            return
+        # register dynamic routes for the ugp stream (both flac and mp3)
+        for fmt in ("mp3", "flac"):
+            route_path = f"/ugp/{group_player_id}.{fmt}"
+            self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
+            self._registered_routes.add(route_path)
 
-        # read the state from the first active group member
-        for member in self.mass.players.iter_group_members(group_player, only_powered=True):
-            group_player.current_item_id = member.current_item_id
-            group_player.elapsed_time = member.elapsed_time
-            group_player.elapsed_time_last_updated = member.elapsed_time_last_updated
-            group_player.state = member.state
-            break
+        return player
 
     def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
         """
@@ -309,18 +333,66 @@ class UniversalGroupProvider(PlayerProvider):
             return False
 
         # if a child player turned ON while the group player is already playing
-        # we need to resync/resume
+        # we just direct it to the existing stream (we dont care about the audio being in sync)
         if new_power and group_player.state == PlayerState.PLAYING:
-            self.logger.warning(
-                "Player %s turned on while syncgroup is playing, "
-                "a forced resume for %s will be performed...",
-                child_player.display_name,
-                group_player.display_name,
+            base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
+            self.mass.create_task(
+                self.mass.players.play_media(
+                    child_player.player_id,
+                    media=PlayerMedia(
+                        uri=f"{base_url}?player_id={child_player.player_id}",
+                        media_type=MediaType.FLOW_STREAM,
+                        title=group_player.display_name,
+                        queue_id=group_player.player_id,
+                    ),
+                )
             )
-            self.mass.loop.call_later(
-                1,
-                self.mass.create_task,
-                self.mass.player_queues.resume(group_player.player_id),
-            )
-            return None
+
         return None
+
+    async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
+        """Serve the UGP (multi-client) flow stream audio to a player."""
+        ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
+        fmt = request.path.rsplit(".")[-1]
+        child_player_id = request.query.get("player_id")  # optional!
+
+        if not (ugp_player := self.mass.players.get(ugp_player_id)):
+            raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
+
+        if not (stream := self.streams.get(ugp_player_id, None)) or stream.done:
+            raise web.HTTPNotFound(f"There is no active UGP stream for {ugp_player_id}!")
+
+        resp = web.StreamResponse(
+            status=200,
+            reason="OK",
+            headers={
+                **DEFAULT_STREAM_HEADERS,
+                "Content-Type": f"audio/{fmt}",
+            },
+        )
+        await resp.prepare(request)
+
+        # return early if this is not a GET request
+        if request.method != "GET":
+            return resp
+
+        # all checks passed, start streaming!
+        self.logger.debug(
+            "Start serving UGP flow audio stream for UGP-player %s to %s",
+            ugp_player.display_name,
+            child_player_id or request.remote,
+        )
+
+        async for chunk in stream.get_stream(
+            output_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
+            filter_params=get_player_filter_params(self.mass, child_player_id)
+            if child_player_id
+            else None,
+        ):
+            try:
+                await resp.write(chunk)
+            except (BrokenPipeError, ConnectionResetError):
+                # race condition
+                break
+
+        return resp
index e930ea9dac30e9f8026304d496b0abbf528ca7ae..7953fb391001c507c6ed560911ce2ee4b768a8e6 100644 (file)
@@ -98,6 +98,7 @@ class MusicAssistant:
         self._provider_manifests: dict[str, ProviderManifest] = {}
         self._providers: dict[str, ProviderInstanceType] = {}
         self._tracked_tasks: dict[str, asyncio.Task] = {}
+        self._tracked_timers: dict[str, asyncio.TimerHandle] = {}
         self.closing = False
         self.running_as_hass_addon: bool = False
         self.version: str = "0.0.0"
@@ -363,12 +364,24 @@ class MusicAssistant:
         task_id: str | None = None,
         **kwargs: Any,
     ) -> asyncio.TimerHandle:
-        """Run callable/awaitable after given delay."""
+        """
+        Run callable/awaitable after given delay.
+
+        Use task_id for debouncing.
+        """
+        if not task_id:
+            task_id = uuid4().hex
+
+        if existing := self._tracked_timers.get(task_id):
+            existing.cancel()
 
         def _create_task() -> None:
+            self._tracked_timers.pop(task_id)
             self.create_task(target, *args, task_id=task_id, **kwargs)
 
-        return self.loop.call_later(delay, _create_task)
+        handle = self.loop.call_later(delay, _create_task)
+        self._tracked_timers[task_id] = handle
+        return handle
 
     def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future:
         """Get existing scheduled task."""