From a47ac7e149a2d26676434e19a496d635b241ac02 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 4 Apr 2024 21:46:49 +0200 Subject: [PATCH] Bugfixes and improvements to (universal) player groups (#1203) --- music_assistant/common/helpers/uri.py | 2 +- music_assistant/common/models/enums.py | 2 +- music_assistant/common/models/player.py | 24 +- music_assistant/constants.py | 1 - .../server/controllers/player_queues.py | 42 +- music_assistant/server/controllers/players.py | 274 ++++--------- music_assistant/server/controllers/streams.py | 369 ++---------------- music_assistant/server/helpers/audio.py | 7 +- music_assistant/server/helpers/didl_lite.py | 65 +-- .../server/helpers/multi_client_stream.py | 98 +++++ music_assistant/server/helpers/webserver.py | 6 +- .../server/models/player_provider.py | 141 +++++-- .../server/providers/airplay/__init__.py | 76 ++-- .../server/providers/chromecast/__init__.py | 129 +++--- .../server/providers/dlna/__init__.py | 41 +- .../server/providers/fully_kiosk/__init__.py | 19 +- .../server/providers/hass_players/__init__.py | 47 +-- .../server/providers/slimproto/__init__.py | 269 ++++++++----- .../server/providers/snapcast/__init__.py | 65 +-- .../server/providers/sonos/__init__.py | 92 ++--- .../server/providers/sonos/player.py | 22 +- .../server/providers/ugp/__init__.py | 220 +++++++---- music_assistant/server/server.py | 17 +- 23 files changed, 946 insertions(+), 1082 deletions(-) create mode 100644 music_assistant/server/helpers/multi_client_stream.py diff --git a/music_assistant/common/helpers/uri.py b/music_assistant/common/helpers/uri.py index 558417bc..11a3997c 100644 --- a/music_assistant/common/helpers/uri.py +++ b/music_assistant/common/helpers/uri.py @@ -19,7 +19,7 @@ def parse_uri(uri: str) -> tuple[MediaType, str, str]: media_type_str = uri.split("/")[3] media_type = MediaType(media_type_str) item_id = uri.split("/")[4].split("?")[0] - elif uri.startswith(("http://", "https://")): + elif uri.startswith(("http://", "https://", "rtsp://", "rtmp://")): # Translate a plain URL to the URL provider provider_instance_id_or_domain = "url" media_type = MediaType.UNKNOWN diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 75a2d53a..2cfbaaa2 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -19,6 +19,7 @@ class MediaType(StrEnum): RADIO = "radio" FOLDER = "folder" ANNOUNCEMENT = "announcement" + FLOW_STREAM = "flow_stream" UNKNOWN = "unknown" @classmethod @@ -35,7 +36,6 @@ class MediaType(StrEnum): MediaType.TRACK, MediaType.PLAYLIST, MediaType.RADIO, - MediaType.ANNOUNCEMENT, ) diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index eb906bf4..ef32a220 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -8,7 +8,7 @@ from typing import Any from mashumaro import DataClassDictMixin -from .enums import PlayerFeature, PlayerState, PlayerType +from .enums import MediaType, PlayerFeature, PlayerState, PlayerType @dataclass(frozen=True) @@ -20,6 +20,22 @@ class DeviceInfo(DataClassDictMixin): manufacturer: str = "Unknown Manufacturer" +@dataclass +class PlayerMedia(DataClassDictMixin): + """Metadata of Media loading/loaded into a player.""" + + uri: str # uri or other identifier of the loaded media + media_type: MediaType = MediaType.UNKNOWN + title: str | None = None # optional + artist: str | None = None # optional + album: str | None = None # optional + image_url: str | None = None # optional + duration: int | None = None # optional + queue_id: str | None = None # only present for requests from queue controller + queue_item_id: str | None = None # only present for requests from queue controller + custom_data: dict | None = None # optional + + @dataclass class Player(DataClassDictMixin): """Representation of a Player within Music Assistant.""" @@ -58,8 +74,14 @@ class Player(DataClassDictMixin): # current_item_id: return item_id/uri of the current active/loaded item on the player # this may be a MA queue_item_id, url, uri or some provider specific string + # deprecated: use current_media instead current_item_id: str | None = None + # current_media: return current active/loaded item on the player + # this may be a MA queue item, url, uri or some provider specific string + # includes metadata if supported by the provider/player + current_media: PlayerMedia | None = None + # can_sync_with: return tuple of player_ids that can be synced to/with this player # usually this is just a list of all player_ids within the playerprovider can_sync_with: tuple[str, ...] = field(default=()) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index bd6a965e..a2eaa1f0 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -97,4 +97,3 @@ CONFIGURABLE_CORE_CONTROLLERS = ( ) SYNCGROUP_PREFIX: Final[str] = "syncgroup_" VERBOSE_LOG_LEVEL: Final[int] = 5 -UGP_PREFIX: Final[str] = "ugp_" diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 8bd6b83d..fc607e7a 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -30,9 +30,10 @@ from music_assistant.common.models.errors import ( QueueEmpty, ) from music_assistant.common.models.media_items import MediaItemType, media_from_dict +from music_assistant.common.models.player import PlayerMedia from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import FALLBACK_DURATION +from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE from music_assistant.server.helpers.api import api_command from music_assistant.server.helpers.audio import get_stream_details from music_assistant.server.models.core_controller import CoreController @@ -199,8 +200,11 @@ class PlayerQueuesController(CoreController): """Return the current active/synced queue for a player.""" if player := self.mass.players.get(player_id): # account for player that is synced (sync child) - if player.synced_to: + if player.synced_to and player.synced_to != player.player_id: return self.get_active_queue(player.synced_to) + # handle active group player + if player.active_group and player.active_group != player.player_id: + return self.get_active_queue(player.active_group) # active_source may be filled with other queue id if player.active_source != player_id and ( queue := self.get_active_queue(player.active_source) @@ -671,14 +675,18 @@ class PlayerQueuesController(CoreController): queue.current_index = index queue.index_in_buffer = index queue.flow_mode_start_index = index - queue.flow_mode = False # reset + queue.flow_mode = self.mass.config.get_raw_player_config_value( + queue_id, CONF_FLOW_MODE, False + ) # get streamdetails - do this here to catch unavailable items early queue_item.streamdetails = await get_stream_details( self.mass, queue_item, seek_position=seek_position, fade_in=fade_in ) + # send play_media request to player await self.mass.players.play_media( player_id=queue_id, - queue_item=queue_item, + # transform into PlayerMedia to send to the actual player implementation + media=self.player_media_from_queue_item(queue_item, queue.flow_mode), ) # Interaction with player @@ -953,6 +961,27 @@ class PlayerQueuesController(CoreController): return index return None + def player_media_from_queue_item(self, queue_item: QueueItem, flow_mode: bool) -> PlayerMedia: + """Parse PlayerMedia from QueueItem.""" + media = PlayerMedia( + uri=self.mass.streams.resolve_stream_url(queue_item, flow_mode=flow_mode), + media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type, + title="Music Assistant" if flow_mode else queue_item.name, + image_url=MASS_LOGO_ONLINE, + duration=queue_item.duration, + queue_id=queue_item.queue_id, + queue_item_id=queue_item.queue_item_id, + ) + if not flow_mode and queue_item.media_item: + media.title = queue_item.media_item.name + media.artist = getattr(queue_item.media_item, "artist_str", "") + media.album = ( + album.name if (album := getattr(queue_item.media_item, "album", None)) else "" + ) + if queue_item.image: + media.image_url = self.mass.metadata.get_image_url(queue_item.image) + return media + def _get_next_index( self, queue_id: str, cur_index: int | None, is_skip: bool = False ) -> int | None: @@ -1028,8 +1057,9 @@ class PlayerQueuesController(CoreController): with suppress(QueueEmpty): next_item = await self.preload_next_item(queue.queue_id, index) if supports_enqueue: - await self.mass.players.enqueue_next_queue_item( - player_id=player.player_id, queue_item=next_item + await self.mass.players.enqueue_next_media( + player_id=player.player_id, + media=self.player_media_from_queue_item(next_item, queue.flow_mode), ) return await self.play_index(queue.queue_id, next_item.queue_item_id) diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 3f5cb23e..264a8d6d 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -7,8 +7,6 @@ import functools from contextlib import suppress from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast -import shortuuid - from music_assistant.common.helpers.util import get_changed_values from music_assistant.common.models.config_entries import ( CONF_ENTRY_ANNOUNCE_VOLUME, @@ -20,7 +18,6 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_TTS_PRE_ANNOUNCE, ) from music_assistant.common.models.enums import ( - ContentType, EventType, MediaType, PlayerFeature, @@ -28,7 +25,6 @@ from music_assistant.common.models.enums import ( PlayerType, ProviderFeature, ProviderType, - StreamType, ) from music_assistant.common.models.errors import ( AlreadyRegisteredError, @@ -36,24 +32,21 @@ from music_assistant.common.models.errors import ( ProviderUnavailableError, UnsupportedFeaturedException, ) -from music_assistant.common.models.media_items import AudioFormat -from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.common.models.queue_item import QueueItem -from music_assistant.common.models.streamdetails import StreamDetails +from music_assistant.common.models.player import Player, PlayerMedia from music_assistant.constants import ( CONF_AUTO_PLAY, CONF_GROUP_MEMBERS, CONF_HIDE_PLAYER, CONF_PLAYERS, SYNCGROUP_PREFIX, - UGP_PREFIX, ) from music_assistant.server.helpers.api import api_command +from music_assistant.server.helpers.tags import parse_tags from music_assistant.server.models.core_controller import CoreController from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: - from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator + from collections.abc import Awaitable, Callable, Coroutine, Iterator from music_assistant.common.models.config_entries import CoreConfig @@ -459,13 +452,13 @@ class PlayerController(CoreController): # always optimistically set the power state to update the UI # as fast as possible and prevent race conditions player.powered = powered + # always MA as active source on power ON + player.active_source = player_id if powered else None self.update(player_id) - # handle actions when a syncgroup child turns on + # handle actions when a (sync)group child turns on/off if active_group_player := self._get_active_player_group(player): - if active_group_player.startswith(SYNCGROUP_PREFIX): - self._on_syncgroup_child_power(active_group_player, player.player_id, powered) - elif player_prov := self.get_player_provider(active_group_player): - player_prov.on_child_power(active_group_player, player.player_id, powered) + player_prov = self.get_player_provider(active_group_player) + player_prov.on_child_power(active_group_player, player.player_id, powered) # handle 'auto play on power on' feature elif ( powered @@ -540,14 +533,14 @@ class PlayerController(CoreController): @api_command("players/cmd/group_power") async def cmd_group_power(self, player_id: str, power: bool) -> None: - """Handle power command for a PlayerGroup/SyncGroup.""" + """Handle power command for a SyncGroup.""" group_player = self.get(player_id, True) if group_player.powered == power: return # nothing to do - if group_player.type == PlayerType.GROUP and not player_id.startswith(UGP_PREFIX): - # this is a native group player (and not UGP), redirect + if group_player.type == PlayerType.GROUP: + # this is a native group player, redirect await self.cmd_power(player_id, power) return @@ -564,19 +557,22 @@ class PlayerController(CoreController): any_member_powered = True if power: # set active source to group player if the group (is going to be) powered - member.active_source = group_player.player_id + member.active_group = group_player.player_id + member.active_source = group_player.active_source else: # turn off child player when group turns off tg.create_task(self.cmd_power(member.player_id, False)) member.active_source = None + member.active_group = None # edge case: group turned on but no members are powered, power them all! if not any_member_powered and power: for member in self.iter_group_members(group_player, only_powered=False): tg.create_task(self.cmd_power(member.player_id, True)) - member.active_source = group_player.player_id + member.active_group = group_player.player_id + member.active_source = group_player.active_source if power and group_player.player_id.startswith(SYNCGROUP_PREFIX): - await self._sync_syncgroup(group_player.player_id) + await self.sync_syncgroup(group_player.player_id) self.update(player_id) @api_command("players/cmd/volume_mute") @@ -646,6 +642,25 @@ class PlayerController(CoreController): player.active_group, url, use_pre_announce, volume_level ) return + if player.type in (PlayerType.SYNC_GROUP, PlayerType.GROUP) and not player.powered: + # announcement request sent to inactive group, + # redirect to all underlying players instead + self.logger.warning( + "Detected announcement request to an inactive playergroup, " + "this will be redirected to the individual players." + ) + async with asyncio.TaskGroup() as tg: + for group_member in player.group_childs: + tg.create_task( + self.play_announcement( + group_member, + url=url, + use_pre_announce=use_pre_announce, + volume_level=volume_level, + ) + ) + return + # determine pre-announce from (group)player config if use_pre_announce is None and "tts" in url: use_pre_announce = self.mass.config.get_raw_player_config_value( @@ -659,25 +674,13 @@ class PlayerController(CoreController): use_pre_announce, url, ) - # create a queue item for the announcement so + # create a PlayerMedia object for the announcement so # we can send a regular play-media call downstream - announcement = QueueItem( - queue_id=player.player_id, - queue_item_id=url, - name="Announcement", - duration=None, - streamdetails=StreamDetails( - provider="url", - item_id=url, - audio_format=AudioFormat( - content_type=ContentType.try_parse(url), - ), - stream_type=StreamType.HTTP, - media_type=MediaType.ANNOUNCEMENT, - path=url, - target_loudness=-10, - data={"url": url, "use_pre_announce": use_pre_announce}, - ), + announcement = PlayerMedia( + uri=self.mass.streams.get_announcement_url(player_id, url, use_pre_announce), + media_type=MediaType.ANNOUNCEMENT, + title="Announcement", + custom_data={"url": url, "use_pre_announce": use_pre_announce}, ) # handle native announce support if native_announce_support: @@ -689,54 +692,41 @@ class PlayerController(CoreController): finally: player.announcement_in_progress = False - async def play_media(self, player_id: str, queue_item: QueueItem) -> None: + @api_command("players/cmd/play_media") + async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player. - This is called by the Queue controller to start playing a queue item on the given player. - The provider's own implementation should work out how to handle this request. - - - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. + - player_id: player_id of the player to handle the command. + - media: The Media that needs to be played on the player. """ if player_id.startswith(SYNCGROUP_PREFIX): # redirect to syncgroup-leader if needed await self.cmd_group_power(player_id, True) group_player = self.get(player_id, True) if sync_leader := self.get_sync_leader(group_player): - await self.play_media(sync_leader.player_id, queue_item=queue_item) + await self.play_media(sync_leader.player_id, media=media) group_player.state = PlayerState.PLAYING return player_prov = self.mass.players.get_player_provider(player_id) await player_prov.play_media( player_id=player_id, - queue_item=queue_item, + media=media, ) - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: - """ - Handle enqueuing of the next queue item on the player. - - Only called if the player supports PlayerFeature.ENQUE_NEXT. - Called about 1 second after a new track started playing. - Called about 15 seconds before the end of the current track. - - A PlayerProvider implementation is in itself responsible for handling this - so that the queue items keep playing until its empty or the player stopped. - - This will NOT be called if the end of the queue is reached (and repeat disabled). - This will NOT be called if the player is using flow mode to playback the queue. - """ + @api_command("players/cmd/enqueue_next_media") + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: + """Handle enqueuing of a next media item on the player.""" if player_id.startswith(SYNCGROUP_PREFIX): # redirect to syncgroup-leader if needed group_player = self.get(player_id, True) if sync_leader := self.get_sync_leader(group_player): - await self.enqueue_next_queue_item( + await self.enqueue_next_media( sync_leader.player_id, - queue_item=queue_item, + media=media, ) return player_prov = self.mass.players.get_player_provider(player_id) - await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item) + await player_prov.enqueue_next_media(player_id=player_id, media=media) @api_command("players/cmd/sync") @handle_player_command @@ -761,6 +751,8 @@ class PlayerController(CoreController): if PlayerFeature.SYNC not in parent_player.supported_features: msg = f"Player {parent_player.name} does not support (un)sync commands" raise UnsupportedFeaturedException(msg) + if player_id == target_player: + return if child_player.synced_to: if child_player.synced_to == parent_player.player_id: # nothing to do: already synced to this parent @@ -825,13 +817,10 @@ class PlayerController(CoreController): msg = f"Provider {provider} is not available!" raise ProviderUnavailableError(msg) if ProviderFeature.PLAYER_GROUP_CREATE in player_prov.supported_features: - # provider supports group create feature: forward request to provider - # the provider is itself responsible for - # checking if the members can be used for grouping + # Provider supports group create feature: forward request to provider. + # NOTE: The provider is itself responsible for + # checking if the members can be used for grouping. return await player_prov.create_group(name, members=members) - if ProviderFeature.SYNC_PLAYERS in player_prov.supported_features: - # default syncgroup implementation - return await self._create_syncgroup(player_prov.instance_id, name, members) msg = f"Provider {player_prov.name} does not support creating groups" raise UnsupportedFeaturedException(msg) @@ -1011,29 +1000,6 @@ class PlayerController(CoreController): # Syncgroup specific functions/helpers - async def _create_syncgroup(self, provider: str, name: str, members: list[str]) -> Player: - """Create new (providers-specific) SyncGroup with given name and members.""" - new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}" - # cleanup list, filter groups (should be handled by frontend, but just in case) - members = [ - x.player_id - for x in self - if x.player_id in members - if not x.player_id.startswith(SYNCGROUP_PREFIX) - if x.provider == provider and PlayerFeature.SYNC in x.supported_features - ] - # create default config with the user chosen name - self.mass.config.create_default_player_config( - new_group_id, - provider, - name=name, - enabled=True, - values={CONF_GROUP_MEMBERS: members}, - ) - return self._register_syncgroup( - group_player_id=new_group_id, provider=provider, name=name, members=members - ) - def get_sync_leader(self, group_player: Player) -> Player | None: """Get the active sync leader player for a syncgroup or synced player.""" if group_player.synced_to: @@ -1061,7 +1027,7 @@ class PlayerController(CoreController): return child_player return None - async def _sync_syncgroup(self, player_id: str) -> None: + async def sync_syncgroup(self, player_id: str) -> None: """Sync all (possible) players of a syncgroup.""" group_player = self.get(player_id, True) sync_leader = self.get_sync_leader(group_player) @@ -1078,105 +1044,25 @@ class PlayerController(CoreController): async def _register_syncgroups(self) -> None: """Register all (virtual/fake) syncgroup players.""" - player_configs = await self.mass.config.get_player_configs(include_values=True) + player_configs = await self.mass.config.get_player_configs() for player_config in player_configs: if not player_config.player_id.startswith(SYNCGROUP_PREFIX): continue - members = player_config.get_value(CONF_GROUP_MEMBERS) - self._register_syncgroup( + if not (player_prov := self.mass.get_provider(player_config.provider)): + continue + members = self.mass.config.get_raw_player_config_value( + player_config.player_id, CONF_GROUP_MEMBERS + ) + player_prov.register_syncgroup( group_player_id=player_config.player_id, - provider=player_config.provider, name=player_config.name or player_config.default_name, members=members, ) - def _register_syncgroup( - self, group_player_id: str, provider: str, name: str, members: Iterable[str] - ) -> Player: - """Register a (virtual/fake) syncgroup player.""" - # extract player features from first/random player - for member in members: - if first_player := self.get(member): - break - else: - # edge case: no child player is (yet) available; postpone register - return None - player = Player( - player_id=group_player_id, - provider=provider, - type=PlayerType.SYNC_GROUP, - name=name, - available=True, - powered=False, - device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()), - supported_features=first_player.supported_features, - group_childs=set(members), - active_source=group_player_id, - ) - self.mass.players.register_or_update(player) - return player - - def _on_syncgroup_child_power( - self, player_id: str, child_player_id: str, new_power: bool - ) -> None: - """ - Call when a power command was executed on one of the child player of a Player/Sync group. - - This is used to handle special actions such as (re)syncing. - """ - group_player = self.mass.players.get(player_id) - child_player = self.mass.players.get(child_player_id) - - if not group_player.powered: - # guard, this should be caught in the player controller but just in case... - return - - powered_childs = list(self.iter_group_members(group_player, True)) - if not new_power and child_player in powered_childs: - powered_childs.remove(child_player) - if new_power and child_player not in powered_childs: - powered_childs.append(child_player) - - # if the last player of a group turned off, turn off the group - if len(powered_childs) == 0: - self.logger.debug( - "Group %s has no more powered members, turning off group player", - group_player.display_name, - ) - self.mass.create_task(self.cmd_power(player_id, False)) - return - - group_playing = group_player.state == PlayerState.PLAYING - is_sync_leader = ( - len(child_player.group_childs) > 0 - and child_player.active_source == group_player.player_id - ) - if group_playing and not new_power and is_sync_leader: - # the current sync leader player turned OFF while the group player - # should still be playing - we need to select a new sync leader and resume - self.logger.warning( - "Syncleader %s turned off while syncgroup is playing, " - "a forced resume for syngroup %s will be attempted in 5 seconds...", - child_player.display_name, - group_player.display_name, - ) - - async def forced_resync() -> None: - # we need to wait a bit here to not run into massive race conditions - await asyncio.sleep(5) - await self._sync_syncgroup(group_player.player_id) - await self.mass.player_queues.resume(group_player.player_id) - - self.mass.create_task(forced_resync()) - return - if new_power: - # if a child player turned ON while the group player is on, we need to resync/resume - self.mass.create_task(self._sync_syncgroup(group_player.player_id)) - async def _play_announcement( self, player: Player, - announcement: QueueItem, + announcement: PlayerMedia, volume_level: int | None = None, ) -> None: """Handle (default/fallback) implementation of the play announcement feature. @@ -1209,9 +1095,9 @@ class PlayerController(CoreController): # wait for the player to stop with suppress(TimeoutError): await self.wait_for_state(player, PlayerState.IDLE, 10) - # a small amount of pause before the volume command - # prevents that the last piece of music is very loud - await asyncio.sleep(0.2) + # a small amount of pause before the volume command + # prevents that the last piece of music is very loud + await asyncio.sleep(0.2) # adjust volume if needed # in case of a (sync) group, we need to do this for all child players prev_volumes: dict[str, int] = {} @@ -1219,7 +1105,12 @@ class PlayerController(CoreController): for volume_player_id in player.group_childs or (player.player_id,): if not (volume_player := self.get(volume_player_id)): continue - if volume_player.active_source != player.active_source: + # filter out players that have a different source active + if volume_player.active_source not in ( + player.active_source, + volume_player.player_id, + None, + ): continue prev_volume = volume_player.volume_level announcement_volume = self.get_announcement_volume(volume_player_id, volume_level) @@ -1239,8 +1130,8 @@ class PlayerController(CoreController): "Announcement to player %s - playing the announcement on the player...", player.display_name, ) - await self.play_media(player_id=player.player_id, queue_item=announcement) - # wait for the player to play + await self.play_media(player_id=player.player_id, media=announcement) + # wait for the player(s) to play with suppress(TimeoutError): await self.wait_for_state(player, PlayerState.PLAYING, 10) self.logger.debug( @@ -1248,10 +1139,11 @@ class PlayerController(CoreController): player.display_name, ) # wait for the player to stop playing + if not announcement.duration: + media_info = await parse_tags(announcement.custom_data["url"]) + announcement.duration = media_info.duration with suppress(TimeoutError): - await self.wait_for_state( - player, PlayerState.IDLE, (announcement.streamdetails.duration or 60) + 3 - ) + await self.wait_for_state(player, PlayerState.IDLE, (announcement.duration or 60) + 3) self.logger.debug( "Announcement to player %s - restore previous state...", player.display_name ) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 2cd0fa10..f608609f 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -14,10 +14,8 @@ import os import time import urllib.parse from collections.abc import AsyncGenerator -from contextlib import suppress from typing import TYPE_CHECKING -import shortuuid from aiofiles.os import wrap from aiohttp import web @@ -40,7 +38,6 @@ from music_assistant.constants import ( CONF_OUTPUT_CHANNELS, CONF_PUBLISH_IP, SILENCE_FILE, - UGP_PREFIX, ) from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( @@ -84,191 +81,6 @@ FLOW_DEFAULT_BIT_DEPTH = 24 isfile = wrap(os.path.isfile) -class MultiClientStreamJob: - """ - Representation of a (multiclient) Audio Queue stream job/task. - - The whole idea here is that in case of a player (sync)group, - all client players receive the exact same (PCM) audio chunks from the source audio. - A StreamJob is tied to a Queue and streams the queue flow stream, - In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created. - """ - - _audio_task: asyncio.Task | None = None - - def __init__( - self, - stream_controller: StreamsController, - queue_id: str, - pcm_format: AudioFormat, - start_queue_item: QueueItem, - ) -> None: - """Initialize MultiClientStreamJob instance.""" - self.stream_controller = stream_controller - self.queue_id = queue_id - self.queue = self.stream_controller.mass.player_queues.get(queue_id) - assert self.queue # just in case - self.pcm_format = pcm_format - self.start_queue_item = start_queue_item - self.job_id = shortuuid.uuid() - self.expected_players: set[str] = set() - self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {} - self.bytes_streamed: int = 0 - self._all_clients_connected = asyncio.Event() - self.logger = stream_controller.logger.getChild("streamjob") - self._finished: bool = False - # start running the audio task in the background - self._audio_task = asyncio.create_task(self._stream_job_runner()) - - @property - def finished(self) -> bool: - """Return if this StreamJob is finished.""" - return self._finished or self._audio_task and self._audio_task.done() - - @property - def pending(self) -> bool: - """Return if this Job is pending start.""" - return not self.finished and not self._all_clients_connected.is_set() - - @property - def running(self) -> bool: - """Return if this Job is running.""" - return not self.finished and not self.pending - - def stop(self) -> None: - """Stop running this job.""" - self._finished = True - if self._audio_task and self._audio_task.done(): - return - if self._audio_task: - self._audio_task.cancel() - for sub_queue in self.subscribed_players.values(): - with suppress(asyncio.QueueFull): - sub_queue.put_nowait(b"") - - def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: - """Resolve the childplayer specific stream URL to this streamjob.""" - fmt = output_codec.value - # handle raw pcm - if output_codec.is_pcm(): - player = self.stream_controller.mass.players.get(child_player_id) - player_max_bit_depth = 24 if player.supports_24bit else 16 - output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate) - output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth) - output_channels = self.stream_controller.mass.config.get_raw_player_config_value( - child_player_id, CONF_OUTPUT_CHANNELS, "stereo" - ) - channels = 1 if output_channels != "stereo" else 2 - fmt += ( - f";codec=pcm;rate={output_sample_rate};" - f"bitrate={output_bit_depth};channels={channels}" - ) - url = f"{self.stream_controller._server.base_url}/multi/{self.queue_id}/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}" # noqa: E501 - self.expected_players.add(child_player_id) - return url - - async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]: - """Subscribe consumer and iterate incoming chunks on the queue.""" - try: - # some players (e.g. dlna, sonos) misbehave and do multiple GET requests - # to the stream in an attempt to get the audio details such as duration - # which is a bit pointless for our duration-less queue stream - # and it completely messes with the subscription logic - if player_id in self.subscribed_players: - self.logger.warning( - "Player %s is making multiple requests " - "to the same stream, playback may be disturbed!", - player_id, - ) - player_id = f"{player_id}_{shortuuid.random(4)}" - elif self._all_clients_connected.is_set(): - # client subscribes while we're already started - that is going to be messy for sure - self.logger.warning( - "Player %s is is joining while the stream is already started, " - "playback may be disturbed!", - player_id, - ) - - self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) - - if self._all_clients_connected.is_set(): - # client subscribes while we're already started, - # that will most probably lead to a bad experience but support it anyways - self.logger.warning( - "Client %s is joining while the stream is already started", player_id - ) - self.logger.debug("Subscribed client %s", player_id) - - if len(self.subscribed_players) == len(self.expected_players): - # we reached the number of expected subscribers, set event - # so that chunks can be pushed - await asyncio.sleep(0.2) - self._all_clients_connected.set() - - # keep reading audio chunks from the queue until we receive an empty one - while True: - chunk = await sub_queue.get() - if chunk == b"": - # EOF chunk received - break - yield chunk - finally: - self.subscribed_players.pop(player_id, None) - self.logger.debug("Unsubscribed client %s", player_id) - # check if this was the last subscriber and we should cancel - await asyncio.sleep(2) - if len(self.subscribed_players) == 0 and self._audio_task and not self.finished: - self.logger.debug("Cleaning up, all clients disappeared...") - self._audio_task.cancel() - - async def _put_chunk(self, chunk: bytes) -> None: - """Put chunk of data to all subscribers.""" - async with asyncio.TaskGroup() as tg: - for sub_queue in list(self.subscribed_players.values()): - # put this chunk on the player's subqueue - tg.create_task(sub_queue.put(chunk)) - self.bytes_streamed += len(chunk) - - async def _stream_job_runner(self) -> None: - """Feed audio chunks to StreamJob subscribers.""" - chunk_num = 0 - async for chunk in self.stream_controller.get_flow_stream( - self.queue, - self.start_queue_item, - self.pcm_format, - ): - chunk_num += 1 - if chunk_num == 1: - # wait until all expected clients are connected - try: - async with asyncio.timeout(10): - await self._all_clients_connected.wait() - except TimeoutError: - if len(self.subscribed_players) == 0: - self.stream_controller.logger.exception( - "Abort multi client stream job for queue %s: " - "clients did not connect within timeout", - self.queue.display_name, - ) - break - # not all clients connected but timeout expired, set flag and move on - # with all clients that did connect - self._all_clients_connected.set() - else: - self.stream_controller.logger.debug( - "Starting multi client stream job for queue %s " - "with %s out of %s connected clients", - self.queue.display_name, - len(self.subscribed_players), - len(self.expected_players), - ) - - await self._put_chunk(chunk) - - # mark EOF with empty chunk - await self._put_chunk(b"") - - def parse_pcm_info(content_type: str) -> tuple[int, int, int]: """Parse PCM info from a codec/content_type string.""" params = ( @@ -289,7 +101,6 @@ class StreamsController(CoreController): """Initialize instance.""" super().__init__(*args, **kwargs) self._server = Webserver(self.logger, enable_dynamic_routes=True) - self.multi_client_jobs: dict[str, MultiClientStreamJob] = {} self.register_dynamic_route = self._server.register_dynamic_route self.unregister_dynamic_route = self._server.unregister_dynamic_route self.manifest.name = "Streamserver" @@ -387,11 +198,6 @@ class StreamsController(CoreController): "/single/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_item_stream, ), - ( - "*", - "/multi/{queue_id}/{job_id}/{player_id}/{queue_item_id}.{fmt}", - self.serve_multi_subscriber_stream, - ), ( "*", "/command/{queue_id}/{command}.mp3", @@ -411,26 +217,12 @@ class StreamsController(CoreController): def resolve_stream_url( self, - player_id: str, queue_item: QueueItem, - output_codec: ContentType, flow_mode: bool = False, + output_codec: ContentType = ContentType.FLAC, ) -> str: """Resolve the stream URL for the given QueueItem.""" fmt = output_codec.value - # handle special stream created by UGP - if queue_item.queue_id.startswith(UGP_PREFIX): - return self.multi_client_jobs[queue_item.queue_id].resolve_stream_url( - player_id, output_codec - ) - # handle announcement item - if queue_item.media_type == MediaType.ANNOUNCEMENT: - return self.get_announcement_url( - player_id=queue_item.queue_id, - announcement_url=queue_item.streamdetails.data["url"], - use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], - content_type=output_codec, - ) # handle raw pcm without exact format specifiers if output_codec.is_pcm() and ";" not in fmt: fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}" @@ -444,40 +236,6 @@ class StreamsController(CoreController): url += "?" + urllib.parse.urlencode(query_params) return url - def create_multi_client_stream_job( - self, - queue_id: str, - start_queue_item: QueueItem, - pcm_bit_depth: int = FLOW_DEFAULT_BIT_DEPTH, - pcm_sample_rate: int = FLOW_DEFAULT_SAMPLE_RATE, - ) -> MultiClientStreamJob: - """Create a MultiClientStreamJob for the given queue.. - - This is called by player/sync group implementations to start streaming - the queue audio to multiple players at once. - """ - if existing_job := self.multi_client_jobs.pop(queue_id, None): - if ( - queue_id.startswith(UGP_PREFIX) - and existing_job.job_id == start_queue_item.queue_item_id - ): - return existing_job - # cleanup existing job first - if not existing_job.finished: - existing_job.stop() - self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob( - self, - queue_id=queue_id, - pcm_format=AudioFormat( - content_type=ContentType.from_bit_depth(pcm_bit_depth), - sample_rate=pcm_sample_rate, - bit_depth=pcm_bit_depth, - channels=2, - ), - start_queue_item=start_queue_item, - ) - return stream_job - async def serve_queue_item_stream(self, request: web.Request) -> web.Response: """Stream single queueitem audio to a player.""" self._log_request(request) @@ -642,64 +400,6 @@ class StreamsController(CoreController): return resp - async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response: - """Stream Queue Flow audio to a child player within a multi subscriber setup.""" - self._log_request(request) - queue_id = request.match_info["queue_id"] - streamjob = self.multi_client_jobs.get(queue_id) - if not streamjob: - raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}") - job_id = request.match_info["job_id"] - if job_id != streamjob.job_id: - raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}") - child_player_id = request.match_info["player_id"] - child_player = self.mass.players.get(child_player_id) - if not child_player: - raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}") - # work out (childplayer specific!) output format/details - output_format = await self._get_output_format( - output_format_str=request.match_info["fmt"], - queue_player=child_player, - default_sample_rate=streamjob.pcm_format.sample_rate, - default_bit_depth=streamjob.pcm_format.bit_depth, - ) - # prepare request, add some DLNA/UPNP compatible headers - headers = { - **DEFAULT_STREAM_HEADERS, - "Content-Type": f"audio/{output_format.output_format_str}", - } - resp = web.StreamResponse( - status=200, - reason="OK", - headers=headers, - ) - await resp.prepare(request) - - # return early if this is not a GET request - if request.method != "GET": - return resp - - # all checks passed, start streaming! - self.logger.debug( - "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s", - streamjob.queue.display_name, - child_player.display_name, - ) - - async for chunk in get_ffmpeg_stream( - audio_input=streamjob.subscribe(child_player_id), - input_format=streamjob.pcm_format, - output_format=output_format, - filter_params=get_player_filter_params(self.mass, child_player_id), - ): - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError): - # race condition - break - - return resp - async def serve_command_request(self, request: web.Request) -> web.Response: """Handle special 'command' request for a player.""" self._log_request(request) @@ -805,6 +505,7 @@ class StreamsController(CoreController): queue.display_name, use_crossfade, ) + total_bytes_sent = 0 while True: # get (next) queue item to stream @@ -844,26 +545,22 @@ class StreamsController(CoreController): queue_track.streamdetails, pcm_format=pcm_format, # strip silence from begin/end if track is being crossfaded - strip_silence_begin=use_crossfade and bytes_written > 0, + strip_silence_begin=use_crossfade and total_bytes_sent > 0, strip_silence_end=use_crossfade, ): - # required buffer size is a bit dynamic, - # it needs to be small when the flow stream starts - seconds_streamed = int(bytes_written / pcm_sample_size) - if not use_crossfade or seconds_streamed < 5: - buffer_size = pcm_sample_size - elif seconds_streamed < 10: - buffer_size = pcm_sample_size * 2 - elif use_crossfade and seconds_streamed < 20: - buffer_size = pcm_sample_size * 5 + # buffer size needs to be big enough to include the crossfade part + # allow it to be a bit smaller when playback just starts + if not use_crossfade: + req_buffer_size = pcm_sample_size + elif (total_bytes_sent + bytes_written) < crossfade_size: + req_buffer_size = int(crossfade_size / 2) else: - buffer_size = crossfade_size + pcm_sample_size * 2 - # buffer size needs to be big enough to include the crossfade part + req_buffer_size = crossfade_size # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk del chunk - if len(buffer) < buffer_size: + if len(buffer) < req_buffer_size: # buffer is not full enough, move on continue @@ -892,12 +589,10 @@ class StreamsController(CoreController): buffer = b"" #### OTHER: enough data in buffer, feed to output - while len(buffer) > buffer_size: - subchunk = buffer[:pcm_sample_size] + while len(buffer) > req_buffer_size: + yield buffer[:pcm_sample_size] + bytes_written += pcm_sample_size buffer = buffer[pcm_sample_size:] - bytes_written += len(subchunk) - yield subchunk - del subchunk #### HANDLE END OF TRACK if last_fadeout_part: @@ -909,10 +604,11 @@ class StreamsController(CoreController): # if crossfade is enabled, save fadeout part to pickup for next track last_fadeout_part = buffer[-crossfade_size:] remaining_bytes = buffer[:-crossfade_size] - yield remaining_bytes - bytes_written += len(remaining_bytes) + if remaining_bytes: + yield remaining_bytes + bytes_written += len(remaining_bytes) del remaining_bytes - else: + elif buffer: # no crossfade enabled, just yield the buffer last part bytes_written += len(buffer) yield buffer @@ -926,6 +622,7 @@ class StreamsController(CoreController): queue_track.streamdetails.duration = ( queue_track.streamdetails.seek_position + seconds_streamed ) + total_bytes_sent += bytes_written self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s", queue_track.streamdetails.uri, @@ -941,7 +638,7 @@ class StreamsController(CoreController): queue_track.streamdetails.seconds_streamed += last_part_seconds queue_track.streamdetails.duration += last_part_seconds del last_fadeout_part - + total_bytes_sent += bytes_written self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) async def get_announcement_stream( @@ -991,8 +688,9 @@ class StreamsController(CoreController): strip_silence_end = False # pcm_sample_size = chunk size = 1 second of pcm audio pcm_sample_size = pcm_format.pcm_sample_size - buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size * 1 - buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size * 1 + buffer_size = ( + pcm_sample_size * 5 if (strip_silence_begin or strip_silence_end) else pcm_sample_size + ) # collect all arguments for ffmpeg filter_params = [] @@ -1037,9 +735,13 @@ class StreamsController(CoreController): input_format=streamdetails.audio_format, output_format=pcm_format, filter_params=filter_params, - # we criple ffmpeg a bit on purpose with the filter_threads - # option so it doesn't consume all cpu when calculating loudnorm - extra_input_args=[*extra_input_args, "-filter_threads", "1"], + extra_input_args=[ + *extra_input_args, + # we criple ffmpeg a bit on purpose with the filter_threads + # option so it doesn't consume all cpu when calculating loudnorm + "-filter_threads", + "1", + ], name="ffmpeg_media_stream", stderr_enabled=True, ) as ffmpeg_proc: @@ -1121,11 +823,10 @@ class StreamsController(CoreController): chunk_num = 0 async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size): chunk_num += 1 - required_buffer = buffer_size_begin if chunk_num < 60 else buffer_size_end buffer += chunk del chunk - if len(buffer) < required_buffer: + if len(buffer) < buffer_size: # buffer is not full enough, move on continue @@ -1144,12 +845,10 @@ class StreamsController(CoreController): continue #### OTHER: enough data in buffer, feed to output - while len(buffer) > required_buffer: - subchunk = buffer[:pcm_sample_size] + while len(buffer) > buffer_size: + yield buffer[:pcm_sample_size] + state_data["bytes_sent"] += pcm_sample_size buffer = buffer[pcm_sample_size:] - state_data["bytes_sent"] += len(subchunk) - yield subchunk - del subchunk # all chunks received, strip silence of last part if needed and yield remaining bytes if strip_silence_end: diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index c07e6095..02397131 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -65,7 +65,7 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"} class FFMpeg(AsyncProcess): """FFMpeg wrapped as AsyncProcess.""" - def __init__( + def __init__( # noqa: PLR0913 self, audio_input: AsyncGenerator[bytes, None] | str | int, input_format: AudioFormat, @@ -76,6 +76,7 @@ class FFMpeg(AsyncProcess): name: str = "ffmpeg", stderr_enabled: bool = False, audio_output: str | int = "-", + loglevel: str | None = None, ) -> None: """Initialize AsyncProcess.""" ffmpeg_args = get_ffmpeg_args( @@ -86,7 +87,7 @@ class FFMpeg(AsyncProcess): input_path=audio_input if isinstance(audio_input, str) else "-", output_path=audio_output if isinstance(audio_output, str) else "-", extra_input_args=extra_input_args or [], - loglevel="info" + loglevel=loglevel or "info" if stderr_enabled or LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "error", ) @@ -807,7 +808,7 @@ def get_ffmpeg_args( "-nostats", "-ignore_unknown", "-protocol_whitelist", - "file,http,https,tcp,tls,crypto,pipe,data,fd", + "file,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp", ] # collect input args input_args = [] diff --git a/music_assistant/server/helpers/didl_lite.py b/music_assistant/server/helpers/didl_lite.py index 69845357..59b02c65 100644 --- a/music_assistant/server/helpers/didl_lite.py +++ b/music_assistant/server/helpers/didl_lite.py @@ -6,72 +6,47 @@ import datetime from typing import TYPE_CHECKING from music_assistant.common.models.enums import MediaType -from music_assistant.constants import MASS_LOGO_ONLINE, UGP_PREFIX +from music_assistant.constants import MASS_LOGO_ONLINE if TYPE_CHECKING: - from music_assistant.common.models.queue_item import QueueItem - from music_assistant.server import MusicAssistant + from music_assistant.common.models.player import PlayerMedia # ruff: noqa: E501 -def create_didl_metadata(mass: MusicAssistant, url: str, queue_item: QueueItem) -> str: - """Create DIDL metadata string from url and (optional) QueueItem.""" - ext = url.split(".")[-1].split("?")[0] - if "flow" in url or queue_item.queue_id.startswith(UGP_PREFIX): - # flow stream +def create_didl_metadata(media: PlayerMedia) -> str: + """Create DIDL metadata string from url and PlayerMedia.""" + ext = media.uri.split(".")[-1].split("?")[0] + image_url = media.image_url or MASS_LOGO_ONLINE + if media.media_type in (MediaType.FLOW_STREAM, MediaType.RADIO) or not media.duration: + # flow stream, radio or other duration-less stream + title = media.title or media.uri return ( '' f'' - f"Music Assistant" - f"{escape_string(MASS_LOGO_ONLINE)}" - f"{queue_item.queue_id}" - "object.item.audioItem.audioBroadcast" - f"audio/{ext}" - f'{escape_string(url)}' - "" - "" - ) - image_url = ( - mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE - ) - if queue_item.media_type != MediaType.TRACK or not queue_item.duration: - # radio or other non-track item - return ( - '' - f'' - f"{escape_string(queue_item.name)}" + f"{escape_string(title)}" f"{escape_string(image_url)}" - f"{queue_item.queue_item_id}" + f"{media.uri}" "object.item.audioItem.audioBroadcast" f"audio/{ext}" - f'{escape_string(url)}' + f'{escape_string(media.uri)}' "" "" ) - title = escape_string(queue_item.media_item.name) - if queue_item.media_item.artists and queue_item.media_item.artists[0].name: - artist = escape_string(queue_item.media_item.artists[0].name) - else: - artist = "" - if queue_item.media_item.album and queue_item.media_item.album.name: - album = escape_string(queue_item.media_item.album.name) - else: - album = "" - duration_str = str(datetime.timedelta(seconds=queue_item.duration)) + ".000" + duration_str = str(datetime.timedelta(seconds=media.duration or 0)) + ".000" return ( '' '' - f"{title}" - f"{artist}" - f"{album}" - f"{artist}" - f"{int(queue_item.duration)}" - f"{queue_item.queue_item_id}" + f"{escape_string(media.title or media.uri)}" + f"{escape_string(media.artist or '')}" + f"{escape_string(media.album or '')}" + f"{escape_string(media.artist or '')}" + f"{int(media.duration or 0)}" + f"{media.uri}" f"{escape_string(image_url)}" "object.item.audioItem.audioBroadcast" f"audio/{ext}" - f'{escape_string(url)}' + f'{escape_string(media.uri)}' "" "" ) diff --git a/music_assistant/server/helpers/multi_client_stream.py b/music_assistant/server/helpers/multi_client_stream.py new file mode 100644 index 00000000..3dea07ec --- /dev/null +++ b/music_assistant/server/helpers/multi_client_stream.py @@ -0,0 +1,98 @@ +"""Implementation of a simple multi-client stream task/job.""" + +import asyncio +import logging +from collections.abc import AsyncGenerator +from contextlib import suppress + +from music_assistant.common.helpers.util import empty_queue +from music_assistant.common.models.media_items import AudioFormat +from music_assistant.server.helpers.audio import get_ffmpeg_stream + +LOGGER = logging.getLogger(__name__) + + +class MultiClientStream: + """Implementation of a simple multi-client (audio) stream task/job.""" + + def __init__( + self, + audio_source: AsyncGenerator[bytes, None], + audio_format: AudioFormat, + expected_clients: int = 0, + ) -> None: + """Initialize MultiClientStream.""" + self.audio_source = audio_source + self.audio_format = audio_format + self.subscribers: list[asyncio.Queue] = [] + self.expected_clients = expected_clients + self.task = asyncio.create_task(self._runner()) + + @property + def done(self) -> bool: + """Return if this stream is already done.""" + return self.task.done() + + async def stop(self) -> None: + """Stop/cancel the stream.""" + if self.done: + return + self.task.cancel() + with suppress(asyncio.CancelledError): + await self.task + for sub_queue in list(self.subscribers): + empty_queue(sub_queue) + + async def get_stream( + self, output_format: AudioFormat, filter_params: list[str] | None = None + ) -> AsyncGenerator[bytes, None]: + """Get (client specific encoded) ffmpeg stream.""" + async for chunk in get_ffmpeg_stream( + audio_input=self.subscribe_raw(), + input_format=self.audio_format, + output_format=output_format, + filter_params=filter_params, + ): + yield chunk + + async def subscribe_raw(self) -> AsyncGenerator[bytes, None]: + """Subscribe to the raw/unaltered audio stream.""" + try: + queue = asyncio.Queue(1) + self.subscribers.append(queue) + while True: + chunk = await queue.get() + if chunk == b"": + break + yield chunk + finally: + with suppress(ValueError): + self.subscribers.remove(queue) + + async def _runner(self) -> None: + """Run the stream for the given audio source.""" + expected_clients = self.expected_clients or 1 + # wait for first/all subscriber + count = 0 + while count < 50: + await asyncio.sleep(0.5) + count += 1 + if len(self.subscribers) >= expected_clients: + break + if count == 50: + return + LOGGER.debug( + "Starting multi-client stream with %s/%s clients", + len(self.subscribers), + self.expected_clients, + ) + async for chunk in self.audio_source: + if len(self.subscribers) == 0: + return + async with asyncio.TaskGroup() as tg: + for sub in list(self.subscribers): + tg.create_task(sub.put(chunk)) + # EOF: send empty chunk + async with asyncio.TaskGroup() as tg: + for sub in list(self.subscribers): + tg.create_task(sub.put(b"")) diff --git a/music_assistant/server/helpers/webserver.py b/music_assistant/server/helpers/webserver.py index 29dcc235..9ea282bd 100644 --- a/music_assistant/server/helpers/webserver.py +++ b/music_assistant/server/helpers/webserver.py @@ -53,7 +53,7 @@ class Webserver: }, ) self.logger.info("Starting server on %s:%s - base url: %s", bind_ip, bind_port, base_url) - self._apprunner = web.AppRunner(self._webapp, access_log=None) + self._apprunner = web.AppRunner(self._webapp, access_log=None, shutdown_timeout=10) # add static routes if self._static_routes: for method, path, handler in self._static_routes: @@ -68,9 +68,7 @@ class Webserver: await self._apprunner.setup() # set host to None to bind to all addresses on both IPv4 and IPv6 host = None if bind_ip == "0.0.0.0" else bind_ip - self._tcp_site = web.TCPSite( - self._apprunner, host=host, port=bind_port, shutdown_timeout=10 - ) + self._tcp_site = web.TCPSite(self._apprunner, host=host, port=bind_port) await self._tcp_site.start() async def close(self) -> None: diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index d7f3db8a..c85d0e35 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -3,7 +3,9 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING +from collections.abc import Iterable + +import shortuuid from music_assistant.common.models.config_entries import ( CONF_ENTRY_ANNOUNCE_VOLUME, @@ -21,21 +23,18 @@ from music_assistant.common.models.config_entries import ( ConfigValueOption, PlayerConfig, ) -from music_assistant.common.models.enums import ConfigEntryType -from music_assistant.constants import ( - CONF_GROUP_MEMBERS, - CONF_GROUP_PLAYERS, - SYNCGROUP_PREFIX, - UGP_PREFIX, +from music_assistant.common.models.enums import ( + ConfigEntryType, + PlayerFeature, + PlayerState, + PlayerType, + ProviderFeature, ) +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia +from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_GROUP_PLAYERS, SYNCGROUP_PREFIX from .provider import Provider -if TYPE_CHECKING: - from music_assistant.common.models.player import Player - from music_assistant.common.models.queue_item import QueueItem - - # ruff: noqa: ARG001, ARG002 @@ -75,7 +74,7 @@ class PlayerProvider(Provider): ), CONF_ENTRY_PLAYER_ICON_GROUP, ) - if not player_id.startswith((SYNCGROUP_PREFIX, UGP_PREFIX)): + if not player_id.startswith(SYNCGROUP_PREFIX): # add default entries for announce feature entries = ( *entries, @@ -130,21 +129,21 @@ class PlayerProvider(Provider): async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player. - This is called by the Queue controller to start playing a queue item on the given player. + This is called by the Players controller to start playing a mediaitem on the given player. The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - queue_item: The QueueItem that needs to be played on the player. + - media: Details of the item that needs to be played on the player. """ raise NotImplementedError - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """ - Handle enqueuing of the next queue item on the player. + Handle enqueuing of the next (queue) item on the player. Only called if the player supports PlayerFeature.ENQUE_NEXT. Called about 1 second after a new track started playing. @@ -158,7 +157,7 @@ class PlayerProvider(Provider): """ async def play_announcement( - self, player_id: str, announcement: QueueItem, volume_level: int | None = None + self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None ) -> None: """Handle (provider native) playback of an announcement on given player.""" # will only be called for players with PLAY_ANNOUNCEMENT feature set. @@ -229,8 +228,28 @@ class PlayerProvider(Provider): - name: Name for the new group to create. - members: A list of player_id's that should be part of this group. """ - # will only be called for players with PLAYER_GROUP_CREATE feature set. - raise NotImplementedError + # should only be called for providers with PLAYER_GROUP_CREATE feature set. + if ProviderFeature.PLAYER_GROUP_CREATE not in self.supported_features: + raise NotImplementedError + # default implementation: create syncgroup + new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}" + # cleanup list, filter groups (should be handled by frontend, but just in case) + members = [ + x.player_id + for x in self.players + if x.player_id in members + if not x.player_id.startswith(SYNCGROUP_PREFIX) + if x.provider == self.instance_id and PlayerFeature.SYNC in x.supported_features + ] + # create default config with the user chosen name + self.mass.config.create_default_player_config( + new_group_id, + self.instance_id, + name=name, + enabled=True, + values={CONF_GROUP_MEMBERS: members}, + ) + return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members) async def poll_player(self, player_id: str) -> None: """Poll player for state updates. @@ -250,10 +269,88 @@ class PlayerProvider(Provider): def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None: """ - Call when a power command was executed on one of the child player of a Player/Sync group. + Call when a power command was executed on one of the child players of a Sync group. This is used to handle special actions such as (re)syncing. """ + group_player = self.mass.players.get(player_id) + child_player = self.mass.players.get(child_player_id) + + if not group_player.powered: + # guard, this should be caught in the player controller but just in case... + return + + powered_childs = list(self.mass.players.iter_group_members(group_player, True)) + if not new_power and child_player in powered_childs: + powered_childs.remove(child_player) + if new_power and child_player not in powered_childs: + powered_childs.append(child_player) + + # if the last player of a group turned off, turn off the group + if len(powered_childs) == 0: + self.logger.debug( + "Group %s has no more powered members, turning off group player", + group_player.display_name, + ) + self.mass.create_task(self.mass.players.cmd_power(player_id, False)) + return + + # the below actions are only suitable for syncgroups + if ProviderFeature.SYNC_PLAYERS not in self.supported_features: + return + + group_playing = group_player.state == PlayerState.PLAYING + is_sync_leader = ( + len(child_player.group_childs) > 0 + and child_player.active_source == group_player.player_id + ) + if group_playing and not new_power and is_sync_leader: + # the current sync leader player turned OFF while the group player + # should still be playing - we need to select a new sync leader and resume + self.logger.warning( + "Syncleader %s turned off while syncgroup is playing, " + "a forced resume for syngroup %s will be attempted in 5 seconds...", + child_player.display_name, + group_player.display_name, + ) + + async def full_resync() -> None: + await self.mass.players.sync_syncgroup(group_player.player_id) + await self.mass.player_queues.resume(group_player.player_id) + + self.mass.call_later(5, full_resync, task_id=f"forced_resync_{player_id}") + return + elif new_power: + # if a child player turned ON while the group is already active, we need to resync + sync_leader = self.mass.players.get_sync_leader(group_player) + if sync_leader.player_id != child_player_id: + self.mass.create_task( + self.cmd_sync(child_player_id, sync_leader.player_id), + ) + + def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player: + """Register a (virtual/fake) syncgroup player.""" + # extract player features from first/random player + for member in members: + if first_player := self.mass.players.get(member): + break + else: + # edge case: no child player is (yet) available; postpone register + return None + player = Player( + player_id=group_player_id, + provider=self.instance_id, + type=PlayerType.SYNC_GROUP, + name=name, + available=True, + powered=False, + device_info=DeviceInfo(model="SyncGroup", manufacturer=self.name), + supported_features=first_player.supported_features, + group_childs=set(members), + active_source=group_player_id, + ) + self.mass.players.register_or_update(player) + return player # DO NOT OVERRIDE BELOW diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 5c5fe3a3..96cc8c3d 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -40,19 +40,23 @@ from music_assistant.common.models.enums import ( ProviderFeature, ) from music_assistant.common.models.media_items import AudioFormat -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.common.models.player_queue import PlayerQueue -from music_assistant.constants import CONF_SYNC_ADJUST, UGP_PREFIX, VERBOSE_LOG_LEVEL -from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params +from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL +from music_assistant.server.helpers.audio import ( + get_ffmpeg_args, + get_ffmpeg_stream, + get_player_filter_params, +) from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType + from music_assistant.server.providers.ugp import UniversalGroupProvider DOMAIN = "airplay" @@ -463,7 +467,7 @@ class AirplayProvider(PlayerProvider): @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" - return (ProviderFeature.SYNC_PLAYERS,) + return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE) async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" @@ -491,7 +495,6 @@ class AirplayProvider(PlayerProvider): server=f"{socket.gethostname()}.local", ) await self.mass.aiozc.async_register_service(self._dacp_info) - self._resync_handle: asyncio.TimerHandle | None = None async def on_mdns_service_state_change( self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None @@ -590,44 +593,51 @@ class AirplayProvider(PlayerProvider): async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) if player.synced_to: # should not happen, but just in case raise RuntimeError("Player is synced") - # fix race condition where resync and play media are called at more or less the same time - if self._resync_handle: - self._resync_handle.cancel() - self._resync_handle = None # always stop existing stream first - wait_stopped = not queue_item.streamdetails or queue_item.streamdetails.seek_position == 0 async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: - tg.create_task(airplay_player.active_stream.stop(wait=wait_stopped)) + tg.create_task(airplay_player.active_stream.stop(wait=False)) # select audio source - if queue_item.media_type == MediaType.ANNOUNCEMENT: + if media.media_type == MediaType.ANNOUNCEMENT: # special case: stream announcement input_format = AIRPLAY_PCM_FORMAT audio_source = self.mass.streams.get_announcement_stream( - queue_item.streamdetails.data["url"], + media.custom_data["url"], output_format=AIRPLAY_PCM_FORMAT, - use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], + use_pre_announce=media.custom_data["use_pre_announce"], ) - elif queue_item.queue_id.startswith(UGP_PREFIX): - # special case: we got forwarded a request from the UGP - # use the existing stream job that was already created by UGP - stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id] - stream_job.expected_players.add(player_id) - input_format = stream_job.pcm_format - audio_source = stream_job.subscribe(player_id) - else: - queue = self.mass.player_queues.get(queue_item.queue_id) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") + ugp_stream = ugp_provider.streams[media.queue_id] + input_format = ugp_stream.audio_format + audio_source = ugp_stream.subscribe_raw() + elif media.queue_id and media.queue_item_id: + # regular queue stream request input_format = AIRPLAY_PCM_FORMAT audio_source = self.mass.streams.get_flow_stream( - queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=AIRPLAY_PCM_FORMAT, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + input_format = AIRPLAY_PCM_FORMAT + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=AIRPLAY_PCM_FORMAT, ) self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format) @@ -714,17 +724,13 @@ class AirplayProvider(PlayerProvider): active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) if active_queue.state == PlayerState.PLAYING: # playback needs to be restarted to form a new multi client stream session - def resync() -> None: - self._resync_handle = None - self.mass.create_task( - self.mass.player_queues.resume(active_queue.queue_id, fade_in=False) - ) - # this could potentially be called by multiple players at the exact same time # so we debounce the resync a bit here with a timer - if self._resync_handle: - self._resync_handle.cancel() - self._resync_handle = self.mass.loop.call_later(0.5, resync) + self.mass.call_later( + 1, + self.mass.player_queues.resume(active_queue.queue_id, fade_in=False), + task_id=f"resume_{active_queue.queue_id}", + ) else: # make sure that the player manager gets an update self.mass.players.update(child_player.player_id, skip_forward=True) diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 361733a4..7bb06958 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -8,7 +8,6 @@ import logging import threading import time from dataclasses import dataclass -from logging import Logger from typing import TYPE_CHECKING, Any from uuid import UUID @@ -26,14 +25,13 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, - ContentType, MediaType, PlayerFeature, PlayerState, PlayerType, ) from music_assistant.common.models.errors import PlayerUnavailableError -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import ( CONF_CROSSFADE, CONF_FLOW_MODE, @@ -52,7 +50,6 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType @@ -125,7 +122,6 @@ class CastPlayer: cast_info: ChromecastInfo cc: pychromecast.Chromecast player: Player - logger: Logger status_listener: CastStatusListener | None = None mz_controller: MultizoneController | None = None active_group: str | None = None @@ -237,41 +233,25 @@ class ChromecastProvider(PlayerProvider): async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" castplayer = self.castplayers[player_id] - use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - flow_mode=use_flow_mode, - ) + is_flow_mode = "/flow/" in media.uri queuedata = { "type": "LOAD", - "media": self._create_cc_media_item(queue_item, url), + "media": self._create_cc_media_item(media), } - # make sure that the media controller app is launched - app_id = ALT_APP_ID if use_flow_mode else DEFAULT_APP_ID + app_id = ALT_APP_ID if is_flow_mode else DEFAULT_APP_ID await self._launch_app(castplayer, app_id) # send queue info to the CC media_controller = castplayer.cc.media_controller await asyncio.to_thread(media_controller.send_message, data=queuedata, inc_session_id=True) - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: - """Handle enqueuing of the next queue item on the player.""" + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: + """Handle enqueuing of the next item on the player.""" castplayer = self.castplayers[player_id] - if isinstance(queue_item, str): - url = self.mass.streams.get_command_url(queue_item, "next") - queue_item = None - else: - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) next_item_id = None status = castplayer.cc.media_controller.status # lookup position of current track in cast queue @@ -286,7 +266,7 @@ class ChromecastProvider(PlayerProvider): continue next_item_id = item["itemId"] # check if the next queue item isn't already queued - if item.get("media", {}).get("customData", {}).get("uri") == url: + if item.get("media", {}).get("customData", {}).get("uri") == media.uri: return queuedata = { "type": "QUEUE_INSERT", @@ -296,7 +276,7 @@ class ChromecastProvider(PlayerProvider): "autoplay": True, "startTime": 0, "preloadTime": 0, - "media": self._create_cc_media_item(queue_item, url), + "media": self._create_cc_media_item(media), } ], } @@ -305,7 +285,7 @@ class ChromecastProvider(PlayerProvider): self.mass.create_task(media_controller.send_message, data=queuedata, inc_session_id=True) self.logger.debug( "Enqued next track (%s) to player %s", - queue_item.name if queue_item else url, + media.title or media.uri, castplayer.player.display_name, ) @@ -424,7 +404,6 @@ class ChromecastProvider(PlayerProvider): supports_24bit=not cast_info.is_audio_group, enabled_by_default=enabled_by_default, ), - logger=self.logger.getChild(cast_info.friendly_name), ) self.castplayers[player_id] = castplayer @@ -452,8 +431,10 @@ class ChromecastProvider(PlayerProvider): """Handle updated CastStatus.""" if status is None: return # guard - castplayer.logger.debug( - "Received cast status - app_id: %s - volume: %s", + self.logger.log( + VERBOSE_LOG_LEVEL, + "Received cast status for %s - app_id: %s - volume: %s", + castplayer.player.display_name, status.app_id, status.volume_level, ) @@ -485,7 +466,12 @@ class ChromecastProvider(PlayerProvider): def on_new_media_status(self, castplayer: CastPlayer, status: MediaStatus) -> None: """Handle updated MediaStatus.""" - castplayer.logger.debug("Received media status update: %s", status.player_state) + self.logger.log( + VERBOSE_LOG_LEVEL, + "Received media status for %s update: %s", + castplayer.player.display_name, + status.player_state, + ) # player state castplayer.player.elapsed_time_last_updated = time.time() if status.player_is_playing: @@ -521,7 +507,12 @@ class ChromecastProvider(PlayerProvider): def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None: """Handle updated ConnectionStatus.""" - castplayer.logger.debug("Received connection status update - status: %s", status.status) + self.logger.log( + VERBOSE_LOG_LEVEL, + "Received connection status update for %s - status: %s", + castplayer.player.display_name, + status.status, + ) if status.status == CONNECTION_STATUS_DISCONNECTED: castplayer.player.available = False @@ -565,7 +556,7 @@ class ChromecastProvider(PlayerProvider): # Quit the previous app before starting splash screen or media player if castplayer.cc.app_id is not None: castplayer.cc.quit_app() - castplayer.logger.debug("Launching App %s.", app_id) + self.logger.debug("Launching App %s.", app_id) castplayer.cc.socket_client.receiver_controller.launch_app( app_id, force_launch=True, @@ -577,59 +568,38 @@ class ChromecastProvider(PlayerProvider): async def _disconnect_chromecast(self, castplayer: CastPlayer) -> None: """Disconnect Chromecast object if it is set.""" - castplayer.logger.debug("Disconnecting from chromecast socket") + self.logger.debug("Disconnecting from chromecast socket %s", castplayer.player.display_name) await self.mass.loop.run_in_executor(None, castplayer.cc.disconnect, 10) castplayer.mz_controller = None castplayer.status_listener.invalidate() castplayer.status_listener = None self.castplayers.pop(castplayer.player_id, None) - def _create_cc_media_item(self, queue_item: QueueItem, stream_url: str) -> dict[str, Any]: - """Create CC media item from MA QueueItem.""" - duration = int(queue_item.duration) if queue_item.duration else None - image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else "" - if queue_item.media_type == MediaType.TRACK and queue_item.media_item: + def _create_cc_media_item(self, media: PlayerMedia) -> dict[str, Any]: + """Create CC media item from MA PlayerMedia.""" + if media.media_type == MediaType.TRACK: stream_type = STREAM_TYPE_BUFFERED - metadata = { - "metadataType": 3, - "albumName": ( - queue_item.media_item.album.name if queue_item.media_item.album else "" - ), - "songName": queue_item.media_item.name, - "artist": ( - queue_item.media_item.artists[0].name if queue_item.media_item.artists else "" - ), - "title": queue_item.media_item.name, - "images": [{"url": image_url}] if image_url else None, - } - elif queue_item.streamdetails and queue_item.streamdetails.stream_title: - stream_type = STREAM_TYPE_LIVE - metadata = { - "metadataType": 3, - "songName": queue_item.streamdetails.stream_title.split(" - ")[-1], - "artist": queue_item.streamdetails.stream_title.split(" - ")[0], - "albumName": queue_item.name, - "images": [{"url": image_url}] if image_url else None, - "title": queue_item.streamdetails.stream_title.split(" - ")[-1], - } else: stream_type = STREAM_TYPE_LIVE - metadata = { - "metadataType": 0, - "title": queue_item.name, - "images": [{"url": image_url}] if image_url else None, - } + metadata = { + "metadataType": 3, + "albumName": media.album or "", + "songName": media.title or "", + "artist": media.title or "", + "title": media.title or "", + "images": [{"url": media.image_url}] if media.image_url else None, + } return { - "contentId": stream_url, + "contentId": media.uri, "customData": { - "uri": queue_item.uri, - "queue_item_id": queue_item.queue_item_id, + "uri": media.uri, + "queue_item_id": media.uri, "deviceName": "Music Assistant", }, "contentType": "audio/flac", "streamType": stream_type, "metadata": metadata, - "duration": duration, + "duration": media.duration, } async def update_flow_metadata(self, castplayer: CastPlayer) -> None: @@ -648,12 +618,21 @@ class ChromecastProvider(PlayerProvider): media_controller = castplayer.cc.media_controller # update metadata of current item chromecast if media_controller.status.media_custom_data["queue_item_id"] != current_item.queue_item_id: - cc_item = self._create_cc_media_item(current_item, "") + image_url = self.mass.metadata.get_image_url(current_item.image) queuedata = { "type": "PLAY", "mediaSessionId": media_controller.status.media_session_id, "customData": { - "metadata": cc_item["metadata"], + "metadata": { + "metadataType": 3, + "albumName": album.name + if (album := getattr(current_item.media_item, "album", None)) + else "", + "songName": current_item.media_item.name, + "artist": getattr(current_item.media_item, "artist_str", ""), + "title": current_item.media_item.name, + "images": [{"url": image_url}] if image_url else None, + } }, } self.mass.create_task( diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 96bdb8d9..592d42e2 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -32,13 +32,12 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, - ContentType, PlayerFeature, PlayerState, PlayerType, ) from music_assistant.common.models.errors import PlayerUnavailableError -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import ( CONF_CROSSFADE, CONF_ENFORCE_MP3, @@ -59,7 +58,6 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType @@ -342,27 +340,17 @@ class DLNAPlayerProvider(PlayerProvider): await dlna_player.device.async_play() @catch_request_errors - async def play_media( - self, - player_id: str, - queue_item: QueueItem, - ) -> None: + async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" - use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - flow_mode=use_flow_mode, - ) + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False): + media.uri = media.uri.replace(".flac", ".mp3") dlna_player = self.dlnaplayers[player_id] # always clear queue (by sending stop) first if dlna_player.device.can_stop: await self.cmd_stop(player_id) - didl_metadata = create_didl_metadata(self.mass, url, queue_item) - title = queue_item.name if queue_item else "Music Assistant" - await dlna_player.device.async_set_transport_uri(url, title, didl_metadata) + didl_metadata = create_didl_metadata(media) + title = media.title or media.uri + await dlna_player.device.async_set_transport_uri(media.uri, title, didl_metadata) # Play it await dlna_player.device.async_wait_for_can_play(10) # optimistically set this timestamp to help in case of a player @@ -378,17 +366,14 @@ class DLNAPlayerProvider(PlayerProvider): await self.poll_player(dlna_player.udn) @catch_request_errors - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """Handle enqueuing of the next queue item on the player.""" dlna_player = self.dlnaplayers[player_id] - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) - didl_metadata = create_didl_metadata(self.mass, url, queue_item) - title = queue_item.name - await dlna_player.device.async_set_next_transport_uri(url, title, didl_metadata) + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False): + media.uri = media.uri.replace(".flac", ".mp3") + didl_metadata = create_didl_metadata(media) + title = media.title or media.uri + await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata) self.logger.debug( "Enqued next track (%s) to player %s", title, diff --git a/music_assistant/server/providers/fully_kiosk/__init__.py b/music_assistant/server/providers/fully_kiosk/__init__.py index 54bffd91..bb505641 100644 --- a/music_assistant/server/providers/fully_kiosk/__init__.py +++ b/music_assistant/server/providers/fully_kiosk/__init__.py @@ -16,20 +16,18 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, - ContentType, PlayerFeature, PlayerState, PlayerType, ) from music_assistant.common.models.errors import PlayerUnavailableError, SetupFailedError -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import CONF_IP_ADDRESS, CONF_PASSWORD, CONF_PORT from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType @@ -181,19 +179,14 @@ class FullyKioskProvider(PlayerProvider): async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - flow_mode=True, - ) - await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC) - player.current_item_id = queue_item.queue_id + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False): + media.uri = media.uri.replace(".flac", ".mp3") + await self._fully.playSound(media.uri, AUDIOMANAGER_STREAM_MUSIC) + player.current_media = media player.elapsed_time = 0 player.elapsed_time_last_updated = time.time() player.state = PlayerState.PLAYING diff --git a/music_assistant/server/providers/hass_players/__init__.py b/music_assistant/server/providers/hass_players/__init__.py index f8ef8e21..89ad8488 100644 --- a/music_assistant/server/providers/hass_players/__init__.py +++ b/music_assistant/server/providers/hass_players/__init__.py @@ -21,13 +21,12 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, - ContentType, PlayerFeature, PlayerState, PlayerType, ) from music_assistant.common.models.errors import SetupFailedError -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE from music_assistant.server.models.player_provider import PlayerProvider from music_assistant.server.providers.hass import DOMAIN as HASS_DOMAIN @@ -42,7 +41,6 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType from music_assistant.server.providers.hass import HomeAssistant as HomeAssistantProvider @@ -245,25 +243,15 @@ class HomeAssistantPlayers(PlayerProvider): target={"entity_id": player_id}, ) - async def play_media( - self, - player_id: str, - queue_item: QueueItem, - ) -> None: + async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" - use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - flow_mode=use_flow_mode, - ) + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False): + media.uri = media.uri.replace(".flac", ".mp3") await self.hass_prov.hass.call_service( domain="media_player", service="play_media", service_data={ - "media_content_id": url, + "media_content_id": media.uri, "media_content_type": "music", "enqueue": "replace", }, @@ -274,30 +262,15 @@ class HomeAssistantPlayers(PlayerProvider): player.elapsed_time = 0 player.elapsed_time_last_updated = time.time() - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: - """ - Handle enqueuing of the next queue item on the player. - - Only called if the player supports PlayerFeature.ENQUE_NEXT. - Called about 1 second after a new track started playing. - Called about 15 seconds before the end of the current track. - - A PlayerProvider implementation is in itself responsible for handling this - so that the queue items keep playing until its empty or the player stopped. - - This will NOT be called if the end of the queue is reached (and repeat disabled). - This will NOT be called if the player is using flow mode to playback the queue. - """ - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: + """Handle enqueuing of the next queue item on the player.""" + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, False): + media.uri = media.uri.replace(".flac", ".mp3") await self.hass_prov.hass.call_service( domain="media_player", service="play_media", service_data={ - "media_content_id": url, + "media_content_id": media.uri, "media_content_type": "music", "enqueue": "next", }, diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 36dcd28c..c4d3dacf 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -12,6 +12,7 @@ from contextlib import suppress from dataclasses import dataclass from typing import TYPE_CHECKING +from aiohttp import web from aioslimproto.client import PlayerState as SlimPlayerState from aioslimproto.client import SlimClient from aioslimproto.client import TransitionType as SlimTransition @@ -45,24 +46,26 @@ from music_assistant.common.models.enums import ( RepeatMode, ) from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.media_items import AudioFormat +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import ( CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_ENFORCE_MP3, CONF_PORT, CONF_SYNC_ADJUST, - MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL, ) +from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params +from music_assistant.server.helpers.multi_client_stream import MultiClientStream from music_assistant.server.models.player_provider import PlayerProvider +from music_assistant.server.providers.ugp import UniversalGroupProvider if TYPE_CHECKING: from aioslimproto.models import SlimEvent from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType @@ -82,7 +85,7 @@ REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2} # sync constants MIN_DEVIATION_ADJUST = 8 # 5 milliseconds MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements -DEVIATION_JUMP_IGNORE = 5000 # ignore a sudden unrealistic jump +DEVIATION_JUMP_IGNORE = 500 # ignore a sudden unrealistic jump MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds @@ -91,7 +94,7 @@ class SyncPlayPoint: """Simple structure to describe a Sync Playpoint.""" timestamp: float - sync_job_id: str + sync_master: str diff: int @@ -218,17 +221,18 @@ class SlimprotoProvider(PlayerProvider): slimproto: SlimServer _sync_playpoints: dict[str, deque[SyncPlayPoint]] _do_not_resync_before: dict[str, float] + _multi_streams: dict[str, MultiClientStream] @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" - return (ProviderFeature.SYNC_PLAYERS,) + return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE) async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" self._sync_playpoints = {} self._do_not_resync_before = {} - self._resync_handle: asyncio.TimerHandle | None = None + self._multi_streams = {} control_port = self.config.get_value(CONF_PORT) telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT) json_port = self.config.get_value(CONF_CLI_JSON_PORT) @@ -248,15 +252,21 @@ class SlimprotoProvider(PlayerProvider): try: await self.slimproto.start() except OSError as err: - msg = f"Unable to start the Slimproto server - is port {control_port} already taken ?" - raise SetupFailedError(msg) from err + raise SetupFailedError( + "Unable to start the Slimproto server - " + "is one of the required TCP ports already taken ?" + ) from err async def loaded_in_mass(self) -> None: """Call after the provider has been loaded.""" self.slimproto.subscribe(self._client_callback) + self.mass.streams.register_dynamic_route( + "/slimproto/multi", self._serve_multi_client_stream + ) async def unload(self) -> None: """Handle close/cleanup of the provider.""" + self.mass.streams.unregister_dynamic_route("/slimproto/multi") await self.slimproto.stop() async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: @@ -330,67 +340,98 @@ class SlimprotoProvider(PlayerProvider): async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" - # fix race condition where resync and play media are called at more or less the same time - if self._resync_handle: - self._resync_handle.cancel() - self._resync_handle = None player = self.mass.players.get(player_id) if player.synced_to: msg = "A synced player cannot receive play commands directly" raise RuntimeError(msg) - if player.group_childs and queue_item.media_type != MediaType.ANNOUNCEMENT: - # player has sync members, we need to start a (multi-player) stream job - # to make sure that all clients receive the exact same audio - stream_job = self.mass.streams.create_multi_client_stream_job( - queue_id=queue_item.queue_id, - start_queue_item=queue_item, + if not player.group_childs: + slimplayer = self.slimproto.get_player(player_id) + # simple, single-player playback + await self._handle_play_url( + slimplayer, + url=media.uri, + media=media, + send_flush=True, + auto_play=False, + ) + return + + # this is a syncgroup, we need to handle this with a multi client stream + master_audio_format = AudioFormat( + content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24 + ) + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=master_audio_format, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") + ugp_stream = ugp_provider.streams[media.queue_id] + audio_source = ugp_stream.subscribe_raw() + elif media.queue_id and media.queue_item_id: + # regular queue stream request + audio_source = self.mass.streams.get_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=master_audio_format, ) else: - stream_job = None - # forward command to player and any connected sync members + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=master_audio_format, + ) + # start the stream task + self._multi_streams[player_id] = stream = MultiClientStream( + audio_source=audio_source, audio_format=master_audio_format + ) + base_url = f"{self.mass.streams.base_url}/slimproto/multi?player_id={player_id}&fmt=flac" + + # forward to downstream play_media commands async with asyncio.TaskGroup() as tg: for slimplayer in self._get_sync_clients(player_id): - enforce_mp3 = await self.mass.config.get_player_config_value( - slimplayer.player_id, CONF_ENFORCE_MP3 - ) + url = f"{base_url}&child_player_id={slimplayer.player_id}" + if self.mass.config.get_raw_player_config_value( + slimplayer.player_id, CONF_ENFORCE_MP3, False + ): + url = url.replace("flac", "mp3") + stream.expected_clients += 1 tg.create_task( self._handle_play_url( slimplayer, - url=stream_job.resolve_stream_url( - slimplayer.player_id, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - ) - if stream_job - else self.mass.streams.resolve_stream_url( - slimplayer.player_id, - queue_item=queue_item, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - ), - queue_item=None, + url=url, + media=media, send_flush=True, - auto_play=stream_job is None, + auto_play=False, ) ) - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """Handle enqueuing of the next queue item on the player.""" if not (slimplayer := self.slimproto.get_player(player_id)): return - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - flow_mode=False, - ) + url = media.uri + if self.mass.config.get_raw_player_config_value( + slimplayer.player_id, CONF_ENFORCE_MP3, False + ): + url = url.replace("flac", "mp3") + await self._handle_play_url( slimplayer, url=url, - queue_item=queue_item, + media=media, enqueue=True, send_flush=False, auto_play=True, @@ -400,7 +441,7 @@ class SlimprotoProvider(PlayerProvider): self, slimplayer: SlimClient, url: str, - queue_item: QueueItem | None, + media: PlayerMedia, enqueue: bool = False, send_flush: bool = True, auto_play: bool = False, @@ -414,43 +455,15 @@ class SlimprotoProvider(PlayerProvider): else: transition_duration = 0 - if queue_item and queue_item.media_item: - album = getattr(queue_item.media_item, "album", None) - metadata = { - "item_id": queue_item.queue_item_id, - "title": queue_item.media_item.name, - "album": album.name if album else "", - "artist": getattr(queue_item.media_item, "artist_str", "Music Assistant"), - "image_url": self.mass.metadata.get_image_url( - queue_item.image, - size=512, - prefer_proxy=True, - ) - if queue_item.image - else MASS_LOGO_ONLINE, - "duration": queue_item.duration, - } - elif queue_item: - metadata = { - "item_id": queue_item.queue_item_id, - "title": queue_item.name, - "artist": "Music Assistant", - "image_url": self.mass.metadata.get_image_url( - queue_item.image, - size=512, - prefer_proxy=True, - ) - if queue_item.image - else MASS_LOGO_ONLINE, - "duration": queue_item.duration, - } - else: - metadata = { - "item_id": "flow", - "title": "Music Assistant", - "image_url": MASS_LOGO_ONLINE, - } - queue = self.mass.player_queues.get(queue_item.queue_id if queue_item else player_id) + metadata = { + "item_id": media.queue_item_id or media.uri, + "title": media.title, + "album": media.album, + "artist": media.artist, + "image_url": media.image_url, + "duration": media.duration, + } + queue = self.mass.player_queues.get(media.queue_id or player_id) slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode] slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled) # slimplayer.extra_data["can_seek"] = 1 if queue_item else 0 @@ -505,7 +518,7 @@ class SlimprotoProvider(PlayerProvider): parent_player = self.mass.players.get(target_player) assert parent_player # guard if parent_player.synced_to: - raise RuntimeError("Player is already synced") + raise RuntimeError("Parent player is already synced!") if child_player.synced_to and child_player.synced_to != target_player: raise RuntimeError("Player is already synced to another player") # always make sure that the parent player is part of the sync group @@ -515,18 +528,14 @@ class SlimprotoProvider(PlayerProvider): # check if we should (re)start or join a stream session active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) if active_queue.state == PlayerState.PLAYING: - # playback needs to be restarted to form a new multi slimplayer stream session - def resync() -> None: - self._resync_handle = None - self.mass.create_task( - self.mass.player_queues.resume(active_queue.queue_id, fade_in=False) - ) - + # playback needs to be restarted to form a new multi client stream session # this could potentially be called by multiple players at the exact same time # so we debounce the resync a bit here with a timer - if self._resync_handle: - self._resync_handle.cancel() - self._resync_handle = self.mass.loop.call_later(0.5, resync) + self.mass.call_later( + 1, + self.mass.player_queues.resume(active_queue.queue_id, fade_in=False), + task_id=f"resume_{active_queue.queue_id}", + ) else: # make sure that the player manager gets an update self.mass.players.update(child_player.player_id, skip_forward=True) @@ -709,12 +718,6 @@ class SlimprotoProvider(PlayerProvider): # average lag/drift so we can adjust accordingly sync_playpoints = self._sync_playpoints[slimplayer.player_id] - active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id) - stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id) - if not stream_job: - # should not happen, but just in case - return - now = time.time() if now < self._do_not_resync_before[slimplayer.player_id]: return @@ -723,8 +726,8 @@ class SlimprotoProvider(PlayerProvider): if last_playpoint and (now - last_playpoint.timestamp) > 10: # last playpoint is too old, invalidate sync_playpoints.clear() - if last_playpoint and last_playpoint.sync_job_id != stream_job.job_id: - # streamjob has changed, invalidate + if last_playpoint and last_playpoint.sync_master != sync_master.player_id: + # this should not happen, but just in case sync_playpoints.clear() diff = int( @@ -732,12 +735,15 @@ class SlimprotoProvider(PlayerProvider): - self._get_corrected_elapsed_milliseconds(slimplayer) ) - if last_playpoint and abs(last_playpoint.diff - diff) > DEVIATION_JUMP_IGNORE: - # ignore unexpected spikes + # ignore unexpected spikes + if ( + sync_playpoints + and abs(statistics.fmean(x.diff for x in sync_playpoints)) > DEVIATION_JUMP_IGNORE + ): return # we can now append the current playpoint to our list - sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff)) + sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff)) min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS if len(sync_playpoints) < min_req_playpoints: @@ -803,7 +809,7 @@ class SlimprotoProvider(PlayerProvider): # Instead just start playback on all players and let the sync logic work out # the delays etc. self._do_not_resync_before[_client.player_id] = time.time() + 1 - tg.create_task(_client.unpause_at(0)) + tg.create_task(_client.pause_for(200)) async def _handle_connected(self, slimplayer: SlimClient) -> None: """Handle a slimplayer connected event.""" @@ -888,3 +894,52 @@ class SlimprotoProvider(PlayerProvider): await slimplayer.configure_display( visualisation=SlimVisualisationType(visualization), disabled=not display_enabled ) + + async def _serve_multi_client_stream(self, request: web.Request) -> web.Response: + """Serve the multi-client flow stream audio to a player.""" + player_id = request.query.get("player_id") + fmt = request.query.get("fmt") + child_player_id = request.query.get("child_player_id") + + if not (player := self.mass.players.get(player_id)): + raise web.HTTPNotFound(reason=f"Unknown player: {player_id}") + + if not (child_player := self.mass.players.get(child_player_id)): + raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}") + + if not (stream := self._multi_streams.get(player_id, None)) or stream.done: + raise web.HTTPNotFound(f"There is no active stream for {player_id}!") + + resp = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": f"audio/{fmt}", + }, + ) + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving multi-client flow audio stream for player %s to %s", + player.display_name, + child_player.display_name, + ) + + async for chunk in stream.get_stream( + output_format=AudioFormat(content_type=ContentType.try_parse(fmt)), + filter_params=get_player_filter_params(self.mass, child_player_id) + if child_player_id + else None, + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + # race condition + break + + return resp diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 88106e25..a180dc96 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -34,9 +34,8 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import SetupFailedError from music_assistant.common.models.media_items import AudioFormat -from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.constants import UGP_PREFIX -from music_assistant.server.helpers.audio import FFMpeg, get_player_filter_params +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia +from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -47,9 +46,9 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType + from music_assistant.server.providers.ugp import UniversalGroupProvider CONF_SERVER_HOST = "snapcast_server_host" CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port" @@ -145,7 +144,7 @@ class SnapCastProvider(PlayerProvider): @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" - return (ProviderFeature.SYNC_PLAYERS,) + return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE) async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" @@ -262,10 +261,14 @@ class SnapCastProvider(PlayerProvider): ) player.synced_to = self._synced_to(player_id) player.group_childs = self._group_childs(player_id) - if player.current_item_id and player_id in player.current_item_id: - player.active_source = player_id - elif stream := self._get_snapstream(player_id): - player.active_source = stream.name + if player.active_group is None: + if stream := self._get_snapstream(player_id): + if stream.name.startswith(("MusicAssistant", "default")): + player.active_source = player_id + else: + player.active_source = stream.name + else: + player.active_source = player_id self.mass.players.register_or_update(player) async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: @@ -308,7 +311,7 @@ class SnapCastProvider(PlayerProvider): await self._get_snapgroup(player_id).set_stream("default") self._handle_update() - async def play_media(self, player_id: str, queue_item: QueueItem) -> None: + async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) if player.synced_to: @@ -316,31 +319,43 @@ class SnapCastProvider(PlayerProvider): raise RuntimeError(msg) # stop any existing streams first await self.cmd_stop(player_id) - queue = self.mass.player_queues.get(queue_item.queue_id) stream, port = await self._create_stream() snap_group = self._get_snapgroup(player_id) await snap_group.set_stream(stream.identifier) - if queue_item.media_type == MediaType.ANNOUNCEMENT: + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: # special case: stream announcement input_format = DEFAULT_SNAPCAST_FORMAT audio_source = self.mass.streams.get_announcement_stream( - queue_item.streamdetails.data["url"], + media.custom_data["url"], output_format=DEFAULT_SNAPCAST_FORMAT, - use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], + use_pre_announce=media.custom_data["use_pre_announce"], ) - elif queue_item.queue_id.startswith(UGP_PREFIX): - # special case: we got forwarded a request from the UGP - # use the existing stream job that was already created by UGP - stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id] - stream_job.expected_players.add(player_id) - input_format = stream_job.pcm_format - audio_source = stream_job.subscribe(player_id) - else: - queue = self.mass.player_queues.get(queue_item.queue_id) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") + ugp_stream = ugp_provider.streams[media.queue_id] + input_format = ugp_stream.audio_format + audio_source = ugp_stream.subscribe_raw() + elif media.queue_id and media.queue_item_id: + # regular queue stream request input_format = DEFAULT_SNAPCAST_FORMAT audio_source = self.mass.streams.get_flow_stream( - queue, start_queue_item=queue_item, pcm_format=DEFAULT_SNAPCAST_FORMAT + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=DEFAULT_SNAPCAST_FORMAT, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=DEFAULT_SNAPCAST_FORMAT, ) async def _streamer() -> None: @@ -350,7 +365,7 @@ class SnapCastProvider(PlayerProvider): def stream_callback(_stream) -> None: player.state = PlayerState(_stream.status) if player.state == PlayerState.PLAYING: - player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" + player.current_media = media player.elapsed_time = 0 player.elapsed_time_last_updated = time.time() self.mass.players.update(player_id) diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 5027dfff..274e9d44 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -26,13 +26,12 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, - ContentType, PlayerFeature, PlayerType, ProviderFeature, ) from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError -from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.constants import CONF_CROSSFADE, SYNCGROUP_PREFIX, VERBOSE_LOG_LEVEL from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider @@ -44,7 +43,6 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType @@ -144,7 +142,7 @@ class SonosPlayerProvider(PlayerProvider): @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" - return (ProviderFeature.SYNC_PLAYERS,) + return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE) async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" @@ -336,7 +334,7 @@ class SonosPlayerProvider(PlayerProvider): async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" sonos_player = self.sonosplayers[player_id] @@ -349,36 +347,13 @@ class SonosPlayerProvider(PlayerProvider): ) raise PlayerCommandFailed(msg) - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) - self.mass.create_task( - sonos_player.soco.play_uri, url, meta=create_didl_metadata(self.mass, url, queue_item) - ) - - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: - """ - Handle enqueuing of the next queue item on the player. - - If the player supports PlayerFeature.ENQUE_NEXT: - This will be called about 10 seconds before the end of the track. - If the player does NOT report support for PlayerFeature.ENQUE_NEXT: - This will be called when the end of the track is reached. + didl_metadata = create_didl_metadata(media) + self.mass.create_task(sonos_player.soco.play_uri, media.uri, meta=didl_metadata) - A PlayerProvider implementation is in itself responsible for handling this - so that the queue items keep playing until its empty or the player stopped. - - This will NOT be called if the end of the queue is reached (and repeat disabled). - This will NOT be called if flow mode is enabled on the queue. - """ + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: + """Handle enqueuing of the next queue item on the player.""" sonos_player = self.sonosplayers[player_id] - url = self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) + didl_metadata = create_didl_metadata(media) # set crossfade according to player setting crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE) if sonos_player.crossfade != crossfade: @@ -394,10 +369,25 @@ class SonosPlayerProvider(PlayerProvider): await asyncio.to_thread(set_crossfade) - await self._enqueue_item(sonos_player, url=url, queue_item=queue_item) + try: + await asyncio.to_thread( + sonos_player.soco.avTransport.SetNextAVTransportURI, + [("InstanceID", 0), ("NextURI", media.uri), ("NextURIMetaData", didl_metadata)], + timeout=60, + ) + except Exception as err: + self.logger.warning( + "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err + ) + else: + self.logger.debug( + "Enqued next track (%s) to player %s", + media.title or media.uri, + sonos_player.soco.player_name, + ) async def play_announcement( - self, player_id: str, announcement: QueueItem, volume_level: int | None = None + self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None ) -> None: """Handle (provider native) playback of an announcement on given player.""" if player_id.startswith(SYNCGROUP_PREFIX): @@ -410,19 +400,16 @@ class SonosPlayerProvider(PlayerProvider): self.play_announcement(child_player_id, announcement, volume_level) ) return - announcement_url = self.mass.streams.resolve_stream_url( - player_id, announcement, ContentType.MP3 - ) sonos_player = self.sonosplayers[player_id] self.logger.debug( "Playing announcement %s using websocket audioclip on %s", - announcement_url, + announcement.uri, sonos_player.zone_name, ) volume_level = self.mass.players.get_announcement_volume(player_id, volume_level) try: response, _ = await sonos_player.websocket.play_clip( - announcement_url, + announcement.uri, volume=volume_level, ) except SonosWebsocketError as exc: @@ -544,28 +531,3 @@ class SonosPlayerProvider(PlayerProvider): self.mass.loop.call_soon_threadsafe( self.mass.players.register_or_update, sonos_player.mass_player ) - - async def _enqueue_item( - self, - sonos_player: SonosPlayer, - url: str, - queue_item: QueueItem | None, - ) -> None: - """Enqueue a queue item to the Sonos player Queue.""" - metadata = create_didl_metadata(self.mass, url, queue_item) - try: - await asyncio.to_thread( - sonos_player.soco.avTransport.SetNextAVTransportURI, - [("InstanceID", 0), ("NextURI", url), ("NextURIMetaData", metadata)], - timeout=60, - ) - except Exception as err: - self.logger.warning( - "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err - ) - else: - self.logger.debug( - "Enqued next track (%s) to player %s", - queue_item.name if queue_item else url, - sonos_player.soco.player_name, - ) diff --git a/music_assistant/server/providers/sonos/player.py b/music_assistant/server/providers/sonos/player.py index 20388f42..711a1485 100644 --- a/music_assistant/server/providers/sonos/player.py +++ b/music_assistant/server/providers/sonos/player.py @@ -535,17 +535,17 @@ class SonosPlayer: """Handle callback for topology change event.""" if xml := event.variables.get("zone_group_state"): zgs = ET.fromstring(xml) - vanished_devices = zgs.find("VanishedDevices") or [] - for vanished_device in vanished_devices: - if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS: - self.logger.debug( - "Ignoring %s marked %s as vanished with reason: %s", - self.zone_name, - vanished_device.get("ZoneName"), - reason, - ) - continue - self.mass.create_task(self._vanished(reason)) + if vanished_devices := zgs.find("VanishedDevices"): + for vanished_device in vanished_devices: + if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS: + self.logger.debug( + "Ignoring %s marked %s as vanished with reason: %s", + self.zone_name, + vanished_device.get("ZoneName"), + reason, + ) + continue + self.mass.create_task(self._vanished(reason)) if "zone_player_uui_ds_in_group" not in event.variables: return diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 900f5424..cb764060 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -8,9 +8,11 @@ allowing the user to create player groups from all players known in the system. from __future__ import annotations import asyncio +from time import time from typing import TYPE_CHECKING import shortuuid +from aiohttp import web from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE_DURATION, @@ -20,20 +22,19 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, MediaType, PlayerFeature, PlayerState, PlayerType, ProviderFeature, ) -from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import ( - CONF_CROSSFADE, - CONF_GROUP_MEMBERS, - SYNCGROUP_PREFIX, - UGP_PREFIX, -) +from music_assistant.common.models.media_items import AudioFormat +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia +from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX +from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS +from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params +from music_assistant.server.helpers.multi_client_stream import MultiClientStream from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -47,6 +48,10 @@ if TYPE_CHECKING: # ruff: noqa: ARG002 +UGP_FORMAT = AudioFormat( + content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24 +) + async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig @@ -74,9 +79,6 @@ async def get_config_entries( class UniversalGroupProvider(PlayerProvider): """Base/builtin provider for universally grouping players.""" - prev_sync_leaders: dict[str, tuple[str]] | None = None - debounce_id: str | None = None - @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" @@ -87,12 +89,23 @@ class UniversalGroupProvider(PlayerProvider): ) -> None: """Initialize MusicProvider.""" super().__init__(mass, manifest, config) - self.prev_sync_leaders = {} + self._registered_routes: set[str] = set() + self.streams: dict[str, MultiClientStream] = {} async def loaded_in_mass(self) -> None: """Call after the provider has been loaded.""" await self._register_all_players() + async def unload(self) -> None: + """ + Handle unload/close of the provider. + + Called when provider is deregistered (e.g. MA exiting or config reloading). + """ + for route_path in list(self._registered_routes): + self._registered_routes.remove(route_path) + self.mass.streams.unregister_dynamic_route(route_path) + async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: """Return all (provider/player specific) Config Entries for the given player (if any).""" base_entries = await super().get_player_config_entries(player_id) @@ -136,21 +149,19 @@ class UniversalGroupProvider(PlayerProvider): """Send STOP command to given player.""" group_player = self.mass.players.get(player_id) group_player.state = PlayerState.IDLE + self.mass.players.update(player_id) # forward command to player and any connected sync child's async with asyncio.TaskGroup() as tg: for member in self.mass.players.iter_group_members(group_player, only_powered=True): if member.state == PlayerState.IDLE: continue tg.create_task(self.mass.players.cmd_stop(member.player_id)) - if existing := self.mass.streams.multi_client_jobs.pop(player_id, None): - existing.stop() + if (stream := self.streams.pop(player_id, None)) and not stream.done: + await stream.stop() async def cmd_play(self, player_id: str) -> None: """Send PLAY command to given player.""" - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player.""" - async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player.""" await self.mass.players.cmd_group_power(player_id, powered) @@ -159,52 +170,74 @@ class UniversalGroupProvider(PlayerProvider): """Send VOLUME_SET command to given player.""" # group volume is already handled in the player manager - async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: - """Send VOLUME MUTE command to given player.""" - async def play_media( self, player_id: str, - queue_item: QueueItem, + media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" # power ON await self.cmd_power(player_id, True) group_player = self.mass.players.get(player_id) + # stop any existing stream first + if (existing := self.streams.pop(player_id, None)) and not existing.done: + existing.task.cancel() + + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=UGP_FORMAT, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.queue_id and media.queue_item_id: + # regular queue stream request + audio_source = self.mass.streams.get_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=UGP_FORMAT, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=UGP_FORMAT, + ) - # create a multi-client stream job - all (direct) child's of this UGP group - # will subscribe to this multi client queue stream - queue = self.mass.player_queues.get(player_id) - stream_job = self.mass.streams.create_multi_client_stream_job( - queue.queue_id, - start_queue_item=queue_item, + # start the stream task + self.streams[player_id] = MultiClientStream( + audio_source=audio_source, audio_format=UGP_FORMAT ) - # create a fake queue item to forward to downstream play_media commands - ugp_queue_item = QueueItem( - player_id, - queue_item_id=stream_job.job_id, - name="Music Assistant", - duration=None, - ) - # special case: handle announcement sent to this UGP - # we just forward this as-is downstream and let all child players handle this themselves - if queue_item.media_type == MediaType.ANNOUNCEMENT: - ugp_queue_item = queue_item + base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac" - # forward the stream job to all group members + # forward to downstream play_media commands async with asyncio.TaskGroup() as tg: for member in self.mass.players.iter_group_members(group_player, only_powered=True): - player_prov = self.mass.players.get_player_provider(member.player_id) if member.player_id.startswith(SYNCGROUP_PREFIX): member = self.mass.players.get_sync_leader(member) # noqa: PLW2901 if member is None: continue - tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item)) - - async def poll_player(self, player_id: str) -> None: - """Poll player for state updates.""" - self.update_attributes(player_id) - self.mass.players.update(player_id, skip_forward=True) + tg.create_task( + self.mass.players.play_media( + member.player_id, + media=PlayerMedia( + uri=f"{base_url}?player_id={member.player_id}", + media_type=MediaType.FLOW_STREAM, + title=group_player.display_name, + queue_id=group_player.player_id, + ), + ) + ) + # set the state optimistically + group_player.elapsed_time = 0 + group_player.elapsed_time_last_updated = time() - 1 + group_player.state = PlayerState.PLAYING + self.mass.players.update(player_id) async def create_group(self, name: str, members: list[str]) -> Player: """Create new PlayerGroup on this provider. @@ -214,7 +247,7 @@ class UniversalGroupProvider(PlayerProvider): - name: Name for the new group to create. - members: A list of player_id's that should be part of this group. """ - new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}" + new_group_id = f"{self.domain}_{shortuuid.random(8).lower()}" # cleanup list, filter groups (should be handled by frontend, but just in case) members = [ x.player_id @@ -261,22 +294,13 @@ class UniversalGroupProvider(PlayerProvider): group_childs=set(members), ) self.mass.players.register_or_update(player) - return player - - def update_attributes(self, player_id: str) -> None: - """Update player attributes.""" - group_player = self.mass.players.get(player_id) - if not group_player.powered: - group_player.state = PlayerState.IDLE - return + # register dynamic routes for the ugp stream (both flac and mp3) + for fmt in ("mp3", "flac"): + route_path = f"/ugp/{group_player_id}.{fmt}" + self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream) + self._registered_routes.add(route_path) - # read the state from the first active group member - for member in self.mass.players.iter_group_members(group_player, only_powered=True): - group_player.current_item_id = member.current_item_id - group_player.elapsed_time = member.elapsed_time - group_player.elapsed_time_last_updated = member.elapsed_time_last_updated - group_player.state = member.state - break + return player def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None: """ @@ -309,18 +333,66 @@ class UniversalGroupProvider(PlayerProvider): return False # if a child player turned ON while the group player is already playing - # we need to resync/resume + # we just direct it to the existing stream (we dont care about the audio being in sync) if new_power and group_player.state == PlayerState.PLAYING: - self.logger.warning( - "Player %s turned on while syncgroup is playing, " - "a forced resume for %s will be performed...", - child_player.display_name, - group_player.display_name, + base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac" + self.mass.create_task( + self.mass.players.play_media( + child_player.player_id, + media=PlayerMedia( + uri=f"{base_url}?player_id={child_player.player_id}", + media_type=MediaType.FLOW_STREAM, + title=group_player.display_name, + queue_id=group_player.player_id, + ), + ) ) - self.mass.loop.call_later( - 1, - self.mass.create_task, - self.mass.player_queues.resume(group_player.player_id), - ) - return None + return None + + async def _serve_ugp_stream(self, request: web.Request) -> web.Response: + """Serve the UGP (multi-client) flow stream audio to a player.""" + ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1] + fmt = request.path.rsplit(".")[-1] + child_player_id = request.query.get("player_id") # optional! + + if not (ugp_player := self.mass.players.get(ugp_player_id)): + raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}") + + if not (stream := self.streams.get(ugp_player_id, None)) or stream.done: + raise web.HTTPNotFound(f"There is no active UGP stream for {ugp_player_id}!") + + resp = web.StreamResponse( + status=200, + reason="OK", + headers={ + **DEFAULT_STREAM_HEADERS, + "Content-Type": f"audio/{fmt}", + }, + ) + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving UGP flow audio stream for UGP-player %s to %s", + ugp_player.display_name, + child_player_id or request.remote, + ) + + async for chunk in stream.get_stream( + output_format=AudioFormat(content_type=ContentType.try_parse(fmt)), + filter_params=get_player_filter_params(self.mass, child_player_id) + if child_player_id + else None, + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + # race condition + break + + return resp diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index e930ea9d..7953fb39 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -98,6 +98,7 @@ class MusicAssistant: self._provider_manifests: dict[str, ProviderManifest] = {} self._providers: dict[str, ProviderInstanceType] = {} self._tracked_tasks: dict[str, asyncio.Task] = {} + self._tracked_timers: dict[str, asyncio.TimerHandle] = {} self.closing = False self.running_as_hass_addon: bool = False self.version: str = "0.0.0" @@ -363,12 +364,24 @@ class MusicAssistant: task_id: str | None = None, **kwargs: Any, ) -> asyncio.TimerHandle: - """Run callable/awaitable after given delay.""" + """ + Run callable/awaitable after given delay. + + Use task_id for debouncing. + """ + if not task_id: + task_id = uuid4().hex + + if existing := self._tracked_timers.get(task_id): + existing.cancel() def _create_task() -> None: + self._tracked_timers.pop(task_id) self.create_task(target, *args, task_id=task_id, **kwargs) - return self.loop.call_later(delay, _create_task) + handle = self.loop.call_later(delay, _create_task) + self._tracked_timers[task_id] = handle + return handle def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future: """Get existing scheduled task.""" -- 2.34.1