Chore: small refactor of player and queue update logic
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 11 Feb 2025 20:08:21 +0000 (21:08 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 11 Feb 2025 20:08:21 +0000 (21:08 +0100)
music_assistant/controllers/player_queues.py
music_assistant/controllers/players.py
music_assistant/helpers/util.py

index 6dd05d7299456c4af5222317701082ba9f71c6b9..8c553f772f0d8f8cd3a8b29b7274434fc242ccff 100644 (file)
@@ -101,6 +101,7 @@ class CompareState(TypedDict):
     state: PlayerState
     current_item_id: str | None
     next_item_id: str | None
+    current_item: QueueItem | None
     elapsed_time: int
     stream_title: str | None
     codec_type: ContentType | None
@@ -806,12 +807,19 @@ class PlayerQueuesController(CoreController):
         # send play_media request to player
         # NOTE that we debounce this a bit to account for someone hitting the next button
         # like a madman. This will prevent the player from being overloaded with requests.
+        async def play_media():
+            await self.mass.players.play_media(
+                player_id=queue_id,
+                media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
+            )
+            await asyncio.sleep(2)
+            setattr(queue, "transitioning", False)  # noqa: B010
+
+        # we set a flag to notify the update logic that we're transitioning to a new track
+        setattr(queue, "transitioning", True)  # noqa: B010
         self.mass.call_later(
-            1 if debounce else 0.1,
-            self.mass.players.play_media,
-            player_id=queue_id,
-            # transform into PlayerMedia to send to the actual player implementation
-            media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
+            1.5 if debounce else 0.1,
+            play_media,
             task_id=f"play_media_{queue_id}",
         )
         self.signal_update(queue_id)
@@ -913,174 +921,25 @@ class PlayerQueuesController(CoreController):
 
         NOTE: This is called every second if the player is playing.
         """
-        if player.player_id not in self._queues:
+        queue_id = player.player_id
+        if (queue := self._queues.get(queue_id)) is None:
             # race condition
             return
         if player.announcement_in_progress:
             # do nothing while the announcement is in progress
             return
-        queue_id = player.player_id
-        player = self.mass.players.get(queue_id)
-        queue = self._queues[queue_id]
-
-        # basic properties
-        queue.display_name = player.display_name
-        queue.available = player.available
-        queue.items = len(self._queue_items[queue_id])
         # determine if this queue is currently active for this player
         queue.active = player.active_source == queue.queue_id
         if not queue.active and queue_id not in self._prev_states:
             queue.state = PlayerState.IDLE
             # return early if the queue is not active and we have no previous state
             return
-
-        # update current item/index from player report
-        if queue.active and queue.state == PlayerState.PLAYING:
-            # NOTE: If the queue is not playing (yet) we will not update the current index
-            # to ensure we keep the previously known the current index
-            if queue.flow_mode:
-                # flow mode active, the player is playing one long stream
-                # so we need to calculate the current index and elapsed time
-                queue.current_index, queue.elapsed_time = self._get_flow_queue_stream_index(
-                    queue, player
-                )
-            elif item_id := self._parse_player_current_item_id(queue_id, player):
-                # normal mode, the player itself will report the current item
-                queue.elapsed_time = int(player.corrected_elapsed_time or 0)
-                queue.current_index = self.index_by_id(queue_id, item_id)
-            queue.elapsed_time_last_updated = time.time()
-            queue.state = player.state or PlayerState.IDLE
-        else:
-            # queue is not active (or not playing)
-            queue.state = player.state or PlayerState.IDLE
-
-        # set current item and next item from the current index
-        queue.current_item = self.get_item(queue_id, queue.current_index)
-        queue.next_item = self._get_next_item(queue_id, queue.current_index)
-
-        # correct elapsed time when seeking
-        if (
-            player.state == PlayerState.PLAYING
-            and not queue.flow_mode
-            and queue.current_item
-            and queue.current_item.streamdetails
-            and queue.current_item.streamdetails.seek_position
-        ):
-            queue.elapsed_time += queue.current_item.streamdetails.seek_position
-
-        prev_state: CompareState = self._prev_states.get(
-            queue_id,
-            CompareState(
-                queue_id=queue_id,
-                state=PlayerState.IDLE,
-                current_item_id=None,
-                next_item_id=None,
-                elapsed_time=0,
-                stream_title=None,
-                output_formats=None,
-            ),
-        )
-
-        # This is enough to detect any changes in the DSPDetails
-        # (so child count changed, or any output format changed)
-        output_formats = []
-        if player.output_format:
-            output_formats.append(str(player.output_format))
-        for child_id in player.group_childs:
-            if (child := self.mass.players.get(child_id)) and child.output_format:
-                output_formats.append(str(child.output_format))
-            else:
-                output_formats.append("unknown")
-
-        # basic throttle: do not send state changed events if queue did not actually change
-        new_state = CompareState(
-            queue_id=queue_id,
-            state=queue.state,
-            current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
-            next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
-            elapsed_time=queue.elapsed_time,
-            stream_title=(
-                queue.current_item.streamdetails.stream_title
-                if queue.current_item and queue.current_item.streamdetails
-                else None
-            ),
-            codec_type=(
-                queue.current_item.streamdetails.audio_format.codec_type
-                if queue.current_item and queue.current_item.streamdetails
-                else None
-            ),
-            output_formats=output_formats,
-        )
-        changed_keys = get_changed_keys(prev_state, new_state)
-        # return early if nothing changed
-        if len(changed_keys) == 0:
+        if getattr(queue, "transitioning", False):
+            # we're currently transitioning to a new track,
+            # ignore updates from the player during this time
             return
-
-        # signal update and store state
-        if changed_keys == {"elapsed_time"}:
-            # do not send full updates if only time was updated
-            self.mass.signal_event(
-                EventType.QUEUE_TIME_UPDATED,
-                object_id=queue_id,
-                data=queue.elapsed_time,
-            )
-        else:
-            self.signal_update(queue_id)
-
-        # store the new state
-        if queue.active:
-            self._prev_states[queue_id] = new_state
-        else:
-            self._prev_states.pop(queue_id, None)
-
-        if "output_formats" in changed_keys:
-            # refresh DSP details since they may have changed
-            dsp = get_stream_dsp_details(self.mass, queue_id)
-            if queue.current_item and queue.current_item.streamdetails:
-                queue.current_item.streamdetails.dsp = dsp
-            if queue.next_item and queue.next_item.streamdetails:
-                queue.next_item.streamdetails.dsp = dsp
-
-        # handle sending a playback progress report
-        # we do this every 30 seconds or when the state changes
-        if (
-            changed_keys.intersection({"state", "current_item_id", "next_item_id"})
-            or int(queue.elapsed_time) % 30 == 0
-        ):
-            self._handle_playback_progress_report(queue, prev_state, new_state)
-
-        # check if we need to clear the queue if we reached the end
-        if "state" in changed_keys and queue.state == PlayerState.IDLE:
-            self._handle_end_of_queue(queue, prev_state, new_state)
-
-        # watch dynamic radio items refill if needed
-        if "current_item_id" in changed_keys:
-            # auto enable radio mode if dont stop the music is enabled
-            if (
-                queue.dont_stop_the_music_enabled
-                and queue.enqueued_media_items
-                and queue.current_index is not None
-                and (queue.items - queue.current_index) <= 1
-            ):
-                # We have received the last item in the queue and Don't stop the music is enabled
-                # set the played media item(s) as radio items (which will refill the queue)
-                # note that this will fail if there are no media items for which we have
-                # a dynamic radio source.
-                self.logger.debug(
-                    "End of queue detected and Don't stop the music is enabled for %s"
-                    " - setting enqueued media items as radio source: %s",
-                    queue.display_name,
-                    ", ".join([x.uri for x in queue.enqueued_media_items]),
-                )
-                queue.radio_source = queue.enqueued_media_items
-            # auto fill radio tracks if less than 5 tracks left in the queue
-            if (
-                queue.radio_source
-                and queue.current_index is not None
-                and (queue.items - queue.current_index) < 5
-            ):
-                task_id = f"fill_radio_tracks_{queue_id}"
-                self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
+        # queue is active and preflight checks passed, update the queue details
+        self._update_queue_from_player(player)
 
     def on_player_remove(self, player_id: str) -> None:
         """Call when a player is removed from the registry."""
@@ -1665,6 +1524,169 @@ class PlayerQueuesController(CoreController):
             )
         return queue_tracks
 
+    def _update_queue_from_player(
+        self,
+        player: Player,
+    ) -> None:
+        """Update the Queue when the player state changed."""
+        queue_id = player.player_id
+        player = self.mass.players.get(queue_id)
+        queue = self._queues[queue_id]
+
+        # basic properties
+        queue.display_name = player.display_name
+        queue.available = player.available
+        queue.items = len(self._queue_items[queue_id])
+
+        queue.state = player.state or PlayerState.IDLE if queue.active else PlayerState.IDLE
+        # update current item/index from player report
+        if queue.active and queue.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+            # NOTE: If the queue is not playing (yet) we will not update the current index
+            # to ensure we keep the previously known current index
+            if queue.flow_mode:
+                # flow mode active, the player is playing one long stream
+                # so we need to calculate the current index and elapsed time
+                current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
+            elif item_id := self._parse_player_current_item_id(queue_id, player):
+                # normal mode, the player itself will report the current item
+                elapsed_time = int(player.corrected_elapsed_time or 0)
+                current_index = self.index_by_id(queue_id, item_id)
+            else:
+                # this should not happen but we will handle it gracefully
+                elapsed_time = 0
+                current_index = None
+
+            # get current/next item based on current index
+            queue.current_index = current_index
+            queue.current_item = current_item = self.get_item(queue_id, current_index)
+            queue.next_item = self._get_next_item(queue_id, current_index) if current_item else None
+
+            # correct elapsed time when seeking
+            if (
+                not queue.flow_mode
+                and current_item
+                and current_item.streamdetails
+                and current_item.streamdetails.seek_position
+            ):
+                elapsed_time += current_item.streamdetails.seek_position
+            queue.elapsed_time = elapsed_time
+            queue.elapsed_time_last_updated = time.time()
+
+        # This is enough to detect any changes in the DSPDetails
+        # (so child count changed, or any output format changed)
+        output_formats = []
+        if player.output_format:
+            output_formats.append(str(player.output_format))
+        for child_id in player.group_childs:
+            if (child := self.mass.players.get(child_id)) and child.output_format:
+                output_formats.append(str(child.output_format))
+            else:
+                output_formats.append("unknown")
+
+        # basic throttle: do not send state changed events if queue did not actually change
+        prev_state: CompareState = self._prev_states.get(
+            queue_id,
+            CompareState(
+                queue_id=queue_id,
+                state=PlayerState.IDLE,
+                current_item_id=None,
+                next_item_id=None,
+                current_item=None,
+                elapsed_time=0,
+                stream_title=None,
+                output_formats=None,
+            ),
+        )
+        new_state = CompareState(
+            queue_id=queue_id,
+            state=queue.state,
+            current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
+            next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
+            current_item=queue.current_item,
+            elapsed_time=queue.elapsed_time,
+            stream_title=(
+                queue.current_item.streamdetails.stream_title
+                if queue.current_item and queue.current_item.streamdetails
+                else None
+            ),
+            codec_type=(
+                queue.current_item.streamdetails.audio_format.codec_type
+                if queue.current_item and queue.current_item.streamdetails
+                else None
+            ),
+            output_formats=output_formats,
+        )
+        changed_keys = get_changed_keys(prev_state, new_state, ["next_item"])
+        # return early if nothing changed
+        if len(changed_keys) == 0:
+            return
+
+        # signal update and store state
+        if changed_keys == {"elapsed_time"}:
+            # do not send full updates if only time was updated
+            self.mass.signal_event(
+                EventType.QUEUE_TIME_UPDATED,
+                object_id=queue_id,
+                data=queue.elapsed_time,
+            )
+        else:
+            self.signal_update(queue_id)
+
+        # store the new state
+        if queue.active:
+            self._prev_states[queue_id] = new_state
+        else:
+            self._prev_states.pop(queue_id, None)
+
+        if "output_formats" in changed_keys:
+            # refresh DSP details since they may have changed
+            dsp = get_stream_dsp_details(self.mass, queue_id)
+            if queue.current_item and queue.current_item.streamdetails:
+                queue.current_item.streamdetails.dsp = dsp
+            if queue.next_item and queue.next_item.streamdetails:
+                queue.next_item.streamdetails.dsp = dsp
+
+        # handle sending a playback progress report
+        # we do this every 30 seconds or when the state changes
+        if (
+            changed_keys.intersection({"state", "current_item_id"})
+            or int(queue.elapsed_time) % 30 == 0
+        ):
+            self._handle_playback_progress_report(queue, prev_state, new_state)
+
+        # check if we need to clear the queue if we reached the end
+        if "state" in changed_keys and queue.state == PlayerState.IDLE:
+            self._handle_end_of_queue(queue, prev_state, new_state)
+
+        # watch dynamic radio items refill if needed
+        if "current_item_id" in changed_keys:
+            # auto enable radio mode if dont stop the music is enabled
+            if (
+                queue.dont_stop_the_music_enabled
+                and queue.enqueued_media_items
+                and queue.current_index is not None
+                and (queue.items - queue.current_index) <= 1
+            ):
+                # We have received the last item in the queue and Don't stop the music is enabled
+                # set the played media item(s) as radio items (which will refill the queue)
+                # note that this will fail if there are no media items for which we have
+                # a dynamic radio source.
+                self.logger.debug(
+                    "End of queue detected and Don't stop the music is enabled for %s"
+                    " - setting enqueued media items as radio source: %s",
+                    queue.display_name,
+                    ", ".join([x.uri for x in queue.enqueued_media_items]),
+                )
+                queue.radio_source = queue.enqueued_media_items
+            # auto fill radio tracks if less than 5 tracks left in the queue
+            if (
+                queue.radio_source
+                and queue.current_index is not None
+                and (queue.items - queue.current_index) < 5
+            ):
+                task_id = f"fill_radio_tracks_{queue_id}"
+                self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
+
     def _get_flow_queue_stream_index(
         self, queue: PlayerQueue, player: Player
     ) -> tuple[int | None, int]:
@@ -1796,30 +1818,26 @@ class PlayerQueuesController(CoreController):
             return
         if prev_item_id is not None and prev_item_id != cur_item_id:
             # we have a new item, so we need report the previous one
-            if not (item_to_report := self.get_item(queue.queue_id, prev_item_id)):
-                # should not happen, but guard it anyway
-                return
-            if not (stream_details := item_to_report.streamdetails):
-                # should not happen, but guard it anyway
-                return
+            item_to_report = prev_state["current_item"]
             seconds_played = int(prev_state["elapsed_time"])
         else:
             # report on current item
-            if not (item_to_report := self.get_item(queue.queue_id, cur_item_id)):
-                # should not happen, but guard it anyway
-                return
-            if not (stream_details := item_to_report.streamdetails):
-                # should not happen, but guard it anyway
-                return
+            item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
+            if not item_to_report:
+                return  # guard against invalid items
             seconds_played = int(new_state["elapsed_time"])
-            if seconds_played < 30:
-                # ignore items that have been played less than 30 seconds
+            if seconds_played < 10:
+                # ignore items that have been played less than 10 seconds
                 return
+
         if not item_to_report.media_item:
             # only report on media items
             return
 
-        duration = stream_details.duration or item_to_report.duration or 3600
+        if item_to_report.streamdetails and item_to_report.streamdetails.duration:
+            duration = item_to_report.streamdetails.duration
+        else:
+            duration = item_to_report.duration or 3600
         fully_played = seconds_played >= (duration or 3600) - 5
 
         self.logger.debug(
index 7cda81c566730d2bc5323bcd7efb423f5220ac03..d233903d2ace2e841940bd6f963dc622ca9c49b8 100644 (file)
@@ -306,28 +306,22 @@ class PlayerController(CoreController):
         """Handle NEXT TRACK command for given player."""
         player = self._get_player_with_redirect(player_id)
         active_source_id = player.active_source or player.player_id
-        supports_native_skip = PlayerFeature.NEXT_PREVIOUS in player.supported_features
-        can_native_skip = False
+
         if active_queue := self.mass.player_queues.get(active_source_id):
             # active source is a MA queue
-            can_native_skip = supports_native_skip and not active_queue.flow_mode
-        elif supports_native_skip:
-            # player has some other source active and native next/previous support
-            active_source = next((x for x in player.source_list if x.id == active_source_id), None)
-            can_native_skip = active_source and active_source.can_next_previous
-        # always prefer native skip, even if player is playing MA queue
-        if can_native_skip:
-            player_provider = self.get_player_provider(player.player_id)
-            await player_provider.cmd_next(player.player_id)
-            return
-        # Redirect to queue controller if it is active
-        # which will result in a new play_media call
-        if active_queue:
             await self.mass.player_queues.next(active_queue.queue_id)
             return
-        if supports_native_skip:
+
+        if PlayerFeature.NEXT_PREVIOUS in player.supported_features:
+            # player has some other source active and native next/previous support
+            active_source = next((x for x in player.source_list if x.id == active_source_id), None)
+            if active_source and active_source.can_next_previous:
+                player_provider = self.get_player_provider(player.player_id)
+                await player_provider.cmd_next(player.player_id)
+                return
             msg = "This action is (currently) unavailable for this source."
             raise PlayerCommandFailed(msg)
+
         msg = f"Player {player.display_name} does not support skipping to the next track."
         raise UnsupportedFeaturedException(msg)
 
@@ -336,29 +330,22 @@ class PlayerController(CoreController):
         """Handle PREVIOUS TRACK command for given player."""
         player = self._get_player_with_redirect(player_id)
         active_source_id = player.active_source or player.player_id
-        supports_native_skip = PlayerFeature.NEXT_PREVIOUS in player.supported_features
-        can_native_skip = False
         if active_queue := self.mass.player_queues.get(active_source_id):
             # active source is a MA queue
-            can_native_skip = not active_queue.flow_mode
-        elif supports_native_skip:
-            # player has some other source active and native next/previous support
-            active_source = next((x for x in player.source_list if x.id == active_source_id), None)
-            can_native_skip = active_source and active_source.can_next_previous
-        # always prefer native skip, even if player is playing MA queue
-        if can_native_skip:
-            player_provider = self.get_player_provider(player.player_id)
-            await player_provider.cmd_previous(player.player_id)
-            return
-        # Redirect to queue controller if it is active
-        # which will result in a new play_media call
-        if active_queue:
             await self.mass.player_queues.previous(active_queue.queue_id)
             return
-        if supports_native_skip:
+
+        if PlayerFeature.NEXT_PREVIOUS in player.supported_features:
+            # player has some other source active and native next/previous support
+            active_source = next((x for x in player.source_list if x.id == active_source_id), None)
+            if active_source and active_source.can_next_previous:
+                player_provider = self.get_player_provider(player.player_id)
+                await player_provider.cmd_previous(player.player_id)
+                return
             msg = "This action is (currently) unavailable for this source."
             raise PlayerCommandFailed(msg)
-        msg = f"Player {player.display_name} does not support skipping to the next track."
+
+        msg = f"Player {player.display_name} does not support skipping to the previous track."
         raise UnsupportedFeaturedException(msg)
 
     @api_command("players/cmd/power")
index 00e9c8fe9b174f904f8f50133cb1692240fa4a27..7e791de434414554edb4963199de4e13f936f285 100644 (file)
@@ -301,15 +301,17 @@ def get_changed_keys(
     dict1: dict[str, Any],
     dict2: dict[str, Any],
     ignore_keys: list[str] | None = None,
+    recursive: bool = False,
 ) -> set[str]:
     """Compare 2 dicts and return set of changed keys."""
-    return set(get_changed_values(dict1, dict2, ignore_keys).keys())
+    return set(get_changed_values(dict1, dict2, ignore_keys, recursive).keys())
 
 
 def get_changed_values(
     dict1: dict[str, Any],
     dict2: dict[str, Any],
     ignore_keys: list[str] | None = None,
+    recursive: bool = False,
 ) -> dict[str, tuple[Any, Any]]:
     """
     Compare 2 dicts and return dict of changed values.
@@ -328,8 +330,12 @@ def get_changed_values(
             continue
         if key not in dict1:
             changed_values[key] = (None, value)
-        elif isinstance(value, dict):
-            changed_values.update(get_changed_values(dict1[key], value, ignore_keys))
+        elif isinstance(value, dict) or isinstance(dict1[key], dict):
+            changed_subvalues = get_changed_values(dict1[key], value, ignore_keys, recursive)
+            if recursive:
+                changed_values.update(changed_subvalues)
+            elif changed_subvalues:
+                changed_values[key] = (dict1[key], value)
         elif dict1[key] != value:
             changed_values[key] = (dict1[key], value)
     return changed_values