From b75c64a3cc989724219fddf61797015cec4d2772 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 11 Feb 2025 21:08:21 +0100 Subject: [PATCH] Chore: small refactor of player and queue update logic --- music_assistant/controllers/player_queues.py | 370 ++++++++++--------- music_assistant/controllers/players.py | 53 +-- music_assistant/helpers/util.py | 12 +- 3 files changed, 223 insertions(+), 212 deletions(-) diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 6dd05d72..8c553f77 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -101,6 +101,7 @@ class CompareState(TypedDict): state: PlayerState current_item_id: str | None next_item_id: str | None + current_item: QueueItem | None elapsed_time: int stream_title: str | None codec_type: ContentType | None @@ -806,12 +807,19 @@ class PlayerQueuesController(CoreController): # send play_media request to player # NOTE that we debounce this a bit to account for someone hitting the next button # like a madman. This will prevent the player from being overloaded with requests. + async def play_media(): + await self.mass.players.play_media( + player_id=queue_id, + media=self.player_media_from_queue_item(queue_item, queue.flow_mode), + ) + await asyncio.sleep(2) + setattr(queue, "transitioning", False) # noqa: B010 + + # we set a flag to notify the update logic that we're transitioning to a new track + setattr(queue, "transitioning", True) # noqa: B010 self.mass.call_later( - 1 if debounce else 0.1, - self.mass.players.play_media, - player_id=queue_id, - # transform into PlayerMedia to send to the actual player implementation - media=self.player_media_from_queue_item(queue_item, queue.flow_mode), + 1.5 if debounce else 0.1, + play_media, task_id=f"play_media_{queue_id}", ) self.signal_update(queue_id) @@ -913,174 +921,25 @@ class PlayerQueuesController(CoreController): NOTE: This is called every second if the player is playing. """ - if player.player_id not in self._queues: + queue_id = player.player_id + if (queue := self._queues.get(queue_id)) is None: # race condition return if player.announcement_in_progress: # do nothing while the announcement is in progress return - queue_id = player.player_id - player = self.mass.players.get(queue_id) - queue = self._queues[queue_id] - - # basic properties - queue.display_name = player.display_name - queue.available = player.available - queue.items = len(self._queue_items[queue_id]) # determine if this queue is currently active for this player queue.active = player.active_source == queue.queue_id if not queue.active and queue_id not in self._prev_states: queue.state = PlayerState.IDLE # return early if the queue is not active and we have no previous state return - - # update current item/index from player report - if queue.active and queue.state == PlayerState.PLAYING: - # NOTE: If the queue is not playing (yet) we will not update the current index - # to ensure we keep the previously known the current index - if queue.flow_mode: - # flow mode active, the player is playing one long stream - # so we need to calculate the current index and elapsed time - queue.current_index, queue.elapsed_time = self._get_flow_queue_stream_index( - queue, player - ) - elif item_id := self._parse_player_current_item_id(queue_id, player): - # normal mode, the player itself will report the current item - queue.elapsed_time = int(player.corrected_elapsed_time or 0) - queue.current_index = self.index_by_id(queue_id, item_id) - queue.elapsed_time_last_updated = time.time() - queue.state = player.state or PlayerState.IDLE - else: - # queue is not active (or not playing) - queue.state = player.state or PlayerState.IDLE - - # set current item and next item from the current index - queue.current_item = self.get_item(queue_id, queue.current_index) - queue.next_item = self._get_next_item(queue_id, queue.current_index) - - # correct elapsed time when seeking - if ( - player.state == PlayerState.PLAYING - and not queue.flow_mode - and queue.current_item - and queue.current_item.streamdetails - and queue.current_item.streamdetails.seek_position - ): - queue.elapsed_time += queue.current_item.streamdetails.seek_position - - prev_state: CompareState = self._prev_states.get( - queue_id, - CompareState( - queue_id=queue_id, - state=PlayerState.IDLE, - current_item_id=None, - next_item_id=None, - elapsed_time=0, - stream_title=None, - output_formats=None, - ), - ) - - # This is enough to detect any changes in the DSPDetails - # (so child count changed, or any output format changed) - output_formats = [] - if player.output_format: - output_formats.append(str(player.output_format)) - for child_id in player.group_childs: - if (child := self.mass.players.get(child_id)) and child.output_format: - output_formats.append(str(child.output_format)) - else: - output_formats.append("unknown") - - # basic throttle: do not send state changed events if queue did not actually change - new_state = CompareState( - queue_id=queue_id, - state=queue.state, - current_item_id=queue.current_item.queue_item_id if queue.current_item else None, - next_item_id=queue.next_item.queue_item_id if queue.next_item else None, - elapsed_time=queue.elapsed_time, - stream_title=( - queue.current_item.streamdetails.stream_title - if queue.current_item and queue.current_item.streamdetails - else None - ), - codec_type=( - queue.current_item.streamdetails.audio_format.codec_type - if queue.current_item and queue.current_item.streamdetails - else None - ), - output_formats=output_formats, - ) - changed_keys = get_changed_keys(prev_state, new_state) - # return early if nothing changed - if len(changed_keys) == 0: + if getattr(queue, "transitioning", False): + # we're currently transitioning to a new track, + # ignore updates from the player during this time return - - # signal update and store state - if changed_keys == {"elapsed_time"}: - # do not send full updates if only time was updated - self.mass.signal_event( - EventType.QUEUE_TIME_UPDATED, - object_id=queue_id, - data=queue.elapsed_time, - ) - else: - self.signal_update(queue_id) - - # store the new state - if queue.active: - self._prev_states[queue_id] = new_state - else: - self._prev_states.pop(queue_id, None) - - if "output_formats" in changed_keys: - # refresh DSP details since they may have changed - dsp = get_stream_dsp_details(self.mass, queue_id) - if queue.current_item and queue.current_item.streamdetails: - queue.current_item.streamdetails.dsp = dsp - if queue.next_item and queue.next_item.streamdetails: - queue.next_item.streamdetails.dsp = dsp - - # handle sending a playback progress report - # we do this every 30 seconds or when the state changes - if ( - changed_keys.intersection({"state", "current_item_id", "next_item_id"}) - or int(queue.elapsed_time) % 30 == 0 - ): - self._handle_playback_progress_report(queue, prev_state, new_state) - - # check if we need to clear the queue if we reached the end - if "state" in changed_keys and queue.state == PlayerState.IDLE: - self._handle_end_of_queue(queue, prev_state, new_state) - - # watch dynamic radio items refill if needed - if "current_item_id" in changed_keys: - # auto enable radio mode if dont stop the music is enabled - if ( - queue.dont_stop_the_music_enabled - and queue.enqueued_media_items - and queue.current_index is not None - and (queue.items - queue.current_index) <= 1 - ): - # We have received the last item in the queue and Don't stop the music is enabled - # set the played media item(s) as radio items (which will refill the queue) - # note that this will fail if there are no media items for which we have - # a dynamic radio source. - self.logger.debug( - "End of queue detected and Don't stop the music is enabled for %s" - " - setting enqueued media items as radio source: %s", - queue.display_name, - ", ".join([x.uri for x in queue.enqueued_media_items]), - ) - queue.radio_source = queue.enqueued_media_items - # auto fill radio tracks if less than 5 tracks left in the queue - if ( - queue.radio_source - and queue.current_index is not None - and (queue.items - queue.current_index) < 5 - ): - task_id = f"fill_radio_tracks_{queue_id}" - self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id) + # queue is active and preflight checks passed, update the queue details + self._update_queue_from_player(player) def on_player_remove(self, player_id: str) -> None: """Call when a player is removed from the registry.""" @@ -1665,6 +1524,169 @@ class PlayerQueuesController(CoreController): ) return queue_tracks + def _update_queue_from_player( + self, + player: Player, + ) -> None: + """Update the Queue when the player state changed.""" + queue_id = player.player_id + player = self.mass.players.get(queue_id) + queue = self._queues[queue_id] + + # basic properties + queue.display_name = player.display_name + queue.available = player.available + queue.items = len(self._queue_items[queue_id]) + + queue.state = player.state or PlayerState.IDLE if queue.active else PlayerState.IDLE + # update current item/index from player report + if queue.active and queue.state in (PlayerState.PLAYING, PlayerState.PAUSED): + # NOTE: If the queue is not playing (yet) we will not update the current index + # to ensure we keep the previously known current index + if queue.flow_mode: + # flow mode active, the player is playing one long stream + # so we need to calculate the current index and elapsed time + current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player) + elif item_id := self._parse_player_current_item_id(queue_id, player): + # normal mode, the player itself will report the current item + elapsed_time = int(player.corrected_elapsed_time or 0) + current_index = self.index_by_id(queue_id, item_id) + else: + # this should not happen but we will handle it gracefully + elapsed_time = 0 + current_index = None + + # get current/next item based on current index + queue.current_index = current_index + queue.current_item = current_item = self.get_item(queue_id, current_index) + queue.next_item = self._get_next_item(queue_id, current_index) if current_item else None + + # correct elapsed time when seeking + if ( + not queue.flow_mode + and current_item + and current_item.streamdetails + and current_item.streamdetails.seek_position + ): + elapsed_time += current_item.streamdetails.seek_position + queue.elapsed_time = elapsed_time + queue.elapsed_time_last_updated = time.time() + + # This is enough to detect any changes in the DSPDetails + # (so child count changed, or any output format changed) + output_formats = [] + if player.output_format: + output_formats.append(str(player.output_format)) + for child_id in player.group_childs: + if (child := self.mass.players.get(child_id)) and child.output_format: + output_formats.append(str(child.output_format)) + else: + output_formats.append("unknown") + + # basic throttle: do not send state changed events if queue did not actually change + prev_state: CompareState = self._prev_states.get( + queue_id, + CompareState( + queue_id=queue_id, + state=PlayerState.IDLE, + current_item_id=None, + next_item_id=None, + current_item=None, + elapsed_time=0, + stream_title=None, + output_formats=None, + ), + ) + new_state = CompareState( + queue_id=queue_id, + state=queue.state, + current_item_id=queue.current_item.queue_item_id if queue.current_item else None, + next_item_id=queue.next_item.queue_item_id if queue.next_item else None, + current_item=queue.current_item, + elapsed_time=queue.elapsed_time, + stream_title=( + queue.current_item.streamdetails.stream_title + if queue.current_item and queue.current_item.streamdetails + else None + ), + codec_type=( + queue.current_item.streamdetails.audio_format.codec_type + if queue.current_item and queue.current_item.streamdetails + else None + ), + output_formats=output_formats, + ) + changed_keys = get_changed_keys(prev_state, new_state, ["next_item"]) + # return early if nothing changed + if len(changed_keys) == 0: + return + + # signal update and store state + if changed_keys == {"elapsed_time"}: + # do not send full updates if only time was updated + self.mass.signal_event( + EventType.QUEUE_TIME_UPDATED, + object_id=queue_id, + data=queue.elapsed_time, + ) + else: + self.signal_update(queue_id) + + # store the new state + if queue.active: + self._prev_states[queue_id] = new_state + else: + self._prev_states.pop(queue_id, None) + + if "output_formats" in changed_keys: + # refresh DSP details since they may have changed + dsp = get_stream_dsp_details(self.mass, queue_id) + if queue.current_item and queue.current_item.streamdetails: + queue.current_item.streamdetails.dsp = dsp + if queue.next_item and queue.next_item.streamdetails: + queue.next_item.streamdetails.dsp = dsp + + # handle sending a playback progress report + # we do this every 30 seconds or when the state changes + if ( + changed_keys.intersection({"state", "current_item_id"}) + or int(queue.elapsed_time) % 30 == 0 + ): + self._handle_playback_progress_report(queue, prev_state, new_state) + + # check if we need to clear the queue if we reached the end + if "state" in changed_keys and queue.state == PlayerState.IDLE: + self._handle_end_of_queue(queue, prev_state, new_state) + + # watch dynamic radio items refill if needed + if "current_item_id" in changed_keys: + # auto enable radio mode if dont stop the music is enabled + if ( + queue.dont_stop_the_music_enabled + and queue.enqueued_media_items + and queue.current_index is not None + and (queue.items - queue.current_index) <= 1 + ): + # We have received the last item in the queue and Don't stop the music is enabled + # set the played media item(s) as radio items (which will refill the queue) + # note that this will fail if there are no media items for which we have + # a dynamic radio source. + self.logger.debug( + "End of queue detected and Don't stop the music is enabled for %s" + " - setting enqueued media items as radio source: %s", + queue.display_name, + ", ".join([x.uri for x in queue.enqueued_media_items]), + ) + queue.radio_source = queue.enqueued_media_items + # auto fill radio tracks if less than 5 tracks left in the queue + if ( + queue.radio_source + and queue.current_index is not None + and (queue.items - queue.current_index) < 5 + ): + task_id = f"fill_radio_tracks_{queue_id}" + self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id) + def _get_flow_queue_stream_index( self, queue: PlayerQueue, player: Player ) -> tuple[int | None, int]: @@ -1796,30 +1818,26 @@ class PlayerQueuesController(CoreController): return if prev_item_id is not None and prev_item_id != cur_item_id: # we have a new item, so we need report the previous one - if not (item_to_report := self.get_item(queue.queue_id, prev_item_id)): - # should not happen, but guard it anyway - return - if not (stream_details := item_to_report.streamdetails): - # should not happen, but guard it anyway - return + item_to_report = prev_state["current_item"] seconds_played = int(prev_state["elapsed_time"]) else: # report on current item - if not (item_to_report := self.get_item(queue.queue_id, cur_item_id)): - # should not happen, but guard it anyway - return - if not (stream_details := item_to_report.streamdetails): - # should not happen, but guard it anyway - return + item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"] + if not item_to_report: + return # guard against invalid items seconds_played = int(new_state["elapsed_time"]) - if seconds_played < 30: - # ignore items that have been played less than 30 seconds + if seconds_played < 10: + # ignore items that have been played less than 10 seconds return + if not item_to_report.media_item: # only report on media items return - duration = stream_details.duration or item_to_report.duration or 3600 + if item_to_report.streamdetails and item_to_report.streamdetails.duration: + duration = item_to_report.streamdetails.duration + else: + duration = item_to_report.duration or 3600 fully_played = seconds_played >= (duration or 3600) - 5 self.logger.debug( diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index 7cda81c5..d233903d 100644 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -306,28 +306,22 @@ class PlayerController(CoreController): """Handle NEXT TRACK command for given player.""" player = self._get_player_with_redirect(player_id) active_source_id = player.active_source or player.player_id - supports_native_skip = PlayerFeature.NEXT_PREVIOUS in player.supported_features - can_native_skip = False + if active_queue := self.mass.player_queues.get(active_source_id): # active source is a MA queue - can_native_skip = supports_native_skip and not active_queue.flow_mode - elif supports_native_skip: - # player has some other source active and native next/previous support - active_source = next((x for x in player.source_list if x.id == active_source_id), None) - can_native_skip = active_source and active_source.can_next_previous - # always prefer native skip, even if player is playing MA queue - if can_native_skip: - player_provider = self.get_player_provider(player.player_id) - await player_provider.cmd_next(player.player_id) - return - # Redirect to queue controller if it is active - # which will result in a new play_media call - if active_queue: await self.mass.player_queues.next(active_queue.queue_id) return - if supports_native_skip: + + if PlayerFeature.NEXT_PREVIOUS in player.supported_features: + # player has some other source active and native next/previous support + active_source = next((x for x in player.source_list if x.id == active_source_id), None) + if active_source and active_source.can_next_previous: + player_provider = self.get_player_provider(player.player_id) + await player_provider.cmd_next(player.player_id) + return msg = "This action is (currently) unavailable for this source." raise PlayerCommandFailed(msg) + msg = f"Player {player.display_name} does not support skipping to the next track." raise UnsupportedFeaturedException(msg) @@ -336,29 +330,22 @@ class PlayerController(CoreController): """Handle PREVIOUS TRACK command for given player.""" player = self._get_player_with_redirect(player_id) active_source_id = player.active_source or player.player_id - supports_native_skip = PlayerFeature.NEXT_PREVIOUS in player.supported_features - can_native_skip = False if active_queue := self.mass.player_queues.get(active_source_id): # active source is a MA queue - can_native_skip = not active_queue.flow_mode - elif supports_native_skip: - # player has some other source active and native next/previous support - active_source = next((x for x in player.source_list if x.id == active_source_id), None) - can_native_skip = active_source and active_source.can_next_previous - # always prefer native skip, even if player is playing MA queue - if can_native_skip: - player_provider = self.get_player_provider(player.player_id) - await player_provider.cmd_previous(player.player_id) - return - # Redirect to queue controller if it is active - # which will result in a new play_media call - if active_queue: await self.mass.player_queues.previous(active_queue.queue_id) return - if supports_native_skip: + + if PlayerFeature.NEXT_PREVIOUS in player.supported_features: + # player has some other source active and native next/previous support + active_source = next((x for x in player.source_list if x.id == active_source_id), None) + if active_source and active_source.can_next_previous: + player_provider = self.get_player_provider(player.player_id) + await player_provider.cmd_previous(player.player_id) + return msg = "This action is (currently) unavailable for this source." raise PlayerCommandFailed(msg) - msg = f"Player {player.display_name} does not support skipping to the next track." + + msg = f"Player {player.display_name} does not support skipping to the previous track." raise UnsupportedFeaturedException(msg) @api_command("players/cmd/power") diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index 00e9c8fe..7e791de4 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -301,15 +301,17 @@ def get_changed_keys( dict1: dict[str, Any], dict2: dict[str, Any], ignore_keys: list[str] | None = None, + recursive: bool = False, ) -> set[str]: """Compare 2 dicts and return set of changed keys.""" - return set(get_changed_values(dict1, dict2, ignore_keys).keys()) + return set(get_changed_values(dict1, dict2, ignore_keys, recursive).keys()) def get_changed_values( dict1: dict[str, Any], dict2: dict[str, Any], ignore_keys: list[str] | None = None, + recursive: bool = False, ) -> dict[str, tuple[Any, Any]]: """ Compare 2 dicts and return dict of changed values. @@ -328,8 +330,12 @@ def get_changed_values( continue if key not in dict1: changed_values[key] = (None, value) - elif isinstance(value, dict): - changed_values.update(get_changed_values(dict1[key], value, ignore_keys)) + elif isinstance(value, dict) or isinstance(dict1[key], dict): + changed_subvalues = get_changed_values(dict1[key], value, ignore_keys, recursive) + if recursive: + changed_values.update(changed_subvalues) + elif changed_subvalues: + changed_values[key] = (dict1[key], value) elif dict1[key] != value: changed_values[key] = (dict1[key], value) return changed_values -- 2.34.1