From: Marcel van der Veldt Date: Fri, 19 Jan 2024 14:32:57 +0000 (+0100) Subject: Refactor enqueing of items during playback and standardize player settings (#1008) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=d1874b678754047f21fc706246963fd316fd6074;p=music-assistant-server.git Refactor enqueing of items during playback and standardize player settings (#1008) --- diff --git a/music_assistant/client/players.py b/music_assistant/client/players.py index 562b3140..6e6e1f8f 100644 --- a/music_assistant/client/players.py +++ b/music_assistant/client/players.py @@ -259,12 +259,6 @@ class Players: "players/queue/repeat", queue_id=queue_id, repeat_mode=repeat_mode ) - async def queue_command_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None: - """Configure crossfade mode on the the queue.""" - await self.client.send_command( - "players/queue/crossfade", queue_id=queue_id, crossfade_enabled=crossfade_enabled - ) - async def play_media( self, queue_id: str, diff --git a/music_assistant/common/models/config_entries.py b/music_assistant/common/models/config_entries.py index 826b2ba0..27a519f5 100644 --- a/music_assistant/common/models/config_entries.py +++ b/music_assistant/common/models/config_entries.py @@ -12,6 +12,7 @@ from mashumaro import DataClassDictMixin from music_assistant.common.models.enums import ProviderType from music_assistant.constants import ( CONF_AUTO_PLAY, + CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_EQ_BASS, CONF_EQ_MID, @@ -20,7 +21,6 @@ from music_assistant.constants import ( CONF_HIDE_GROUP_CHILDS, CONF_LOG_LEVEL, CONF_OUTPUT_CHANNELS, - CONF_OUTPUT_CODEC, CONF_VOLUME_NORMALIZATION, CONF_VOLUME_NORMALIZATION_TARGET, SECURE_STRING_SUBSTITUTE, @@ -297,36 +297,18 @@ DEFAULT_CORE_CONFIG_ENTRIES = (CONF_ENTRY_LOG_LEVEL,) # some reusable player config entries -CONF_ENTRY_OUTPUT_CODEC = ConfigEntry( - key=CONF_OUTPUT_CODEC, - type=ConfigEntryType.STRING, - label="Output codec", - options=[ - ConfigValueOption("FLAC (lossless, compact file size)", "flac"), - ConfigValueOption("AAC (lossy, superior quality)", "aac"), - ConfigValueOption("MP3 (lossy, average quality)", "mp3"), - ConfigValueOption("WAV (lossless, huge file size)", "wav"), - ConfigValueOption("PCM (lossless, huge file size)", "pcm"), - ], - default_value="flac", - description="Define the codec that is sent to the player when streaming audio. " - "By default Music Assistant prefers FLAC because it is lossless, has a " - "respectable filesize and is supported by most player devices. " - "Change this setting only if needed for your device/environment.", - advanced=True, -) - CONF_ENTRY_FLOW_MODE = ConfigEntry( key=CONF_FLOW_MODE, type=ConfigEntryType.BOOLEAN, label="Enable queue flow mode", default_value=False, description='Enable "flow" mode where all queue tracks are sent as a continuous ' - "audio stream. Use for players that do not natively support gapless and/or " + "audio stream. \nUse for players that do not natively support gapless and/or " "crossfading or if the player has trouble transitioning between tracks.", advanced=False, ) + CONF_ENTRY_AUTO_PLAY = ConfigEntry( key=CONF_AUTO_PLAY, type=ConfigEntryType.BOOLEAN, @@ -418,27 +400,22 @@ CONF_ENTRY_HIDE_GROUP_MEMBERS = ConfigEntry( advanced=False, ) +CONF_ENTRY_CROSSFADE = ConfigEntry( + key=CONF_CROSSFADE, + type=ConfigEntryType.BOOLEAN, + label="Enable crossfade", + default_value=False, + description="Enable a crossfade transition between (queue) tracks.", + advanced=False, +) + CONF_ENTRY_CROSSFADE_DURATION = ConfigEntry( key=CONF_CROSSFADE_DURATION, type=ConfigEntryType.INTEGER, - range=(1, 20), + range=(1, 10), default_value=8, label="Crossfade duration", description="Duration in seconds of the crossfade between tracks (if enabled)", - depends_on=CONF_FLOW_MODE, + depends_on=CONF_CROSSFADE, advanced=True, ) - - -DEFAULT_PLAYER_CONFIG_ENTRIES = ( - CONF_ENTRY_VOLUME_NORMALIZATION, - CONF_ENTRY_FLOW_MODE, - CONF_ENTRY_AUTO_PLAY, - CONF_ENTRY_OUTPUT_CODEC, - CONF_ENTRY_VOLUME_NORMALIZATION_TARGET, - CONF_ENTRY_EQ_BASS, - CONF_ENTRY_EQ_MID, - CONF_ENTRY_EQ_TREBLE, - CONF_ENTRY_OUTPUT_CHANNELS, - CONF_ENTRY_CROSSFADE_DURATION, -) diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 14e340a4..15398b72 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -229,8 +229,7 @@ class PlayerFeature(StrEnum): SYNC = "sync" ACCURATE_TIME = "accurate_time" SEEK = "seek" - SET_MEMBERS = "set_members" - QUEUE = "queue" + ENQUEUE_NEXT = "enqueue_next" CROSSFADE = "crossfade" diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index 52c201ee..20d3660e 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -34,7 +34,6 @@ class Player(DataClassDictMixin): elapsed_time: float = 0 elapsed_time_last_updated: float = time.time() - current_url: str | None = None state: PlayerState = PlayerState.IDLE volume_level: int = 100 @@ -53,6 +52,10 @@ class Player(DataClassDictMixin): # otherwise it will be set to the own player_id active_source: str = "" + # current_item_id: return item_id/uri of the current active/loaded item on the player + # this may be a MA queue_item_id, url, uri or some provider specific string + current_item_id: str | None = None + # can_sync_with: return tuple of player_ids that can be synced to/with this player # usually this is just a list of all player_ids within the playerprovider can_sync_with: tuple[str, ...] = field(default=tuple()) diff --git a/music_assistant/common/models/player_queue.py b/music_assistant/common/models/player_queue.py index 66a35af0..c08b0a5e 100644 --- a/music_assistant/common/models/player_queue.py +++ b/music_assistant/common/models/player_queue.py @@ -24,7 +24,6 @@ class PlayerQueue(DataClassDictMixin): shuffle_enabled: bool = False repeat_mode: RepeatMode = RepeatMode.OFF - crossfade_enabled: bool = True # current_index: index that is active (e.g. being played) by the player current_index: int | None = None # index_in_buffer: index that has been preloaded/buffered by the player @@ -37,6 +36,9 @@ class PlayerQueue(DataClassDictMixin): radio_source: list[MediaItemType] = field(default_factory=list) announcement_in_progress: bool = False flow_mode: bool = False + # flow_mode_start_index: index of the first item of the flow stream + flow_mode_start_index: int = 0 + next_track_enqueued: bool = False @property def corrected_elapsed_time(self) -> float: diff --git a/music_assistant/constants.py b/music_assistant/constants.py index ea64aa71..b96d78b9 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -44,13 +44,13 @@ CONF_OUTPUT_CHANNELS: Final[str] = "output_channels" CONF_FLOW_MODE: Final[str] = "flow_mode" CONF_LOG_LEVEL: Final[str] = "log_level" CONF_HIDE_GROUP_CHILDS: Final[str] = "hide_group_childs" -CONF_OUTPUT_CODEC: Final[str] = "output_codec" CONF_GROUPED_POWER_ON: Final[str] = "grouped_power_on" CONF_CROSSFADE_DURATION: Final[str] = "crossfade_duration" CONF_BIND_IP: Final[str] = "bind_ip" CONF_BIND_PORT: Final[str] = "bind_port" CONF_PUBLISH_IP: Final[str] = "publish_ip" CONF_AUTO_PLAY: Final[str] = "auto_play" +CONF_CROSSFADE: Final[str] = "crossfade" # config default values DEFAULT_HOST: Final[str] = "0.0.0.0" diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index 1a5bb03b..7aaa3ade 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -18,7 +18,6 @@ from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_dum from music_assistant.common.models import config_entries from music_assistant.common.models.config_entries import ( DEFAULT_CORE_CONFIG_ENTRIES, - DEFAULT_PLAYER_CONFIG_ENTRIES, DEFAULT_PROVIDER_CONFIG_ENTRIES, ConfigEntry, ConfigValueType, @@ -327,36 +326,30 @@ class ConfigController: """Return configuration for a single player.""" if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"): if prov := self.mass.get_provider(raw_conf["provider"]): - prov_entries = await prov.get_player_config_entries(player_id) + conf_entries = await prov.get_player_config_entries(player_id) if player := self.mass.players.get(player_id, False): raw_conf["default_name"] = player.display_name else: - prov_entries = tuple() + conf_entries = tuple() raw_conf["available"] = False raw_conf["name"] = raw_conf.get("name") raw_conf["default_name"] = raw_conf.get("default_name") or raw_conf["player_id"] - prov_entries_keys = {x.key for x in prov_entries} - # combine provider defined entries with default player config entries - entries = prov_entries + tuple( - x for x in DEFAULT_PLAYER_CONFIG_ENTRIES if x.key not in prov_entries_keys - ) - return PlayerConfig.parse(entries, raw_conf) + return PlayerConfig.parse(conf_entries, raw_conf) raise KeyError(f"No config found for player id {player_id}") @api_command("config/players/get_value") - async def get_player_config_value(self, player_id: str, key: str) -> ConfigValueType: + async def get_player_config_value( + self, + player_id: str, + key: str, + ) -> ConfigValueType: """Return single configentry value for a player.""" - cache_key = f"player_conf_value_{player_id}.{key}" - if (cached_value := self._value_cache.get(cache_key)) and cached_value is not None: - return cached_value conf = await self.get_player_config(player_id) val = ( conf.values[key].value if conf.values[key].value is not None else conf.values[key].default_value ) - # store value in cache because this method can potentially be called very often - self._value_cache[cache_key] = val return val def get_raw_player_config_value( diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 60de382f..55aa3b57 100755 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -5,6 +5,7 @@ import logging import random import time from collections.abc import AsyncGenerator +from contextlib import suppress from typing import TYPE_CHECKING, Any from music_assistant.common.helpers.util import get_changed_keys @@ -25,9 +26,9 @@ from music_assistant.common.models.errors import ( from music_assistant.common.models.media_items import MediaItemType, media_from_dict from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, ROOT_LOGGER_NAME +from music_assistant.constants import FALLBACK_DURATION, ROOT_LOGGER_NAME from music_assistant.server.helpers.api import api_command -from music_assistant.server.helpers.audio import get_stream_details +from music_assistant.server.helpers.audio import set_stream_details from music_assistant.server.models.core_controller import CoreController if TYPE_CHECKING: @@ -139,15 +140,6 @@ class PlayerQueuesController(CoreController): queue.repeat_mode = repeat_mode self.signal_update(queue_id) - @api_command("players/queue/crossfade") - def set_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None: - """Configure crossfade setting on the the queue.""" - queue = self._queues[queue_id] - if queue.crossfade_enabled == crossfade_enabled: - return # no change - queue.crossfade_enabled = crossfade_enabled - self.signal_update(queue_id) - @api_command("players/queue/play_media") async def play_media( self, @@ -408,6 +400,11 @@ class PlayerQueuesController(CoreController): if self._queues[queue_id].announcement_in_progress: LOGGER.warning("Ignore queue command for %s because an announcement is in progress.") return + player = self.mass.players.get(queue_id, True) + if PlayerFeature.PAUSE not in player.supported_features: + # if player does not support pause, we need to send stop + await self.stop(queue_id) + return # simply forward the command to underlying player await self.mass.players.cmd_pause(queue_id) @@ -429,7 +426,7 @@ class PlayerQueuesController(CoreController): - queue_id: queue_id of the queue to handle the command. """ current_index = self._queues[queue_id].current_index - next_index = self.get_next_index(queue_id, current_index, True) + next_index = self._get_next_index(queue_id, current_index, True) if next_index is None: return await self.play_index(queue_id, next_index) @@ -528,41 +525,14 @@ class PlayerQueuesController(CoreController): raise FileNotFoundError(f"Unknown index/id: {index}") queue.current_index = index queue.index_in_buffer = index - # execute the play_media command on the player - queue_player = self.mass.players.get(queue_id) - need_multi_stream = ( - queue_player.provider in ("airplay", "ugp", "slimproto") - and len(queue_player.group_childs) > 1 - ) + queue.flow_mode_start_index = index + queue.flow_mode = False # reset player_prov = self.mass.players.get_player_provider(queue_id) - if need_multi_stream: - # handle special multi client stream - queue.flow_mode = True - stream_job = await self.mass.streams.create_multi_client_stream_job( - queue_id=queue_id, - start_queue_item=queue_item, - seek_position=int(seek_position), - fade_in=fade_in, - ) - await player_prov.cmd_handle_stream_job(player_id=queue_id, stream_job=stream_job) - return - # regular stream - queue.flow_mode = await self.mass.config.get_player_config_value( - queue.queue_id, CONF_FLOW_MODE - ) - url = await self.mass.streams.resolve_stream_url( - queue_id=queue_id, + await player_prov.play_media( + player_id=queue_id, queue_item=queue_item, seek_position=int(seek_position), fade_in=fade_in, - flow_mode=queue.flow_mode, - ) - await player_prov.cmd_play_url( - player_id=queue_id, - url=url, - # set queue_item to None if we're sending a flow mode url - # as the metadata is rather useless then - queue_item=None if queue.flow_mode else queue_item, ) # Interaction with player @@ -599,7 +569,11 @@ class PlayerQueuesController(CoreController): def on_player_update( self, player: Player, changed_values: dict[str, tuple[Any, Any]] # noqa: ARG002 ) -> None: - """Call when a PlayerQueue needs to be updated (e.g. when player updates).""" + """ + Call when a PlayerQueue needs to be updated (e.g. when player updates). + + NOTE: This is called every second if the player is playing. + """ if player.player_id not in self._queues: # race condition return @@ -613,37 +587,34 @@ class PlayerQueuesController(CoreController): queue.items = len(self._queue_items[queue_id]) # determine if this queue is currently active for this player queue.active = player.active_source == queue.queue_id - if queue.active: - queue.state = player.state - # update current item from player report - player_item_index = self._get_player_item_index(queue_id, player.current_url) - if player_item_index is not None: - if queue.flow_mode: - # flow mode active, calculate current item - current_index, item_time = self.__get_queue_stream_index( - queue, player, player_item_index - ) - else: - # queue is active and player has one of our tracks loaded, update state - current_index = player_item_index - item_time = int(player.corrected_elapsed_time) - # only update these attributes if the queue is active - # and has an item loaded so we are able to resume it - queue.current_index = current_index - queue.elapsed_time = item_time - queue.elapsed_time_last_updated = time.time() - queue.current_item = self.get_item(queue_id, queue.current_index) - queue.next_item = self.get_next_item(queue_id) - # correct elapsed time when seeking - if ( - queue.current_item - and queue.current_item.streamdetails - and queue.current_item.streamdetails.seconds_skipped - and not queue.flow_mode - ): - queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped - else: + if not queue.active: queue.state = PlayerState.IDLE + return + # update current item from player report + if queue.flow_mode: + # flow mode active, calculate current item + queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player) + else: + # queue is active and player has one of our tracks loaded, update state + if item_id := self._parse_player_current_item_id(queue_id, player.current_item_id): + queue.current_index = self.index_by_id(queue_id, item_id) + queue.elapsed_time = int(player.corrected_elapsed_time) + + # only update these attributes if the queue is active + # and has an item loaded so we are able to resume it + queue.state = player.state + queue.elapsed_time_last_updated = time.time() + queue.current_item = self.get_item(queue_id, queue.current_index) + queue.next_item = self._get_next_item(queue_id) + # correct elapsed time when seeking + if ( + queue.current_item + and queue.current_item.streamdetails + and queue.current_item.streamdetails.seconds_skipped + and not queue.flow_mode + ): + queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped + # basic throttle: do not send state changed events if queue did not actually change prev_state = self._prev_states.get(queue_id, {}) new_state = queue.to_dict() @@ -653,6 +624,9 @@ class PlayerQueuesController(CoreController): # return early if nothing changed if len(changed_keys) == 0: return + # handle enqueuing of next item to play + if not queue.flow_mode: + self._check_enqueue_next(player, queue, prev_state, new_state) # do not send full updates if only time was updated if changed_keys == {"elapsed_time"}: self.mass.signal_event( @@ -692,58 +666,45 @@ class PlayerQueuesController(CoreController): self._queues.pop(player_id, None) self._queue_items.pop(player_id, None) - async def preload_next_url( - self, queue_id: str, current_item_id: str | None = None - ) -> tuple[str, QueueItem, bool]: - """Call when a player wants to load the next track/url into the buffer. + async def preload_next_item( + self, queue_id: str, current_item_id_or_index: str | int | None = None + ) -> QueueItem: + """Call when a player wants to (pre)load the next item into the buffer. - The result is a tuple of the next url + QueueItem to Play, - and a bool if the player should crossfade (if supported). Raises QueueEmpty if there are no more tracks left. """ queue = self.get(queue_id) if not queue: raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available") - if current_item_id: - cur_index = self.index_by_id(queue_id, current_item_id) or 0 - else: + if current_item_id_or_index is None: cur_index = queue.index_in_buffer or queue.current_index or 0 - cur_item = self.get_item(queue_id, cur_index) + elif isinstance(current_item_id_or_index, str): + cur_index = self.index_by_id(queue_id, current_item_id_or_index) + else: + cur_index = current_item_id_or_index idx = 0 while True: - next_index = self.get_next_index(queue_id, cur_index + idx) - next_item = self.get_item(queue_id, next_index) - if not cur_item or not next_item: + next_index = self._get_next_index(queue_id, cur_index + idx) + if next_index is None: raise QueueEmpty("No more tracks left in the queue.") + next_item = self.get_item(queue_id, next_index) try: # Check if the QueueItem is playable. For example, YT Music returns Radio Items # that are not playable which will stop playback. - next_item.streamdetails = await get_stream_details( - mass=self.mass, queue_item=next_item - ) + await set_stream_details(mass=self.mass, queue_item=next_item) # Lazy load the full MediaItem for the QueueItem, making sure to get the # maximum quality of thumbs next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri) break except MediaNotFoundError: # No stream details found, skip this QueueItem + next_item = None idx += 1 + if next_item is None: + raise QueueEmpty("No more (playable) tracks left in the queue.") queue.index_in_buffer = next_index - # work out crossfade - crossfade = queue.crossfade_enabled - if ( - cur_item.media_type == MediaType.TRACK - and next_item.media_type == MediaType.TRACK - and cur_item.media_item.album == next_item.media_item.album - ): - # disable crossfade if playing tracks from same album - # TODO: make this a bit more intelligent. - crossfade = False - url = await self.mass.streams.resolve_stream_url( - queue_id=queue_id, - queue_item=next_item, - ) - return (url, next_item, crossfade) + queue.next_track_enqueued = True + return next_item # Main queue manipulation methods @@ -798,43 +759,6 @@ class PlayerQueuesController(CoreController): return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None) return None - def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None: - """Get index by queue_item_id.""" - queue_items = self._queue_items[queue_id] - for index, item in enumerate(queue_items): - if item.queue_item_id == queue_item_id: - return index - return None - - def get_next_index(self, queue_id: str, cur_index: int | None, is_skip: bool = False) -> int: - """Return the next index for the queue, accounting for repeat settings.""" - queue = self._queues[queue_id] - queue_items = self._queue_items[queue_id] - # handle repeat single track - if queue.repeat_mode == RepeatMode.ONE and not is_skip: - return cur_index - # handle repeat all - if ( - queue.repeat_mode == RepeatMode.ALL - and queue_items - and cur_index == (len(queue_items) - 1) - ): - return 0 - # simply return the next index. other logic is guarded to detect the index - # being higher than the number of items to detect end of queue and/or handle repeat. - if cur_index is None: - return 0 - next_index = cur_index + 1 - return next_index - - def get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None: - """Return next QueueItem for given queue.""" - queue = self._queues[queue_id] - if cur_index is None: - cur_index = queue.current_index - next_index = self.get_next_index(queue_id, cur_index) - return self.get_item(queue_id, next_index) - def signal_update(self, queue_id: str, items_changed: bool = False) -> None: """Signal state changed of given queue.""" queue = self._queues[queue_id] @@ -858,6 +782,44 @@ class PlayerQueuesController(CoreController): ) ) + def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None: + """Get index by queue_item_id.""" + queue_items = self._queue_items[queue_id] + for index, item in enumerate(queue_items): + if item.queue_item_id == queue_item_id: + return index + return None + + def _get_next_index( + self, queue_id: str, cur_index: int | None, is_skip: bool = False + ) -> int | None: + """ + Return the next index for the queue, accounting for repeat settings. + + Will return None if there are no (more) items in the queue. + """ + queue = self._queues[queue_id] + queue_items = self._queue_items[queue_id] + if not queue_items or cur_index is None: + # queue is empty + return None + if cur_index is None: + cur_index = queue.current_index + # handle repeat single track + if queue.repeat_mode == RepeatMode.ONE and not is_skip: + return cur_index + # handle cur_index is last index of the queue + if cur_index >= (len(queue_items) - 1): + # if repeat all is enabled, we simply start again from the beginning + return 0 if RepeatMode.ALL else None + return cur_index + 1 + + def _get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None: + """Return next QueueItem for given queue.""" + if (next_index := self._get_next_index(queue_id, cur_index)) is not None: + return self.get_item(queue_id, next_index) + return None + async def _fill_radio_tracks(self, queue_id: str) -> None: """Fill a Queue with (additional) Radio tracks.""" tracks = await self._get_radio_tracks(queue_id) @@ -869,6 +831,56 @@ class PlayerQueuesController(CoreController): insert_at_index=len(self._queue_items[queue_id]) - 1, ) + def _check_enqueue_next( + self, + player: Player, + queue: PlayerQueue, + prev_state: dict[str, Any], + new_state: dict[str, Any], + ) -> None: + """Check if we need to enqueue the next item to the player itself.""" + if not queue.active: + return + if prev_state.get("state") != PlayerState.PLAYING: + return + current_item = self.get_item(queue.queue_id, queue.current_index) + if not current_item: + return # guard, just in case something bad happened + if not current_item.duration: + return + if current_item.streamdetails and current_item.streamdetails.seconds_streamed: + duration = current_item.streamdetails.seconds_streamed + else: + duration = current_item.duration + seconds_remaining = duration - player.corrected_elapsed_time + + if PlayerFeature.ENQUEUE_NEXT in player.supported_features: + # player supports enqueue next feature. + # we enqueue the next track 15 seconds before the current track ends + end_of_track_reached = seconds_remaining <= 15 + else: + # player does not support enqueue next feature. + # we wait for the player to stop after it reaches the end of the track + prev_seconds_remaining = prev_state.get("seconds_remaining", seconds_remaining) + end_of_track_reached = prev_seconds_remaining <= 6 and queue.state == PlayerState.IDLE + new_state["seconds_remaining"] = seconds_remaining + + if not end_of_track_reached: + queue.next_track_enqueued = False # reset + return + if queue.next_track_enqueued: + return # already enqueued + + async def _enqueue_next(index: int): + player_prov = self.mass.players.get_player_provider(player.player_id) + with suppress(QueueEmpty): + next_item = await self.preload_next_item(queue.queue_id, index) + await player_prov.enqueue_next_queue_item( + player_id=player.player_id, queue_item=next_item + ) + + self.mass.create_task(_enqueue_next(queue.current_index)) + async def _get_radio_tracks(self, queue_id: str) -> list[MediaItemType]: """Call the registered music providers for dynamic tracks.""" queue = self._queues[queue_id] @@ -884,9 +896,7 @@ class PlayerQueuesController(CoreController): break return tracks - def __get_queue_stream_index( - self, queue: PlayerQueue, player: Player, start_index: int - ) -> tuple[int, int]: + def __get_queue_stream_index(self, queue: PlayerQueue, player: Player) -> tuple[int, int]: """Calculate current queue index and current track elapsed time.""" # player is playing a constant stream so we need to do this the hard way queue_index = 0 @@ -894,9 +904,9 @@ class PlayerQueuesController(CoreController): total_time = 0 track_time = 0 queue_items = self._queue_items[queue.queue_id] - if queue_items and len(queue_items) > start_index: + if queue_items and len(queue_items) > queue.flow_mode_start_index: # start_index: holds the position from which the flow stream started - queue_index = start_index + queue_index = queue.flow_mode_start_index queue_track = None while len(queue_items) > queue_index: # keep enumerating the queue tracks to find current track @@ -905,10 +915,12 @@ class PlayerQueuesController(CoreController): if not queue_track.streamdetails: track_time = elapsed_time_queue - total_time break - if queue_track.streamdetails.seconds_streamed is not None: - track_duration = queue_track.streamdetails.seconds_streamed - else: - track_duration = queue_track.duration or FALLBACK_DURATION + track_duration = ( + queue_track.streamdetails.seconds_streamed + or queue_track.streamdetails.duration + or queue_track.duration + or FALLBACK_DURATION + ) if elapsed_time_queue > (track_duration + total_time): # total elapsed time is more than (streamed) track duration # move index one up @@ -922,10 +934,13 @@ class PlayerQueuesController(CoreController): break return queue_index, track_time - def _get_player_item_index(self, queue_id: str, url: str) -> str | None: - """Parse (start) QueueItem ID from Player's current url.""" - if url and self.mass.streams.base_url in url and queue_id in url: - # try to extract the item id from the uri - current_item_id = url.rsplit("/")[-1].split(".")[0] - return self.index_by_id(queue_id, current_item_id) + def _parse_player_current_item_id(self, queue_id: str, current_item_id: str) -> str | None: + """Parse QueueItem ID from Player's current url.""" + if not current_item_id: + return None + if queue_id in current_item_id: + # try to extract the item id from either a url or queue_id/item_id combi + current_item_id = current_item_id.rsplit("/")[-1].split(".")[0] + if self.get_item(queue_id, current_item_id): + return current_item_id return None diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 83576ed4..6701dff1 100755 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -328,6 +328,11 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. """ player_id = self._check_redirect(player_id) + player = self.get(player_id, True) + if PlayerFeature.PAUSE not in player.supported_features: + # if player does not support pause, we need to send stop + await self.cmd_stop(player_id) + return player_provider = self.get_player_provider(player_id) await player_provider.cmd_pause(player_id) @@ -636,24 +641,16 @@ class PlayerController(CoreController): if group_players := self._get_player_groups(player.player_id): # prefer the first playing (or paused) group parent for group_player in group_players: - # if the group player's playerid is within the curtrent url, + # if the group player's playerid is within the current_item_id # this group is definitely active - if player.current_url and group_player.player_id in player.current_url: + if player.current_item_id and group_player.player_id in player.current_item_id: return group_player.player_id # fallback to the first powered group player for group_player in group_players: if group_player.powered: return group_player.player_id - # guess source from player's current url - if player.current_url and player.state in (PlayerState.PLAYING, PlayerState.PAUSED): - if self.mass.streams.base_url in player.current_url: - return player.player_id - if ":" in player.current_url: - # extract source from uri/url - return player.current_url.split(":")[0] - return player.current_url - # defaults to the player's own player id - return player.player_id + # defaults to the player's own player id if not active source set + return player.active_source or player.player_id def _get_group_volume_level(self, player: Player) -> int: """Calculate a group volume from the grouped members.""" diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index cdde438c..fe485d07 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -32,19 +32,19 @@ from music_assistant.common.models.queue_item import QueueItem from music_assistant.constants import ( CONF_BIND_IP, CONF_BIND_PORT, + CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_EQ_BASS, CONF_EQ_MID, CONF_EQ_TREBLE, CONF_OUTPUT_CHANNELS, - CONF_OUTPUT_CODEC, CONF_PUBLISH_IP, ) from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, get_media_stream, - get_stream_details, + set_stream_details, ) from music_assistant.server.helpers.process import AsyncProcess from music_assistant.server.helpers.util import get_ips @@ -132,16 +132,8 @@ class MultiClientStreamJob: with suppress(asyncio.QueueFull): sub_queue.put_nowait(b"") - async def resolve_stream_url( - self, - child_player_id: str, - ) -> str: + def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" - output_codec = ContentType( - await self.stream_controller.mass.config.get_player_config_value( - child_player_id, CONF_OUTPUT_CODEC - ) - ) fmt = output_codec.value # handle raw pcm if output_codec.is_pcm(): @@ -149,8 +141,8 @@ class MultiClientStreamJob: player_max_bit_depth = 24 if player.supports_24bit else 16 output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate) output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth) - output_channels = await self.stream_controller.mass.config.get_player_config_value( - child_player_id, CONF_OUTPUT_CHANNELS + output_channels = self.stream_controller.mass.config.get_raw_player_config_value( + child_player_id, CONF_OUTPUT_CHANNELS, "stereo" ) channels = 1 if output_channels != "stereo" else 2 fmt += ( @@ -373,35 +365,31 @@ class StreamsController(CoreController): async def resolve_stream_url( self, - queue_id: str, queue_item: QueueItem, + output_codec: ContentType, seek_position: int = 0, fade_in: bool = False, flow_mode: bool = False, ) -> str: - """Resolve the (regular, single player) stream URL for the given QueueItem. - - This is called just-in-time by the Queue controller to get the URL to the audio. - """ - output_codec = ContentType( - await self.mass.config.get_player_config_value(queue_id, CONF_OUTPUT_CODEC) - ) + """Resolve the stream URL for the given QueueItem.""" fmt = output_codec.value # handle raw pcm if output_codec.is_pcm(): - player = self.mass.players.get(queue_id) + player = self.mass.players.get(queue_item.queue_id) player_max_bit_depth = 24 if player.supports_24bit else 16 if flow_mode: output_sample_rate = min(FLOW_MAX_SAMPLE_RATE, player.max_sample_rate) output_bit_depth = min(FLOW_MAX_BIT_DEPTH, player_max_bit_depth) else: - streamdetails = await get_stream_details(self.mass, queue_item) + await set_stream_details(self.mass, queue_item) output_sample_rate = min( - streamdetails.audio_format.sample_rate, player.max_sample_rate + queue_item.streamdetails.audio_format.sample_rate, player.max_sample_rate ) - output_bit_depth = min(streamdetails.audio_format.bit_depth, player_max_bit_depth) - output_channels = await self.mass.config.get_player_config_value( - queue_id, CONF_OUTPUT_CHANNELS + output_bit_depth = min( + queue_item.streamdetails.audio_format.bit_depth, player_max_bit_depth + ) + output_channels = self.mass.config.get_raw_player_config_value( + queue_item.queue_id, CONF_OUTPUT_CHANNELS, "stereo" ) channels = 1 if output_channels != "stereo" else 2 fmt += ( @@ -410,7 +398,7 @@ class StreamsController(CoreController): ) query_params = {} base_path = "flow" if flow_mode else "single" - url = f"{self._server.base_url}/{queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}" + url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}" # noqa: E501 if seek_position: query_params["seek_position"] = str(seek_position) if fade_in: @@ -470,7 +458,7 @@ class StreamsController(CoreController): if not queue_item: raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}") try: - streamdetails = await get_stream_details(self.mass, queue_item=queue_item) + await set_stream_details(self.mass, queue_item=queue_item) except MediaNotFoundError: raise web.HTTPNotFound( reason=f"Unable to retrieve streamdetails for item: {queue_item}" @@ -481,8 +469,8 @@ class StreamsController(CoreController): output_format = await self._get_output_format( output_format_str=request.match_info["fmt"], queue_player=queue_player, - default_sample_rate=streamdetails.audio_format.sample_rate, - default_bit_depth=streamdetails.audio_format.bit_depth, + default_sample_rate=queue_item.streamdetails.audio_format.sample_rate, + default_bit_depth=queue_item.streamdetails.audio_format.bit_depth, ) # prepare request, add some DLNA/UPNP compatible headers @@ -508,9 +496,11 @@ class StreamsController(CoreController): # collect player specific ffmpeg args to re-encode the source PCM stream pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth(streamdetails.audio_format.bit_depth), - sample_rate=streamdetails.audio_format.sample_rate, - bit_depth=streamdetails.audio_format.bit_depth, + content_type=ContentType.from_bit_depth( + queue_item.streamdetails.audio_format.bit_depth + ), + sample_rate=queue_item.streamdetails.audio_format.sample_rate, + bit_depth=queue_item.streamdetails.audio_format.bit_depth, ) ffmpeg_args = await self._get_player_ffmpeg_args( queue_player, @@ -524,7 +514,7 @@ class StreamsController(CoreController): try: async for chunk in get_media_stream( self.mass, - streamdetails=streamdetails, + streamdetails=queue_item.streamdetails, pcm_format=pcm_format, seek_position=seek_position, fade_in=fade_in, @@ -766,43 +756,35 @@ class StreamsController(CoreController): queue_track = None last_fadeout_part = b"" total_bytes_written = 0 - self.logger.info("Start Queue Flow stream for Queue %s", queue.display_name) + queue.flow_mode = True + use_crossfade = self.mass.config.get_raw_player_config_value( + queue.queue_id, CONF_CROSSFADE, False + ) + pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) + self.logger.info( + "Start Queue Flow stream for Queue %s - crossfade: %s", + queue.display_name, + use_crossfade, + ) while True: # get (next) queue item to stream if queue_track is None: queue_track = start_queue_item - use_crossfade = queue.crossfade_enabled + await set_stream_details(self.mass, queue_track) else: seek_position = 0 fade_in = False try: - ( - _, - queue_track, - use_crossfade, - ) = await self.mass.player_queues.preload_next_url(queue.queue_id) + queue_track = await self.mass.player_queues.preload_next_item(queue.queue_id) except QueueEmpty: break - # get streamdetails - try: - streamdetails = await get_stream_details(self.mass, queue_track) - except MediaNotFoundError as err: - # streamdetails retrieval failed, skip to next track instead of bailing out... - self.logger.warning( - "Skip track %s due to missing streamdetails", - queue_track.name, - exc_info=err, - ) - continue - self.logger.debug( - "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s", - streamdetails.uri, + "Start Streaming queue track: %s (%s) for queue %s", + queue_track.streamdetails.uri, queue_track.name, queue.display_name, - use_crossfade, ) # set some basic vars @@ -813,22 +795,28 @@ class StreamsController(CoreController): crossfade_size = int(pcm_sample_size * crossfade_duration) queue_track.streamdetails.seconds_skipped = seek_position buffer_size = crossfade_size if use_crossfade else int(pcm_sample_size * 2) - - buffer = b"" bytes_written = 0 + buffer = b"" chunk_num = 0 # handle incoming audio chunks async for chunk in get_media_stream( self.mass, - streamdetails, + queue_track.streamdetails, pcm_format=pcm_format, seek_position=seek_position, fade_in=fade_in, - # only allow strip silence from begin if track is being crossfaded - strip_silence_begin=last_fadeout_part != b"", + # strip silence from begin/end if track is being crossfaded + strip_silence_begin=use_crossfade, + strip_silence_end=use_crossfade, ): chunk_num += 1 + # throttle buffer, do not allow more than 30 seconds in buffer + seconds_buffered = total_bytes_written / pcm_sample_size + player = self.mass.players.get(queue.queue_id) + while (seconds_buffered - player.corrected_elapsed_time) > 30: + await asyncio.sleep(1) + #### HANDLE FIRST PART OF TRACK # buffer full for crossfade @@ -869,12 +857,6 @@ class StreamsController(CoreController): #### HANDLE END OF TRACK - if bytes_written == 0: - # stream error: got empty first chunk ?! - self.logger.warning("Stream error on %s", streamdetails.uri) - queue_track.streamdetails.seconds_streamed = 0 - continue - if buffer and use_crossfade: # if crossfade is enabled, save fadeout part to pickup for next track last_fadeout_part = buffer[-crossfade_size:] @@ -886,14 +868,18 @@ class StreamsController(CoreController): yield buffer bytes_written += len(buffer) - # end of the track reached - store accurate duration + # update duration details based on the actual pcm data we sent + # this also accounts for crossfade and silence stripping queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size - total_bytes_written += bytes_written + queue_track.streamdetails.duration = ( + seek_position + queue_track.streamdetails.seconds_streamed + ) self.logger.debug( - "Finished Streaming queue track: %s (%s) on queue %s", + "Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s", queue_track.streamdetails.uri, queue_track.name, queue.display_name, + queue_track.streamdetails.seconds_streamed, ) self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) @@ -905,7 +891,6 @@ class StreamsController(CoreController): output_format: AudioFormat, ) -> list[str]: """Get player specific arguments for the given (pcm) input and output details.""" - player_conf = await self.mass.config.get_player_config(player.player_id) # generic args generic_args = [ "ffmpeg", @@ -954,16 +939,28 @@ class StreamsController(CoreController): # the below is a very basic 3-band equalizer, # this could be a lot more sophisticated at some point - if eq_bass := player_conf.get_value(CONF_EQ_BASS): + if ( + eq_bass := self.mass.config.get_raw_player_config_value( + player.player_id, CONF_EQ_BASS, 0 + ) + ) != 0: filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}") - if eq_mid := player_conf.get_value(CONF_EQ_MID): + if ( + eq_mid := self.mass.config.get_raw_player_config_value(player.player_id, CONF_EQ_MID, 0) + ) != 0: filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}") - if eq_treble := player_conf.get_value(CONF_EQ_TREBLE): + if ( + eq_treble := self.mass.config.get_raw_player_config_value( + player.player_id, CONF_EQ_TREBLE, 0 + ) + ) != 0: filter_params.append( f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}" ) # handle output mixing only left or right - conf_channels = player_conf.get_value(CONF_OUTPUT_CHANNELS) + conf_channels = self.mass.config.get_raw_player_config_value( + player.player_id, CONF_OUTPUT_CHANNELS, "stereo" + ) if conf_channels == "left": filter_params.append("pan=mono|c0=FL") elif conf_channels == "right": @@ -1008,8 +1005,8 @@ class StreamsController(CoreController): output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate) player_max_bit_depth = 24 if queue_player.supports_24bit else 16 output_bit_depth = min(default_bit_depth, player_max_bit_depth) - output_channels_str = await self.mass.config.get_player_config_value( - queue_player.player_id, CONF_OUTPUT_CHANNELS + output_channels_str = self.mass.config.get_raw_player_config_value( + queue_player.player_id, CONF_OUTPUT_CHANNELS, "stereo" ) output_channels = 1 if output_channels_str != "stereo" else 2 return AudioFormat( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 44402061..62ae2b81 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -243,8 +243,8 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N ) -async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> StreamDetails: - """Get streamdetails for the given QueueItem. +async def set_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> None: + """Set streamdetails for the given QueueItem. This is called just-in-time when a PlayerQueue wants a MediaItem to be played. Do not try to request streamdetails in advance as this is expiring data. @@ -301,10 +301,8 @@ async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> Str and streamdetails.data.startswith("http") ): streamdetails.direct = streamdetails.data - # set streamdetails as attribute on the media_item - # this way the app knows what content is playing + # set streamdetails as attribute on the queue_item queue_item.streamdetails = streamdetails - return streamdetails async def get_gain_correct( @@ -494,6 +492,7 @@ async def get_media_stream( # noqa: PLR0915 # update duration details based on the actual pcm data we sent streamdetails.seconds_streamed = bytes_sent / pcm_sample_size + streamdetails.duration = seek_position + streamdetails.seconds_streamed except (asyncio.CancelledError, GeneratorExit) as err: LOGGER.debug("media stream aborted for: %s", streamdetails.uri) diff --git a/music_assistant/server/helpers/util.py b/music_assistant/server/helpers/util.py index b02e2baf..4627e500 100644 --- a/music_assistant/server/helpers/util.py +++ b/music_assistant/server/helpers/util.py @@ -9,6 +9,7 @@ import tempfile import urllib.error import urllib.parse import urllib.request +from collections.abc import Iterator from functools import lru_cache from importlib.metadata import PackageNotFoundError from importlib.metadata import version as pkg_version @@ -102,3 +103,9 @@ def create_tempfile(): if platform.system() == "Linux": return memory_tempfile.MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) return tempfile.NamedTemporaryFile(buffering=0) + + +def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]: + """Chunk bytes data into smaller chunks.""" + for i in range(0, len(data), chunk_size): + yield data[i : i + chunk_size] diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index a6942ef6..29584b4e 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -4,14 +4,18 @@ from __future__ import annotations from abc import abstractmethod from typing import TYPE_CHECKING +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_AUTO_PLAY, + CONF_ENTRY_VOLUME_NORMALIZATION, + CONF_ENTRY_VOLUME_NORMALIZATION_TARGET, +) from music_assistant.common.models.player import Player -from music_assistant.common.models.queue_item import QueueItem from .provider import Provider if TYPE_CHECKING: from music_assistant.common.models.config_entries import ConfigEntry, PlayerConfig - from music_assistant.server.controllers.streams import MultiClientStreamJob + from music_assistant.common.models.queue_item import QueueItem # ruff: noqa: ARG001, ARG002 @@ -24,7 +28,11 @@ class PlayerProvider(Provider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - return tuple() + return ( + CONF_ENTRY_VOLUME_NORMALIZATION, + CONF_ENTRY_AUTO_PLAY, + CONF_ENTRY_VOLUME_NORMALIZATION_TARGET, + ) def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: """Call (by config manager) when the configuration of a player changes.""" @@ -46,45 +54,52 @@ class PlayerProvider(Provider): - player_id: player_id of the player to handle the command. """ - @abstractmethod async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player. - player_id: player_id of the player to handle the command. """ + # will only be called for players with Pause feature set. - @abstractmethod - async def cmd_play_url( + async def play_media( self, player_id: str, - url: str, - queue_item: QueueItem | None, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, ) -> None: - """Send PLAY URL command to given player. + """Handle PLAY MEDIA on given player. - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. """ - async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle StreamJob play command on given player. + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """ + Handle enqueuing of the next queue item on the player. - This is called when the Queue wants the player to start playing media - to multiple subscribers at the same time using a MultiClientStreamJob. - The default implementation is that the URL to the stream is resolved for the player - and played like any regular play_url command, but implementation may override - this behavior for any more sophisticated handling (e.g. when syncing etc.) + If the player supports PlayerFeature.ENQUE_NEXT: + This will be called about 10 seconds before the end of the track. + If the player does NOT report support for PlayerFeature.ENQUE_NEXT: + This will be called when the end of the track is reached. - - player_id: player_id of the player to handle the command. - - stream_job: the MultiClientStreamJob that the player should start playing. + A PlayerProvider implementation is in itself responsible for handling this + so that the queue items keep playing until its empty or the player stopped. + + This will NOT be called if the end of the queue is reached (and repeat disabled). + This will NOT be called if the player is using flow mode to playback the queue. """ - url = await stream_job.resolve_stream_url(player_id) - await self.cmd_play_url(player_id=player_id, url=url, queue_item=None) + # default implementation (for a player without enqueue_next feature): + # simply start playback of the next track. + # player providers need to override this behavior if/when needed + await self.play_media( + player_id=player_id, queue_item=queue_item, seek_position=0, fade_in=False + ) async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player. diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 6c464a10..b56f4373 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -15,22 +15,17 @@ from typing import TYPE_CHECKING import aiofiles -from music_assistant.common.models.config_entries import ( - CONF_ENTRY_OUTPUT_CODEC, - ConfigEntry, - ConfigValueType, -) +from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.common.models.queue_item import QueueItem from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest + from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType from music_assistant.server.providers.slimproto import SlimprotoProvider @@ -93,9 +88,6 @@ PLAYER_CONFIG_ENTRIES = ( "this number of seconds.", advanced=True, ), - ConfigEntry.from_dict( - {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": "flac", "hidden": True} - ), ) NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode", "enabled"} @@ -168,7 +160,7 @@ class AirplayProvider(PlayerProvider): """Return all (provider/player specific) Config Entries for the given player (if any).""" slimproto_prov = self.mass.get_provider("slimproto") base_entries = await slimproto_prov.get_player_config_entries(player_id) - return tuple(base_entries + PLAYER_CONFIG_ENTRIES) + return base_entries + PLAYER_CONFIG_ENTRIES def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: """Call (by config manager) when the configuration of a player changes.""" @@ -200,35 +192,37 @@ class AirplayProvider(PlayerProvider): slimproto_prov = self.mass.get_provider("slimproto") await slimproto_prov.cmd_play(player_id) - async def cmd_play_url( + async def play_media( self, player_id: str, - url: str, - queue_item: QueueItem | None, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, ) -> None: - """Send PLAY URL command to given player. + """Handle PLAY MEDIA on given player. - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. """ # simply forward to underlying slimproto player slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_play_url( + await slimproto_prov.play_media( player_id, - url=url, queue_item=queue_item, + seek_position=seek_position, + fade_in=fade_in, ) - async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle StreamJob play command on given player.""" + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """Handle enqueuing of the next queue item on the player.""" # simply forward to underlying slimproto player slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_handle_stream_job(player_id=player_id, stream_job=stream_job) + await slimproto_prov.enqueue_next_queue_item(player_id, queue_item) async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player.""" diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 5cb1e885..4088bce0 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -20,21 +20,23 @@ from pychromecast.models import CastInfo from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED from music_assistant.common.models.config_entries import ( + CONF_ENTRY_CROSSFADE_DURATION, CONF_ENTRY_HIDE_GROUP_MEMBERS, ConfigEntry, ConfigValueType, ) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, MediaType, PlayerFeature, PlayerState, PlayerType, ) -from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty +from music_assistant.common.models.errors import PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS, MASS_LOGO_ONLINE +from music_assistant.constants import CONF_CROSSFADE, CONF_LOG_LEVEL, CONF_PLAYERS, MASS_LOGO_ONLINE from music_assistant.server.models.player_provider import PlayerProvider from .helpers import CastStatusListener, ChromecastInfo @@ -53,7 +55,17 @@ if TYPE_CHECKING: CONF_ALT_APP = "alt_app" -BASE_PLAYER_CONFIG_ENTRIES = ( +PLAYER_CONFIG_ENTRIES = ( + ConfigEntry( + key=CONF_CROSSFADE, + type=ConfigEntryType.BOOLEAN, + label="Enable crossfade", + default_value=False, + description="Enable a crossfade transition between (queue) tracks. \n" + "Note that Chromecast does not natively support crossfading so Music Assistant " + "uses a 'flow mode' workaround for this at the cost of on-player metadata.", + advanced=False, + ), ConfigEntry( key=CONF_ALT_APP, type=ConfigEntryType.BOOLEAN, @@ -63,6 +75,7 @@ BASE_PLAYER_CONFIG_ENTRIES = ( "the playback experience but may not work on non-Google hardware.", advanced=True, ), + CONF_ENTRY_CROSSFADE_DURATION, ) @@ -103,7 +116,6 @@ class CastPlayer: logger: Logger status_listener: CastStatusListener | None = None mz_controller: MultizoneController | None = None - next_url: str | None = None active_group: str | None = None current_queue_item_id: str | None = None @@ -161,7 +173,8 @@ class ChromecastProvider(PlayerProvider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" cast_player = self.castplayers.get(player_id) - entries = BASE_PLAYER_CONFIG_ENTRIES + base_entries = await super().get_player_config_entries(player_id) + entries = base_entries + PLAYER_CONFIG_ENTRIES if ( cast_player and cast_player.cast_info.is_audio_group @@ -187,26 +200,64 @@ class ChromecastProvider(PlayerProvider): castplayer = self.castplayers[player_id] await asyncio.to_thread(castplayer.cc.media_controller.play) - async def cmd_play_url( + async def cmd_pause(self, player_id: str) -> None: + """Send PAUSE command to given player.""" + castplayer = self.castplayers[player_id] + await asyncio.to_thread(castplayer.cc.media_controller.pause) + + async def cmd_power(self, player_id: str, powered: bool) -> None: + """Send POWER command to given player.""" + castplayer = self.castplayers[player_id] + # set mute_as_power feature for group members + if castplayer.player.type == PlayerType.GROUP: + for child_player_id in castplayer.player.group_childs: + if child_player := self.mass.players.get(child_player_id): + child_player.mute_as_power = powered + if powered: + await self._launch_app(castplayer) + else: + await asyncio.to_thread(castplayer.cc.quit_app) + + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: + """Send VOLUME_SET command to given player.""" + castplayer = self.castplayers[player_id] + await asyncio.to_thread(castplayer.cc.set_volume, volume_level / 100) + + async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: + """Send VOLUME MUTE command to given player.""" + castplayer = self.castplayers[player_id] + await asyncio.to_thread(castplayer.cc.set_volume_muted, muted) + + async def play_media( self, player_id: str, - url: str, - queue_item: QueueItem | None, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, ) -> None: - """Send PLAY URL command to given player. + """Handle PLAY MEDIA on given player. - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. """ castplayer = self.castplayers[player_id] - - # in flow/direct url mode, we just send the url and the metadata is of no use - if not queue_item: + # Google cast does not support crossfading so we use flow mode to provide this feature + use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE) + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + output_codec=ContentType.FLAC, + seek_position=seek_position, + fade_in=fade_in, + flow_mode=use_flow_mode, + ) + if use_flow_mode: + # In flow mode, all queue tracks are sent to the player as continuous stream. + # This comes at the cost of metadata (cast does not support ICY metadata). await asyncio.to_thread( castplayer.cc.play_media, url, @@ -215,8 +266,8 @@ class ChromecastProvider(PlayerProvider): thumb=MASS_LOGO_ONLINE, ) return - - cc_queue_items = [self._create_queue_item(queue_item, url)] + # handle normal playback using the chromecast queue to play items one by one + cc_queue_items = [self._create_cc_queue_item(queue_item, url)] queuedata = { "type": "QUEUE_LOAD", "repeatMode": "REPEAT_OFF", # handled by our queue controller @@ -225,40 +276,27 @@ class ChromecastProvider(PlayerProvider): "startIndex": 0, # Item index to play after this request or keep same item if undefined "items": cc_queue_items, } - # make sure that media controller app is launched + # make sure that the media controller app is launched await self._launch_app(castplayer) # send queue info to the CC - castplayer.next_url = None media_controller = castplayer.cc.media_controller await asyncio.to_thread(media_controller.send_message, queuedata, True) - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player.""" + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """Handle enqueuing of the next queue item on the player.""" castplayer = self.castplayers[player_id] - await asyncio.to_thread(castplayer.cc.media_controller.pause) - - async def cmd_power(self, player_id: str, powered: bool) -> None: - """Send POWER command to given player.""" - castplayer = self.castplayers[player_id] - # set mute_as_power feature for group members - if castplayer.player.type == PlayerType.GROUP: - for child_player_id in castplayer.player.group_childs: - if child_player := self.mass.players.get(child_player_id): - child_player.mute_as_power = powered - if powered: - await self._launch_app(castplayer) - else: - await asyncio.to_thread(castplayer.cc.quit_app) - - async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """Send VOLUME_SET command to given player.""" - castplayer = self.castplayers[player_id] - await asyncio.to_thread(castplayer.cc.set_volume, volume_level / 100) - - async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: - """Send VOLUME MUTE command to given player.""" - castplayer = self.castplayers[player_id] - await asyncio.to_thread(castplayer.cc.set_volume_muted, muted) + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + output_codec=ContentType.FLAC, + ) + queuedata = { + "type": "QUEUE_INSERT", + "insertBefore": None, + "items": [self._create_cc_queue_item(queue_item, url)], + } + media_controller = castplayer.cc.media_controller + queuedata["mediaSessionId"] = media_controller.status.media_session_id + await asyncio.to_thread(media_controller.send_message, queuedata, True) async def poll_player(self, player_id: str) -> None: """Poll player for state updates. @@ -358,6 +396,8 @@ class ChromecastProvider(PlayerProvider): PlayerFeature.POWER, PlayerFeature.VOLUME_MUTE, PlayerFeature.VOLUME_SET, + PlayerFeature.ENQUEUE_NEXT, + PlayerFeature.PAUSE, ), max_sample_rate=96000, supports_24bit=True, @@ -430,7 +470,6 @@ class ChromecastProvider(PlayerProvider): """Handle updated MediaStatus.""" castplayer.logger.debug("Received media status update: %s", status.player_state) # player state - prev_state = castplayer.player.state if status.player_is_playing: castplayer.player.state = PlayerState.PLAYING elif status.player_is_paused: @@ -445,36 +484,24 @@ class ChromecastProvider(PlayerProvider): else: castplayer.player.elapsed_time = status.current_time + # active source + if status.content_id and castplayer.player_id in status.content_id: + castplayer.player.active_source = castplayer.player_id + else: + castplayer.player.active_source = castplayer.cc.app_display_name + # current media - castplayer.player.current_url = status.content_id + castplayer.player.current_item_id = status.content_id self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id) - # enqueue next item if player is almost at the end of the track - if ( # noqa: SIM114 - castplayer.player.state == PlayerState.PLAYING - and castplayer.player.active_source == castplayer.player.player_id - and (queue := self.mass.player_queues.get(castplayer.player_id)) - and (current_item := queue.current_item) - and current_item.duration - and (current_item.duration - castplayer.player.elapsed_time) <= 10 - ): - asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop) - # failsafe enqueue next item if player stopped at the end of the track - elif ( - castplayer.player.state == PlayerState.IDLE - and prev_state == PlayerState.PLAYING - and castplayer.player.active_source == castplayer.player.player_id - and castplayer.player.current_url == castplayer.next_url - ): - asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop) - # handle end of MA queue - set current item to None - elif ( + # handle end of MA queue - reset current_item_id + if ( castplayer.player.state == PlayerState.IDLE - and castplayer.player.current_url + and castplayer.player.current_item_id and (queue := self.mass.player_queues.get(castplayer.player_id)) and queue.next_item is None ): - castplayer.player.current_url = None + castplayer.player.current_item_id = None def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None: """Handle updated ConnectionStatus.""" @@ -508,49 +535,6 @@ class ChromecastProvider(PlayerProvider): ### Helpers / utils - async def _enqueue_next_track(self, castplayer: CastPlayer) -> None: - """Enqueue the next track of the MA queue on the CC queue.""" - try: - next_url, next_item, _ = await self.mass.player_queues.preload_next_url( - castplayer.player_id, castplayer.current_queue_item_id - ) - except QueueEmpty: - return - - if castplayer.next_url == next_url: - return # already set ?! - castplayer.next_url = next_url - castplayer.current_queue_item_id = next_item.queue_item_id - - # in flow/direct url mode, we just send the url and the metadata is of no use - if not next_item: - await asyncio.to_thread( - castplayer.cc.play_media, - next_url, - content_type=f'audio/{next_url.split(".")[-1].split("?")[0]}', - title="Music Assistant", - thumb=MASS_LOGO_ONLINE, - enqueue=True, - media_info={ - "customData": { - "queue_item_id": "flow", - } - }, - ) - return - cc_queue_items = [self._create_queue_item(next_item, next_url)] - - queuedata = { - "type": "QUEUE_INSERT", - "insertBefore": None, - "items": cc_queue_items, - } - media_controller = castplayer.cc.media_controller - queuedata["mediaSessionId"] = media_controller.status.media_session_id - - await asyncio.sleep(0.5) # throttle commands to CC a bit or it will crash - await asyncio.to_thread(media_controller.send_message, queuedata, True) - async def _launch_app(self, castplayer: CastPlayer) -> None: """Launch the default Media Receiver App on a Chromecast.""" event = asyncio.Event() @@ -599,7 +583,7 @@ class ChromecastProvider(PlayerProvider): castplayer.status_listener = None self.castplayers.pop(castplayer.player_id, None) - def _create_queue_item(self, queue_item: QueueItem, stream_url: str): + def _create_cc_queue_item(self, queue_item: QueueItem, stream_url: str): """Create CC queue item from MA QueueItem.""" duration = int(queue_item.duration) if queue_item.duration else None image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else "" diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 7db38698..94f827fa 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -24,17 +24,23 @@ from async_upnp_client.profiles.dlna import DmrDevice, TransportState from async_upnp_client.search import async_search from async_upnp_client.utils import CaseInsensitiveDict -from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_FLOW_MODE, + ConfigEntry, + ConfigValueType, +) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, PlayerFeature, PlayerState, PlayerType, ) -from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty +from music_assistant.common.models.errors import PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_PLAYERS +from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, CONF_PLAYERS from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider @@ -46,13 +52,50 @@ if TYPE_CHECKING: from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType -PLAYER_FEATURES = ( - PlayerFeature.SET_MEMBERS, - PlayerFeature.SYNC, +BASE_PLAYER_FEATURES = ( PlayerFeature.VOLUME_MUTE, PlayerFeature.VOLUME_SET, ) +CONF_ENQUEUE_NEXT = "enqueue_next" +CONF_ENFORCE_MP3 = "enforce_mp3" + +PLAYER_CONFIG_ENTRIES = ( + ConfigEntry( + key=CONF_ENQUEUE_NEXT, + type=ConfigEntryType.BOOLEAN, + label="Player supports enqueue next/gapless", + default_value=False, + description="If the player supports enqueuing the next item for fluid/gapless playback. " + "\n\nUnfortunately this feature is missing or broken on many DLNA players. \n" + "Enable it with care. If music stops after one song, disable this setting.", + ), + ConfigEntry( + key=CONF_CROSSFADE, + type=ConfigEntryType.BOOLEAN, + label="Enable crossfade", + default_value=False, + description="Enable a crossfade transition between (queue) tracks. \n\n" + "Note that DLNA does not natively support crossfading so you need to enable " + "the 'flow mode' workaround to use crossfading with DLNA players.", + advanced=False, + depends_on=CONF_FLOW_MODE, + ), + CONF_ENTRY_FLOW_MODE, + CONF_ENTRY_CROSSFADE_DURATION, + ConfigEntry( + key=CONF_ENFORCE_MP3, + type=ConfigEntryType.BOOLEAN, + label="Enforce (lossy) mp3 stream", + default_value=False, + description="By default, Music Assistant sends lossless, high quality audio " + "to all players. Some players can not deal with that and require the stream to be packed " + "into a lossy mp3 codec. \n\n " + "Only enable when needed. Saves some bandwidth at the cost of audio quality.", + advanced=True, + ), +) + CONF_NETWORK_SCAN = "network_scan" _DLNAPlayerProviderT = TypeVar("_DLNAPlayerProviderT", bound="DLNAPlayerProvider") @@ -142,12 +185,7 @@ class DLNAPlayer: # Track BOOTID in SSDP advertisements for device changes bootid: int | None = None last_seen: float = field(default_factory=time.time) - next_url: str | None = None - next_item: QueueItem | None = None - supports_next_uri: bool | None = None - end_of_track_reached: float | None = None last_command: float = field(default_factory=time.time) - need_elapsed_time_workaround: bool = False def update_attributes(self): """Update attributes of the MA Player from DLNA state.""" @@ -160,7 +198,16 @@ class DLNAPlayer: self.player.volume_muted = self.device.is_volume_muted or False self.player.state = self.get_state(self.device) self.player.supported_features = self.get_supported_features(self.device) - self.player.current_url = self.device.current_track_uri or "" + self.player.current_item_id = self.device.current_track_uri or "" + if self.player.player_id in self.player.current_item_id: + self.player.active_source = self.player.player_id + elif "spotify" in self.player.current_item_id: + self.player.active_source = "spotify" + elif self.player.current_item_id.startswith("http"): + self.player.active_source = "http" + else: + # TODO: handle other possible sources here + self.player.active_source = None if self.device.media_position: # only update elapsed_time if the device actually reports it self.player.elapsed_time = float(self.device.media_position) @@ -168,15 +215,6 @@ class DLNAPlayer: self.player.elapsed_time_last_updated = ( self.device.media_position_updated_at.timestamp() ) - # some dlna players get stuck at the end of the track and won't - # automatically play the next track, try to workaround that - if ( - self.device.media_duration - and self.player.corrected_elapsed_time - and self.player.state == PlayerState.PLAYING - and (self.device.media_duration - self.player.corrected_elapsed_time) <= 10 - ): - self.end_of_track_reached = time.time() else: # device is unavailable self.player.available = False @@ -220,9 +258,6 @@ class DLNAPlayer: if device.has_volume_mute: supported_features.add(PlayerFeature.VOLUME_MUTE) - if device.can_seek_rel_time or device.can_seek_abs_time: - supported_features.add(PlayerFeature.SEEK) - return supported_features @@ -257,19 +292,28 @@ class DLNAPlayerProvider(PlayerProvider): for dlna_player in self.dlnaplayers.values(): tg.create_task(self._device_disconnect(dlna_player)) + async def get_player_config_entries( + self, player_id: str # noqa: ARG002 + ) -> tuple[ConfigEntry, ...]: + """Return all (provider/player specific) Config Entries for the given player (if any).""" + base_entries = await super().get_player_config_entries(player_id) + return base_entries + PLAYER_CONFIG_ENTRIES + def on_player_config_changed( self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002 ) -> None: """Call (by config manager) when the configuration of a player changes.""" # run discovery to catch any re-enabled players self.mass.create_task(self._run_discovery()) + # reset player features based on config values + if not (dlna_player := self.dlnaplayers.get(config.player_id)): + return + self._set_player_features(dlna_player) @catch_request_errors async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" dlna_player = self.dlnaplayers[player_id] - dlna_player.end_of_track_reached = None - dlna_player.next_url = None assert dlna_player.device is not None await dlna_player.device.async_stop() @@ -281,31 +325,42 @@ class DLNAPlayerProvider(PlayerProvider): await dlna_player.device.async_play() @catch_request_errors - async def cmd_play_url( + async def play_media( self, player_id: str, - url: str, - queue_item: QueueItem | None, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, ) -> None: - """Send PLAY URL command to given player. + """Handle PLAY MEDIA on given player. - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. """ + # DLNA players do not support crossfading so we enforce flow mode to provide this feature + use_flow_mode = await self.mass.config.get_player_config_value( + player_id, CONF_FLOW_MODE + ) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE) + enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, + seek_position=seek_position, + fade_in=fade_in, + flow_mode=use_flow_mode, + ) dlna_player = self.dlnaplayers[player_id] - # always clear queue (by sending stop) first if dlna_player.device.can_stop: await self.cmd_stop(player_id) - dlna_player.next_url = None - dlna_player.end_of_track_reached = None - - didl_metadata = create_didl_metadata(self.mass, url, queue_item) + didl_metadata = create_didl_metadata( + self.mass, url, queue_item if not use_flow_mode else None + ) title = queue_item.name if queue_item else "Music Assistant" await dlna_player.device.async_set_transport_uri(url, title, didl_metadata) # Play it @@ -316,13 +371,55 @@ class DLNAPlayerProvider(PlayerProvider): dlna_player.player.elapsed_time = 0 dlna_player.player.elapsed_time_last_updated = now await dlna_player.device.async_play() - # force poll the device for sleep in (1, 2): await asyncio.sleep(sleep) dlna_player.force_poll = True await self.poll_player(dlna_player.udn) + @catch_request_errors + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """ + Handle enqueuing of the next queue item on the player. + + If the player supports PlayerFeature.ENQUE_NEXT: + This will be called about 10 seconds before the end of the track. + If the player does NOT report support for PlayerFeature.ENQUE_NEXT: + This will be called when the end of the track is reached. + + A PlayerProvider implementation is in itself responsible for handling this + so that the queue items keep playing until its empty or the player stopped. + + This will NOT be called if the end of the queue is reached (and repeat disabled). + This will NOT be called if flow mode is enabled on the queue. + """ + dlna_player = self.dlnaplayers[player_id] + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + output_codec=ContentType.FLAC, + ) + didl_metadata = create_didl_metadata(self.mass, url, queue_item) + title = queue_item.name + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False): + # use the 'next_transport_uri' feature + await dlna_player.device.async_set_next_transport_uri(url, title, didl_metadata) + self.logger.debug( + "Enqued next track (%s) to player %s", + title, + dlna_player.player.display_name, + ) + else: + # simply use regular play command + await dlna_player.device.async_set_transport_uri(url, title, didl_metadata) + # Play it + await dlna_player.device.async_wait_for_can_play(10) + # optimistically set this timestamp to help in case of a player + # that does not report the progress + now = time.time() + dlna_player.player.elapsed_time = 0 + dlna_player.player.elapsed_time_last_updated = now + await dlna_player.device.async_play() + @catch_request_errors async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player.""" @@ -416,6 +513,9 @@ class DLNAPlayerProvider(PlayerProvider): if ssdp_udn in discovered_devices: # already processed this device return + if "rincon" in ssdp_udn.lower(): + # ignore Sonos devices + return discovered_devices.add(ssdp_udn) @@ -466,10 +566,9 @@ class DLNAPlayerProvider(PlayerProvider): dlna_player.description_url = description_url else: # new player detected, setup our DLNAPlayer wrapper - - # ignore disabled players conf_key = f"{CONF_PLAYERS}/{udn}/enabled" enabled = self.mass.config.get(conf_key, True) + # ignore disabled players if not enabled: self.logger.debug("Ignoring disabled player: %s", udn) return @@ -485,7 +584,6 @@ class DLNAPlayerProvider(PlayerProvider): name=udn, available=False, powered=False, - supported_features=PLAYER_FEATURES, # device info will be discovered later after connect device_info=DeviceInfo( model="unknown", @@ -503,6 +601,7 @@ class DLNAPlayerProvider(PlayerProvider): await self._device_connect(dlna_player) + self._set_player_features(dlna_player) dlna_player.update_attributes() self.mass.players.register_or_update(dlna_player.player) @@ -577,50 +676,12 @@ class DLNAPlayerProvider(PlayerProvider): dlna_player.last_seen = time.time() self.mass.create_task(self._update_player(dlna_player)) - async def _enqueue_next_track(self, dlna_player: DLNAPlayer) -> None: - """Enqueue the next track of the MA queue on the CC queue.""" - try: - ( - next_url, - next_item, - _, - ) = await self.mass.player_queues.preload_next_url(dlna_player.udn) - except QueueEmpty: - return - - if dlna_player.next_url == next_url: - return # already set ?! - dlna_player.next_url = next_url - dlna_player.next_item = next_item - - # no need to try setting the next url if we already know the player does not support it - if dlna_player.supports_next_uri is False: - return - - # send queue item to dlna queue - didl_metadata = create_didl_metadata(self.mass, next_url, next_item) - title = next_item.name if next_item else "Music Assistant" - try: - await dlna_player.device.async_set_next_transport_uri(next_url, title, didl_metadata) - except UpnpError: - dlna_player.supports_next_uri = False - self.logger.info( - "Player does not support next transport uri feature, " - "gapless playback is not possible." - ) - - self.logger.debug( - "Enqued next track (%s) to player %s", - title, - dlna_player.player.display_name, - ) - async def _update_player(self, dlna_player: DLNAPlayer) -> None: """Update DLNA Player.""" - prev_url = dlna_player.player.current_url + prev_url = dlna_player.player.current_item_id prev_state = dlna_player.player.state dlna_player.update_attributes() - current_url = dlna_player.player.current_url + current_url = dlna_player.player.current_item_id current_state = dlna_player.player.state if (prev_url != current_url) or (prev_state != current_state): @@ -630,30 +691,11 @@ class DLNAPlayerProvider(PlayerProvider): # let the MA player manager work out if something actually updated self.mass.players.update(dlna_player.udn) - # enqueue next item if needed - if ( - dlna_player.player.state == PlayerState.PLAYING - and dlna_player.player.active_source == dlna_player.player.player_id - and dlna_player.next_url in (None, dlna_player.player.current_url) - # prevent race conditions at start/stop by doing this check - and (time.time() - dlna_player.last_command) > 4 - ): - self.mass.create_task(self._enqueue_next_track(dlna_player)) - # if player does not support next uri, manual play it - if ( - ( - dlna_player.supports_next_uri is False - or (dlna_player.supports_next_uri is None and dlna_player.end_of_track_reached) - ) - and prev_state == PlayerState.PLAYING - and current_state == PlayerState.IDLE - and dlna_player.next_url - ): - self.logger.warning( - "Player does not support next_uri and end of track reached, " - "sending next url manually." + def _set_player_features(self, dlna_player: DLNAPlayer) -> None: + """Set Player Features based on config values and capabilities.""" + dlna_player.player.supported_features = BASE_PLAYER_FEATURES + player_id = dlna_player.player.player_id + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False): + dlna_player.player.supported_features = dlna_player.player.supported_features + ( + PlayerFeature.ENQUEUE_NEXT, ) - await self.cmd_play_url(dlna_player.udn, dlna_player.next_url, dlna_player.next_item) - dlna_player.end_of_track_reached = False - dlna_player.next_url = None - dlna_player.supports_next_uri = False diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index d3e10b1a..a0e85a97 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -5,7 +5,7 @@ import asyncio import statistics import time from collections import deque -from collections.abc import Callable, Coroutine, Generator +from collections.abc import Callable, Coroutine from contextlib import suppress from dataclasses import dataclass from typing import TYPE_CHECKING, Any @@ -17,7 +17,11 @@ from aioslimproto.const import EventType as SlimEventType from aioslimproto.discovery import start_discovery from music_assistant.common.models.config_entries import ( - CONF_ENTRY_OUTPUT_CODEC, + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_EQ_BASS, + CONF_ENTRY_EQ_MID, + CONF_ENTRY_EQ_TREBLE, + CONF_ENTRY_OUTPUT_CHANNELS, ConfigEntry, ConfigValueOption, ConfigValueType, @@ -32,7 +36,7 @@ from music_assistant.common.models.enums import ( from music_assistant.common.models.errors import QueueEmpty, SetupFailedError from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_CROSSFADE_DURATION, CONF_PORT +from music_assistant.constants import CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_PORT from music_assistant.server.models.player_provider import PlayerProvider from .cli import LmsCli @@ -41,9 +45,12 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType + +# monkey patch the SlimClient +SlimClient._process_stat_stmf = lambda x, y: None # noqa: ARG005 + CACHE_KEY_PREV_STATE = "slimproto_prev_state" # sync constants @@ -284,21 +291,13 @@ class SlimprotoProvider(PlayerProvider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - # pick default codec based on capabilities - default_codec = ContentType.PCM - if client := self._socket_clients.get(player_id): - for fmt, fmt_type in ( - ("flc", ContentType.FLAC), - ("pcm", ContentType.PCM), - ("mp3", ContentType.MP3), - ): - if fmt in client.supported_codecs: - default_codec = fmt_type - break + base_entries = await super().get_player_config_entries(player_id) + if not (client := self._socket_clients.get(player_id)): + return base_entries # create preset entries (for players that support it) preset_entries = tuple() - if not (client and client.device_model in self._virtual_providers): + if client.device_model not in self._virtual_providers: presets = [] async for playlist in self.mass.music.playlists.iter_library_items(True): presets.append(ConfigValueOption(playlist.name, playlist.uri)) @@ -327,27 +326,33 @@ class SlimprotoProvider(PlayerProvider): for index in range(1, preset_count + 1) ) - return preset_entries + ( - ConfigEntry( - key=CONF_SYNC_ADJUST, - type=ConfigEntryType.INTEGER, - range=(0, 1500), - default_value=0, - label="Audio synchronization delay correction", - description="If this player is playing audio synced with other players " - "and you always hear the audio too late on this player, " - "you can shift the audio a bit.", - advanced=True, - ), - CONF_ENTRY_CROSSFADE_DURATION, - ConfigEntry.from_dict( - {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": default_codec} - ), + return ( + base_entries + + preset_entries + + ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_EQ_BASS, + CONF_ENTRY_EQ_MID, + CONF_ENTRY_EQ_TREBLE, + CONF_ENTRY_OUTPUT_CHANNELS, + CONF_ENTRY_CROSSFADE_DURATION, + ConfigEntry( + key=CONF_SYNC_ADJUST, + type=ConfigEntryType.INTEGER, + range=(0, 1500), + default_value=0, + label="Audio synchronization delay correction", + description="If this player is playing audio synced with other players " + "and you always hear the audio too late on this player, " + "you can shift the audio a bit.", + advanced=True, + ), + ) ) async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" - # forward command to player and any connected sync child's + # forward command to player and any connected sync members for client in self._get_sync_clients(player_id): if client.state == SlimPlayerState.STOPPED: continue @@ -357,7 +362,7 @@ class SlimprotoProvider(PlayerProvider): async def cmd_play(self, player_id: str) -> None: """Send PLAY command to given player.""" - # forward command to player and any connected sync child's + # forward command to player and any connected sync members async with asyncio.TaskGroup() as tg: for client in self._get_sync_clients(player_id): if client.state not in ( @@ -368,64 +373,75 @@ class SlimprotoProvider(PlayerProvider): continue tg.create_task(client.play()) - async def cmd_play_url( + async def play_media( self, player_id: str, - url: str, - queue_item: QueueItem | None, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, ) -> None: - """Send PLAY URL command to given player. + """Handle PLAY MEDIA on given player. - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. """ - # send stop first - await self.cmd_stop(player_id) - player = self.mass.players.get(player_id) if player.synced_to: raise RuntimeError("A synced player cannot receive play commands directly") - - # forward command to player and any connected sync child's - sync_clients = [x for x in self._get_sync_clients(player_id)] - async with asyncio.TaskGroup() as tg: - for client in sync_clients: - tg.create_task( - self._handle_play_url( - client, - url=url, - queue_item=queue_item, - send_flush=True, - auto_play=len(sync_clients) == 1, - ) - ) - - async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle StreamJob play command on given player.""" - # send stop first + # stop any existing streams first await self.cmd_stop(player_id) - - player = self.mass.players.get(player_id) - if player.synced_to: - raise RuntimeError("A synced player cannot receive play commands directly") - sync_clients = [x for x in self._get_sync_clients(player_id)] - async with asyncio.TaskGroup() as tg: - for client in sync_clients: - url = await stream_job.resolve_stream_url(client.player_id) - tg.create_task( - self._handle_play_url( - client, - url=url, - queue_item=None, - send_flush=True, - auto_play=len(sync_clients) == 1, + if player.group_childs: + # player has sync members, we need to start a multi client stream job + stream_job = await self.mass.streams.create_multi_client_stream_job( + queue_id=queue_item.queue_id, + start_queue_item=queue_item, + seek_position=int(seek_position), + fade_in=fade_in, + ) + # forward command to player and any connected sync members + sync_clients = self._get_sync_clients(player_id) + async with asyncio.TaskGroup() as tg: + for client in sync_clients: + tg.create_task( + self._handle_play_url( + client, + url=stream_job.resolve_stream_url( + client.player_id, output_codec=ContentType.FLAC + ), + queue_item=None, + send_flush=True, + auto_play=False, + ) ) - ) + else: + # regular, single player playback + client = self._socket_clients[player_id] + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + # for now just hardcode flac as we assume that every (modern) + # slimproto based player can handle that just fine + output_codec=ContentType.FLAC, + seek_position=seek_position, + fade_in=fade_in, + flow_mode=False, + ) + await self._handle_play_url( + client, + url=url, + queue_item=queue_item, + send_flush=True, + auto_play=True, + ) + + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """Handle enqueuing of the next queue item on the player.""" + # we don't have to do anything, + # enqueuing the next item is handled in the buffer ready callback async def _handle_play_url( self, @@ -433,12 +449,11 @@ class SlimprotoProvider(PlayerProvider): url: str, queue_item: QueueItem | None, send_flush: bool = True, - crossfade: bool = False, auto_play: bool = False, ) -> None: - """Handle PlayMedia on slimproto player(s).""" + """Handle playback of an url on slimproto player(s).""" player_id = client.player_id - if crossfade: + if crossfade := await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE): transition_duration = await self.mass.config.get_player_config_value( player_id, CONF_CROSSFADE_DURATION ) @@ -450,7 +465,7 @@ class SlimprotoProvider(PlayerProvider): mime_type=f"audio/{url.split('.')[-1].split('?')[0]}", metadata={"item_id": queue_item.queue_item_id, "title": queue_item.name} if queue_item - else None, + else {"item_id": client.player_id, "title": "Music Assistant"}, send_flush=send_flush, transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE, transition_duration=transition_duration, @@ -462,7 +477,7 @@ class SlimprotoProvider(PlayerProvider): async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player.""" - # forward command to player and any connected sync child's + # forward command to player and any connected sync members async with asyncio.TaskGroup() as tg: for client in self._get_sync_clients(player_id): if client.state not in ( @@ -590,7 +605,12 @@ class SlimprotoProvider(PlayerProvider): # update player state on player events player.available = True - player.current_url = client.current_url + player.current_item_id = ( + client.current_metadata.get("item_id") + if client.current_metadata + else client.current_url + ) + player.active_source = player.player_id player.name = client.name player.powered = client.powered player.state = STATE_MAP[client.state] @@ -632,7 +652,7 @@ class SlimprotoProvider(PlayerProvider): player = self.mass.players.get(client.player_id) sync_master_id = player.synced_to if not sync_master_id: - # we only correct sync child's, not the sync master itself + # we only correct sync members, not the sync master itself return if sync_master_id not in self._socket_clients: return # just here as a guard as bad things can happen @@ -712,24 +732,20 @@ class SlimprotoProvider(PlayerProvider): return if player.active_source != player.player_id: return - try: - next_url, next_item, crossfade = await self.mass.player_queues.preload_next_url( - client.player_id + with suppress(QueueEmpty): + next_item = await self.mass.player_queues.preload_next_item(client.player_id) + url = await self.mass.streams.resolve_stream_url( + queue_item=next_item, + output_codec=ContentType.FLAC, + flow_mode=False, + ) + await self._handle_play_url( + client, + url=url, + queue_item=next_item, + send_flush=False, + auto_play=True, ) - async with asyncio.TaskGroup() as tg: - for client in self._get_sync_clients(client.player_id): - tg.create_task( - self._handle_play_url( - client, - url=next_url, - queue_item=next_item, - send_flush=False, - crossfade=crossfade, - auto_play=True, - ) - ) - except QueueEmpty: - pass async def _handle_buffer_ready(self, client: SlimClient) -> None: """Handle buffer ready event, player has buffered a (new) track.""" @@ -825,15 +841,17 @@ class SlimprotoProvider(PlayerProvider): # https://wiki.slimdevices.com/index.php/SlimProto_TCP_protocol.html#u.2C_p.2C_a_.26_t_commands_and_replay_gain_field await client.send_strm(b"a", replay_gain=int(millis)) - def _get_sync_clients(self, player_id: str) -> Generator[SlimClient]: + def _get_sync_clients(self, player_id: str) -> list[SlimClient]: """Get all sync clients for a player.""" player = self.mass.players.get(player_id) + sync_clients: list[SlimClient] = [] # we need to return the player itself too group_child_ids = {player_id} group_child_ids.update(player.group_childs) for child_id in group_child_ids: if client := self._socket_clients.get(child_id): - yield client + sync_clients.append(client) + return sync_clients def _get_corrected_elapsed_milliseconds(self, client: SlimClient) -> int: """Return corrected elapsed milliseconds.""" diff --git a/music_assistant/server/providers/slimproto/cli.py b/music_assistant/server/providers/slimproto/cli.py index 85c8ff1f..4a5f1505 100644 --- a/music_assistant/server/providers/slimproto/cli.py +++ b/music_assistant/server/providers/slimproto/cli.py @@ -915,9 +915,6 @@ class LmsCli: new_repeat_mode = repeat_map.get(int(arg)) self.mass.player_queues.set_repeat(queue.queue_id, new_repeat_mode) return - if subcommand == "crossfade": - self.mass.player_queues.set_crossfade(queue.queue_id, bool(arg)) - return self.logger.warning("Unhandled command: playlist/%s", subcommand) diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 4ca85d81..2660f223 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -2,34 +2,43 @@ from __future__ import annotations import asyncio +import random import time -import uuid -from typing import TYPE_CHECKING +from contextlib import suppress +from typing import TYPE_CHECKING, cast -from ffmpeg import FFmpegError -from ffmpeg.asyncio import FFmpeg from snapcast.control import create_server -from snapcast.control.client import Snapclient as SnapClient -from snapcast.control.group import Snapgroup as SnapGroup -from snapcast.control.stream import Snapstream as SnapStream - -from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType +from snapcast.control.client import Snapclient +from snapcast.control.group import Snapgroup +from snapcast.control.stream import Snapstream + +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + ConfigEntry, + ConfigValueType, +) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, PlayerFeature, PlayerState, PlayerType, ) from music_assistant.common.models.errors import SetupFailedError +from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: + from snapcast.control.server import Snapserver + from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType + CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host" CONF_SNAPCAST_SERVER_CONTROL_PORT = "snapcast_server_control_port" @@ -84,14 +93,16 @@ async def get_config_entries( class SnapCastProvider(PlayerProvider): """Player provider for Snapcast based players.""" - _snapserver: [asyncio.Server | asyncio.BaseTransport] + _snapserver: Snapserver snapcast_server_host: str snapcast_server_control_port: int + _stream_tasks: dict[str, asyncio.Task] async def handle_setup(self) -> None: """Handle async initialization of the provider.""" self.snapcast_server_host = self.config.get_value(CONF_SNAPCAST_SERVER_HOST) self.snapcast_server_control_port = self.config.get_value(CONF_SNAPCAST_SERVER_CONTROL_PORT) + self._stream_tasks = {} try: self._snapserver = await create_server( self.mass.loop, @@ -105,8 +116,8 @@ class SnapCastProvider(PlayerProvider): f"Started Snapserver connection on:" f"{self.snapcast_server_host}:{self.snapcast_server_control_port}" ) - except OSError: - raise SetupFailedError("Unable to start the Snapserver connection ?") + except OSError as err: + raise SetupFailedError("Unable to start the Snapserver connection ?") from err def _handle_update(self) -> None: """Process Snapcast init Player/Group and set callback .""" @@ -118,18 +129,17 @@ class SnapCastProvider(PlayerProvider): for snap_group in self._snapserver.groups: snap_group.set_callback(self._handle_group_update) - def _handle_group_update(self, snap_group: SnapGroup) -> None: # noqa: ARG002 + def _handle_group_update(self, snap_group: Snapgroup) -> None: # noqa: ARG002 """Process Snapcast group callback.""" for snap_client in self._snapserver.clients: self._handle_player_update(snap_client) - def _handle_player_init(self, snap_client: SnapClient) -> None: + def _handle_player_init(self, snap_client: Snapclient) -> None: """Process Snapcast add to Player controller.""" player_id = snap_client.identifier player = self.mass.players.get(player_id, raise_unavailable=False) if not player: - snap_client = self._snapserver.client(player_id) - self.mass.create_task(self._set_snapclient_empty_stream(player_id)) + snap_client = cast(Snapclient, self._snapserver.client(player_id)) player = Player( player_id=player_id, provider=self.domain, @@ -150,7 +160,7 @@ class SnapCastProvider(PlayerProvider): ) self.mass.players.register_or_update(player) - def _handle_player_update(self, snap_client: SnapClient) -> None: + def _handle_player_update(self, snap_client: Snapclient) -> None: """Process Snapcast update to Player controller.""" player_id = snap_client.identifier player = self.mass.players.get(player_id) @@ -165,6 +175,10 @@ class SnapCastProvider(PlayerProvider): ) player.synced_to = self._synced_to(player_id) player.group_childs = self._group_childs(player_id) + if player.current_item_id and player_id in player.current_item_id: + player.active_source = player_id + elif stream := self._get_snapstream(player_id): + player.active_source = stream.name self.mass.players.register_or_update(player) async def unload(self) -> None: @@ -173,85 +187,31 @@ class SnapCastProvider(PlayerProvider): await self.cmd_stop(client.identifier) self._snapserver.stop() + async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: + """Return all (provider/player specific) Config Entries for the given player (if any).""" + base_entries = await super().get_player_config_entries(player_id) + return base_entries + ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + ) + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" - mass_player = self.mass.players.get(player_id) - if mass_player.volume_level != volume_level: - await self._snapserver.client_volume( - player_id, {"percent": volume_level, "muted": mass_player.volume_muted} - ) - self.cmd_volume_mute(player_id, False) - - async def cmd_play_url( - self, - player_id: str, - url: str, - queue_item: QueueItem | None, # noqa: ARG002 - ) -> None: - """Send PLAY URL command to given player. - - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. - - - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). - """ - player = self.mass.players.get(player_id) - stream = await self._set_snapclient_empty_stream(player_id) - - stream_host = stream._stream.get("uri").get("host") - stream_host = stream_host.replace("0.0.0.0", self.snapcast_server_host) - ffmpeg = ( - FFmpeg() - .option("y") - .option("re") - .input(url=url) - .output( - f"tcp://{stream_host}", - f="s16le", - acodec="pcm_s16le", - ac=2, - ar=48000, - ) + await self._snapserver.client_volume( + player_id, {"percent": volume_level, "muted": volume_level != 0} ) - await self.cmd_stop(player_id) - - ffmpeg_task = self.mass.create_task(ffmpeg.execute()) - - @ffmpeg.on("start") - async def on_start(arguments: list[str]): - self.logger.debug("Ffmpeg stream is running") - stream.ffmpeg = ffmpeg - stream.ffmpeg_task = ffmpeg_task - player.current_url = url - player.elapsed_time = 0 - player.elapsed_time_last_updated = time.time() - player.state = PlayerState.PLAYING - self._set_childs_state(player_id, PlayerState.PLAYING) - self.mass.players.register_or_update(player) - async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" player = self.mass.players.get(player_id, raise_unavailable=False) - if player.state != PlayerState.IDLE: - stream = self._get_snapstream(player_id) - if hasattr(stream, "ffmpeg_task") and stream.ffmpeg_task.done() is False: - try: - stream.ffmpeg.terminate() - stream.ffmpeg_task.cancel() - self.logger.debug("ffmpeg player stopped") - except FFmpegError: - self.logger.debug("Fail to stop ffmpeg player") - player.state = PlayerState.IDLE - self._set_childs_state(player_id, PlayerState.IDLE) - self.mass.players.register_or_update(player) - - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player.""" - await self.cmd_stop(player_id) + if stream_task := self._stream_tasks.pop(player_id, None): # noqa: SIM102 + if not stream_task.done(): + stream_task.cancel() + player.state = PlayerState.IDLE + self._set_childs_state(player_id, PlayerState.IDLE) + self.mass.players.register_or_update(player) + # assign default/empty stream to the player + await self._get_snapgroup(player_id).set_stream("default") async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: """Send MUTE command to given player.""" @@ -266,20 +226,87 @@ class SnapCastProvider(PlayerProvider): """Unsync Snapcast player.""" group = self._get_snapgroup(player_id) await group.remove_client(player_id) - group = self._get_snapgroup(player_id) - stream_id = await self._get_empty_stream() - await group.set_stream(stream_id) + # assign default/empty stream to the player + await self._get_snapgroup(player_id).set_stream("default") self._handle_update() - def _get_snapgroup(self, player_id: str) -> SnapGroup: + async def play_media( + self, + player_id: str, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, + ) -> None: + """Handle PLAY MEDIA on given player. + + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. + + - player_id: player_id of the player to handle the command. + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. + """ + player = self.mass.players.get(player_id) + if player.synced_to: + raise RuntimeError("A synced player cannot receive play commands directly") + # stop any existing streams first + await self.cmd_stop(player_id) + queue = self.mass.player_queues.get(queue_item.queue_id) + stream, port = await self._create_stream() + snap_group = self._get_snapgroup(player_id) + await snap_group.set_stream(stream.identifier) + + async def queue_streamer(): + host = self.snapcast_server_host + _, writer = await asyncio.open_connection(host, port) + self.logger.debug("Opened connection to %s:%s", host, port) + player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" + player.elapsed_time = 0 + player.elapsed_time_last_updated = time.time() + player.state = PlayerState.PLAYING + self._set_childs_state(player_id, PlayerState.PLAYING) + self.mass.players.register_or_update(player) + # TODO: can we handle 24 bits bit depth ? + pcm_format = AudioFormat( + content_type=ContentType.PCM_S16LE, + sample_rate=48000, + bit_depth=16, + channels=2, + ) + try: + async for pcm_chunk in self.mass.streams.get_flow_stream( + queue, + start_queue_item=queue_item, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, + ): + writer.write(pcm_chunk) + await writer.drain() + + finally: + await self._snapserver.stream_remove_stream(stream.identifier) + if writer.can_write_eof(): + writer.close() + if not writer.is_closing(): + writer.close() + self.logger.debug("Closed connection to %s:%s", host, port) + + # start streaming the queue (pcm) audio in a background task + self._stream_tasks[player_id] = asyncio.create_task(queue_streamer()) + + def _get_snapgroup(self, player_id: str) -> Snapgroup: """Get snapcast group for given player_id.""" - client = self._snapserver.client(player_id) + client: Snapclient = self._snapserver.client(player_id) return client.group - def _get_snapstream(self, player_id: str) -> SnapStream: + def _get_snapstream(self, player_id: str) -> Snapstream | None: """Get snapcast stream for given player_id.""" - group = self._get_snapgroup(player_id) - return self._snapserver.stream(group.stream) + if group := self._get_snapgroup(player_id): + with suppress(KeyError): + return self._snapserver.stream(group.stream) + return None def _synced_to(self, player_id: str) -> str | None: """Return player_id of the player this player is synced to.""" @@ -292,26 +319,25 @@ class SnapCastProvider(PlayerProvider): snap_group = self._get_snapgroup(player_id) return {snap_client for snap_client in snap_group.clients if snap_client != player_id} - async def _get_empty_stream(self) -> str: - """Find or create empty stream on snapcast server. - - This method ensures that there is a snapstream for each snapclient, - even if the snapserver only have one stream configured. This is needed - because the default config of snapserver is one stream on a named pipe. - """ - used_streams = {group.stream for group in self._snapserver.groups} - for stream in self._snapserver.streams: - if stream.path == "" and stream.identifier not in used_streams: - return stream.identifier - port = 4953 - name = str(uuid.uuid4()) - while True: - new_stream = await self._snapserver.stream_add_stream( - f"tcp://0.0.0.0:{port}?name={f'MA_{name}'}&sampleformat=48000:16:2", + async def _create_stream(self) -> tuple[Snapstream, int]: + """Create new stream on snapcast server.""" + attempts = 50 + while attempts: + attempts -= 1 + # pick a random port + port = random.randint(4953, 4953 + 200) + name = f"MusicAssistant--{port}" + result = await self._snapserver.stream_add_stream( + # TODO: can we handle 24 bits bit depth ? + f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2", ) - if "id" in new_stream and new_stream["id"] not in used_streams: - return new_stream["id"] - port += 1 + if "id" not in result: + # if the port is already taken, the result will be an error + self.logger.warning(result) + continue + stream = self._snapserver.stream(result["id"]) + return (stream, port) + raise RuntimeError("Unable to create stream - No free port found?") def _get_player_state(self, player_id: str) -> PlayerState: """Return the state of the player.""" @@ -323,11 +349,4 @@ class SnapCastProvider(PlayerProvider): for child_player_id in self._group_childs(player_id): player = self.mass.players.get(child_player_id) player.state = state - self.mass.players.update(player) - - async def _set_snapclient_empty_stream(self, player_id: str) -> SnapStream: - """Set the snapclient stream to empty and return new stream.""" - new_stream_id = await self._get_empty_stream() - await self._get_snapgroup(player_id).set_stream(new_stream_id) - stream = self._snapserver.stream(new_stream_id) - return stream + self.mass.players.update(child_player_id) diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 007795d3..e35d6523 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -14,17 +14,22 @@ from soco.events_base import Event as SonosEvent from soco.events_base import SubscriptionBase from soco.groups import ZoneGroup -from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_CROSSFADE, + ConfigEntry, + ConfigValueType, +) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, PlayerFeature, PlayerState, PlayerType, ) -from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty +from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_PLAYERS +from music_assistant.constants import CONF_CROSSFADE, CONF_PLAYERS from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider @@ -36,10 +41,10 @@ if TYPE_CHECKING: PLAYER_FEATURES = ( - PlayerFeature.SET_MEMBERS, PlayerFeature.SYNC, PlayerFeature.VOLUME_MUTE, PlayerFeature.VOLUME_SET, + PlayerFeature.ENQUEUE_NEXT, ) CONF_NETWORK_SCAN = "network_scan" @@ -48,6 +53,19 @@ CONF_NETWORK_SCAN = "network_scan" # to allow coextistence with HA on the same host config.EVENT_LISTENER_PORT = 1700 +HIRES_MODELS = ( + "Sonos Roam", + "Sonos Arc", + "Sonos Beam", + "Sonos Five", + "Sonos Move", + "Sonos One SL", + "Sonos Port", + "Sonos Amp", + "SYMFONISK Bookshelf", + "SYMFONISK Table Lamp", +) + async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig @@ -92,7 +110,6 @@ class SonosPlayer: soco: soco.SoCo player: Player is_stereo_pair: bool = False - next_url: str | None = None elapsed_time: int = 0 playback_started: float | None = None need_elapsed_time_workaround: bool = False @@ -170,7 +187,16 @@ class SonosPlayer: self.playback_started = now # media info (track info) - self.player.current_url = self.track_info["uri"] + self.player.current_item_id = self.track_info["uri"] + if self.player.player_id in self.player.current_item_id: + self.player.active_source = self.player.player_id + elif "spotify" in self.player.current_item_id: + self.player.active_source = "spotify" + elif self.player.current_item_id.startswith("http"): + self.player.active_source = "http" + else: + # TODO: handle other possible sources here + self.player.active_source = None if not self.need_elapsed_time_workaround: self.player.elapsed_time = self.elapsed_time self.player.elapsed_time_last_updated = self.track_info_updated @@ -262,12 +288,75 @@ class SonosPlayerProvider(PlayerProvider): player.soco.end_direct_control_session() self.sonosplayers = None + async def get_player_config_entries( + self, player_id: str # noqa: ARG002 + ) -> tuple[ConfigEntry, ...]: + """Return Config Entries for the given player.""" + base_entries = await super().get_player_config_entries(player_id) + if not (sonos_player := self.sonosplayers.get(player_id)): + return base_entries + return base_entries + ( + CONF_ENTRY_CROSSFADE, + ConfigEntry( + key="sonos_bass", + type=ConfigEntryType.INTEGER, + label="Bass", + default_value=0, + range=(-10, 10), + description="Set the Bass level for the Sonos player", + value=sonos_player.soco.bass, + advanced=True, + ), + ConfigEntry( + key="sonos_treble", + type=ConfigEntryType.INTEGER, + label="Treble", + default_value=0, + range=(-10, 10), + description="Set the Treble level for the Sonos player", + value=sonos_player.soco.treble, + advanced=True, + ), + ConfigEntry( + key="sonos_loudness", + type=ConfigEntryType.BOOLEAN, + label="Loudness compensation", + default_value=True, + description="Enable loudness compensation on the Sonos player", + value=sonos_player.soco.loudness, + advanced=True, + ), + ) + def on_player_config_changed( self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002 ) -> None: """Call (by config manager) when the configuration of a player changes.""" - # run discovery to catch any re-enabled players - self.mass.create_task(self._run_discovery()) + if "enabled" in changed_keys: + # run discovery to catch any re-enabled players + self.mass.create_task(self._run_discovery()) + if not (sonos_player := self.sonosplayers.get(config.player_id)): + return + if "values/sonos_bass" in changed_keys: + self.mass.create_task( + sonos_player.soco.renderingControl.SetBass, + [("InstanceID", 0), ("DesiredBass", config.get_value("sonos_bass"))], + ) + if "values/sonos_treble" in changed_keys: + self.mass.create_task( + sonos_player.soco.renderingControl.SetTreble, + [("InstanceID", 0), ("DesiredTreble", config.get_value("sonos_treble"))], + ) + if "values/sonos_loudness" in changed_keys: + loudness_value = "1" if config.get_value("sonos_loudness") else "0" + self.mass.create_task( + sonos_player.soco.renderingControl.SetLoudness, + [ + ("InstanceID", 0), + ("Channel", "Master"), + ("DesiredLoudness", loudness_value), + ], + ) async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" @@ -293,49 +382,6 @@ class SonosPlayerProvider(PlayerProvider): return await asyncio.to_thread(sonos_player.soco.play) - async def cmd_play_url( - self, - player_id: str, - url: str, - queue_item: QueueItem | None, - ) -> None: - """Send PLAY URL command to given player. - - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. - - - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). - """ - sonos_player = self.sonosplayers[player_id] - if not sonos_player.soco.is_coordinator: - self.logger.debug( - "Ignore PLAY_MEDIA command for %s: Player is synced to another player.", - player_id, - ) - return - # always stop and clear queue first - sonos_player.next_url = None - await asyncio.to_thread(sonos_player.soco.stop) - await asyncio.to_thread(sonos_player.soco.clear_queue) - - if queue_item is None: - # enforce mp3 radio mode for flow stream - url = url.replace(".flac", ".mp3").replace(".wav", ".mp3") - await asyncio.to_thread( - sonos_player.soco.play_uri, url, title="Music Assistant", force_radio=True - ) - else: - await self._enqueue_item(sonos_player, url=url, queue_item=queue_item) - await asyncio.to_thread(sonos_player.soco.play_from_queue, 0) - # optimistically set this timestamp to help figure out elapsed time later - now = time.time() - sonos_player.playback_started = now - sonos_player.player.elapsed_time = 0 - sonos_player.player.elapsed_time_last_updated = now - async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player.""" sonos_player = self.sonosplayers[player_id] @@ -398,6 +444,80 @@ class SonosPlayerProvider(PlayerProvider): update_group_info=True, ) + async def play_media( + self, + player_id: str, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, + ) -> None: + """Handle PLAY MEDIA on given player. + + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. + + - player_id: player_id of the player to handle the command. + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. + """ + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + output_codec=ContentType.FLAC, + seek_position=seek_position, + fade_in=fade_in, + flow_mode=False, + ) + sonos_player = self.sonosplayers[player_id] + if not sonos_player.soco.is_coordinator: + # this should be already handled by the player manager, but just in case... + raise PlayerCommandFailed( + f"Player {sonos_player.player.display_name} can not " + "accept play_media command, it is synced to another player." + ) + # always stop and clear queue first + await asyncio.to_thread(sonos_player.soco.stop) + await asyncio.to_thread(sonos_player.soco.clear_queue) + await self._enqueue_item(sonos_player, url=url, queue_item=queue_item) + await asyncio.to_thread(sonos_player.soco.play_from_queue, 0) + # optimistically set this timestamp to help figure out elapsed time later + now = time.time() + sonos_player.playback_started = now + sonos_player.player.elapsed_time = 0 + sonos_player.player.elapsed_time_last_updated = now + + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """ + Handle enqueuing of the next queue item on the player. + + If the player supports PlayerFeature.ENQUE_NEXT: + This will be called about 10 seconds before the end of the track. + If the player does NOT report support for PlayerFeature.ENQUE_NEXT: + This will be called when the end of the track is reached. + + A PlayerProvider implementation is in itself responsible for handling this + so that the queue items keep playing until its empty or the player stopped. + + This will NOT be called if the end of the queue is reached (and repeat disabled). + This will NOT be called if flow mode is enabled on the queue. + """ + sonos_player = self.sonosplayers[player_id] + url = await self.mass.streams.resolve_stream_url( + queue_item=queue_item, + output_codec=ContentType.FLAC, + ) + # set crossfade according to player setting + crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE) + if sonos_player.soco.cross_fade != crossfade: + + def set_crossfade(): + with suppress(Exception): + sonos_player.soco.cross_fade = crossfade + + await asyncio.to_thread(set_crossfade) + + await self._enqueue_item(sonos_player, url=url, queue_item=queue_item) + async def poll_player(self, player_id: str) -> None: """Poll player for state updates. @@ -489,12 +609,16 @@ class SonosPlayerProvider(PlayerProvider): address=soco_device.ip_address, manufacturer=self.name, ), - max_sample_rate=48000, - supports_24bit=True, + max_sample_rate=441000, + supports_24bit=False, ), speaker_info=speaker_info, speaker_info_updated=time.time(), ) + if speaker_info["model_name"] in HIRES_MODELS: + sonos_player.player.max_sample_rate = 48000 + sonos_player.player.supports_24bit = True + # poll all endpoints once and update attributes await sonos_player.check_poll() sonos_player.update_attributes() @@ -560,36 +684,11 @@ class SonosPlayerProvider(PlayerProvider): sonos_player.group_info_updated = time.time() asyncio.run_coroutine_threadsafe(self._update_player(sonos_player), self.mass.loop) - async def _enqueue_next_track(self, sonos_player: SonosPlayer) -> None: - """Enqueue the next track of the MA queue on the CC queue.""" - try: - next_url, next_item, crossfade = await self.mass.player_queues.preload_next_url( - sonos_player.player_id - ) - except QueueEmpty: - return - - if sonos_player.next_url == next_url: - return # already set ?! - sonos_player.next_url = next_url - - # set crossfade according to queue mode - if sonos_player.soco.cross_fade != crossfade: - - def set_crossfade(): - with suppress(Exception): - sonos_player.soco.cross_fade = crossfade - - await asyncio.to_thread(set_crossfade) - - # send queue item to sonos queue - await self._enqueue_item(sonos_player, url=next_url, queue_item=next_item) - async def _enqueue_item( self, sonos_player: SonosPlayer, url: str, - queue_item: QueueItem | None = None, + queue_item: QueueItem, ) -> None: """Enqueue a queue item to the Sonos player Queue.""" metadata = create_didl_metadata(self.mass, url, queue_item) @@ -612,13 +711,13 @@ class SonosPlayerProvider(PlayerProvider): async def _update_player(self, sonos_player: SonosPlayer, signal_update: bool = True) -> None: """Update Sonos Player.""" - prev_url = sonos_player.player.current_url + prev_url = sonos_player.player.current_item_id prev_state = sonos_player.player.state sonos_player.update_attributes() sonos_player.player.can_sync_with = tuple( x for x in self.sonosplayers if x != sonos_player.player_id ) - current_url = sonos_player.player.current_url + current_url = sonos_player.player.current_item_id current_state = sonos_player.player.state if (prev_url != current_url) or (prev_state != current_state): @@ -635,14 +734,6 @@ class SonosPlayerProvider(PlayerProvider): # will detect changes to the player object itself self.mass.players.update(sonos_player.player_id) - # enqueue next item if needed - if ( - sonos_player.player.state == PlayerState.PLAYING - and sonos_player.player.active_source == sonos_player.player.player_id - and sonos_player.next_url in (None, sonos_player.player.current_url) - ): - self.mass.create_task(self._enqueue_next_track(sonos_player)) - def _convert_state(sonos_state: str) -> PlayerState: """Convert Sonos state to PlayerState.""" diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 4b6f91b4..1d2b0378 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -9,8 +9,6 @@ from __future__ import annotations import asyncio from typing import TYPE_CHECKING -import shortuuid - from music_assistant.common.models.config_entries import ( CONF_ENTRY_EQ_BASS, CONF_ENTRY_EQ_MID, @@ -18,7 +16,6 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_FLOW_MODE, CONF_ENTRY_HIDE_GROUP_MEMBERS, CONF_ENTRY_OUTPUT_CHANNELS, - CONF_ENTRY_OUTPUT_CODEC, ConfigEntry, ConfigValueOption, ConfigValueType, @@ -38,7 +35,6 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -61,9 +57,6 @@ CONF_ENTRY_EQ_MID_HIDDEN = ConfigEntry.from_dict({**CONF_ENTRY_EQ_MID.to_dict(), CONF_ENTRY_EQ_TREBLE_HIDDEN = ConfigEntry.from_dict( {**CONF_ENTRY_EQ_TREBLE.to_dict(), "hidden": True} ) -CONF_ENTRY_OUTPUT_CODEC_HIDDEN = ConfigEntry.from_dict( - {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "hidden": True} -) CONF_ENTRY_GROUPED_POWER_ON = ConfigEntry( key=CONF_GROUPED_POWER_ON, type=ConfigEntryType.BOOLEAN, @@ -170,9 +163,8 @@ class UniversalGroupProvider(PlayerProvider): PlayerFeature.PAUSE, PlayerFeature.VOLUME_SET, PlayerFeature.VOLUME_MUTE, - PlayerFeature.SET_MEMBERS, ), - max_sample_rate=96000, + max_sample_rate=48000, supports_24bit=True, active_source=conf_key, group_childs=player_conf, @@ -203,14 +195,6 @@ class UniversalGroupProvider(PlayerProvider): "child players will be treated as (un)mute commands to prevent the small " "interruption of music when the stream is restarted.", ), - CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO, - CONF_ENTRY_FORCED_FLOW_MODE, - # group player outputs to individual members so - # these settings make no sense, hide them - CONF_ENTRY_EQ_BASS_HIDDEN, - CONF_ENTRY_EQ_MID_HIDDEN, - CONF_ENTRY_EQ_TREBLE_HIDDEN, - CONF_ENTRY_OUTPUT_CODEC_HIDDEN, ) async def cmd_stop(self, player_id: str) -> None: @@ -236,36 +220,26 @@ class UniversalGroupProvider(PlayerProvider): ): tg.create_task(self.mass.players.cmd_play(member.player_id)) - async def cmd_play_url( + async def play_media( self, player_id: str, - url: str, - queue_item: QueueItem | None, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, ) -> None: - """Send PLAY URL command to given player. + """Handle PLAY MEDIA on given player. - This is called when the Queue wants the player to start playing a specific url. - If an item from the Queue is being played, the QueueItem will be provided with - all metadata present. + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. - player_id: player_id of the player to handle the command. - - url: the url that the player should start playing. - - queue_item: the QueueItem that is related to the URL (None when playing direct url). + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. """ - # send stop first - await self.cmd_stop(player_id) - # debounce - # this can potentially be called multiple times at the (near) exact time - # due to many child players being powered on (or resynced) at the same time - # debounce the command a bit by only letting through the last one. - self.debounce_id = debounce_id = shortuuid.uuid() - await asyncio.sleep(100) - if self.debounce_id != debounce_id: - return # power ON await self.cmd_power(player_id, True) group_player = self.mass.players.get(player_id) - active_members = self._get_active_members( player_id, only_powered=True, skip_sync_childs=True ) @@ -279,43 +253,41 @@ class UniversalGroupProvider(PlayerProvider): group_player.extra_data["optimistic_state"] = PlayerState.PLAYING - # forward command to all (powered) group child's + # forward the command to all (sync master) group child's async with asyncio.TaskGroup() as tg: for member in active_members: player_prov = self.mass.players.get_player_provider(member.player_id) tg.create_task( - player_prov.cmd_play_url(member.player_id, url=url, queue_item=queue_item) + player_prov.play_media( + member.player_id, + queue_item=queue_item, + seek_position=seek_position, + fade_in=fade_in, + ) ) - async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle StreamJob play command on given player.""" - # send stop first - await self.cmd_stop(player_id) - # power ON - await self.cmd_power(player_id, True) - group_player = self.mass.players.get(player_id) + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """ + Handle enqueuing of the next queue item on the player. - active_members = self._get_active_members( - player_id, only_powered=True, skip_sync_childs=True - ) - if len(active_members) == 0: - self.logger.warning( - "Play media requested for player %s but no member players are powered, " - "the request will be ignored", - group_player.display_name, - ) - return + If the player supports PlayerFeature.ENQUE_NEXT: + This will be called about 10 seconds before the end of the track. + If the player does NOT report support for PlayerFeature.ENQUE_NEXT: + This will be called when the end of the track is reached. - group_player.extra_data["optimistic_state"] = PlayerState.PLAYING + A PlayerProvider implementation is in itself responsible for handling this + so that the queue items keep playing until its empty or the player stopped. - # forward command to all (powered) group child's + This will NOT be called if the end of the queue is reached (and repeat disabled). + This will NOT be called if the player is using flow mode to playback the queue. + """ + # forward the command to all (sync master) group child's async with asyncio.TaskGroup() as tg: - for member in active_members: + for member in self._get_active_members( + player_id, only_powered=False, skip_sync_childs=True + ): player_prov = self.mass.players.get_player_provider(member.player_id) - # we forward the stream_job to child to allow for nested groups etc - tg.create_task( - player_prov.cmd_handle_stream_job(member.player_id, stream_job=stream_job) - ) + tg.create_task(player_prov.enqueue_next_queue_item(member.player_id, queue_item)) async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player.""" @@ -390,14 +362,14 @@ class UniversalGroupProvider(PlayerProvider): group_player = self.mass.players.get(player_id) if not group_player.powered: group_player.state = PlayerState.IDLE + group_player.active_source = None return all_members = self._get_active_members( player_id, only_powered=False, skip_sync_childs=False ) - if all_members: - group_player.max_sample_rate = max(x.max_sample_rate for x in all_members) group_player.group_childs = list(x.player_id for x in all_members) + group_player.active_source = player_id # read the state from the first active group member for member in all_members: if member.synced_to: @@ -408,7 +380,7 @@ class UniversalGroupProvider(PlayerProvider): player_powered = member.powered if not player_powered: continue - group_player.current_url = member.current_url + group_player.current_item_id = member.current_item_id group_player.elapsed_time = member.elapsed_time group_player.elapsed_time_last_updated = member.elapsed_time_last_updated group_player.state = member.state