Handle better locking of play commands
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 11 Dec 2025 00:10:53 +0000 (01:10 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 11 Dec 2025 00:10:53 +0000 (01:10 +0100)
music_assistant/controllers/player_queues.py
music_assistant/controllers/players/player_controller.py
music_assistant/mass.py

index 41a4a3d23837ba9328cdf0749acc1ed521ba959b..60f436b8b76b45e1a7cdd9d24f1531c93d317539 100644 (file)
@@ -876,9 +876,6 @@ class PlayerQueuesController(CoreController):
         self.signal_update(queue_id)
         queue.index_in_buffer = index
         queue.flow_mode_stream_log = []
-        prefer_flow_mode = await self.mass.config.get_player_config_value(
-            queue_id, CONF_FLOW_MODE, return_type=bool
-        )
         target_player = self.mass.players.get(queue_id)
         if target_player is None:
             raise PlayerUnavailableError(f"Player {queue_id} is not available")
@@ -898,8 +895,6 @@ class PlayerQueuesController(CoreController):
         # NOTE that we debounce this a bit to account for someone hitting the next button
         # like a madman. This will prevent the player from being overloaded with requests.
         async def _play_index(index: int, debounce: bool) -> None:
-            if debounce:
-                await asyncio.sleep(0.25)
             for attempt in range(5):
                 try:
                     queue_item = self.get_item(queue_id, index)
@@ -933,6 +928,10 @@ class PlayerQueuesController(CoreController):
                 # all attempts to find a playable item failed
                 raise MediaNotFoundError("No playable item found to start playback")
 
+            # work out if we need to use flow mode
+            prefer_flow_mode = await self.mass.config.get_player_config_value(
+                queue_id, CONF_FLOW_MODE, default=False
+            )
             flow_mode = (
                 prefer_flow_mode or not enqueue_supported
             ) and queue_item.media_type not in (
@@ -940,27 +939,35 @@ class PlayerQueuesController(CoreController):
                 MediaType.RADIO,
                 MediaType.PLUGIN_SOURCE,
             )
-            if debounce:
-                await asyncio.sleep(0.25)
+            await asyncio.sleep(0.5 if debounce else 0.1)
             queue.flow_mode = flow_mode
             await self.mass.players.play_media(
                 player_id=queue_id,
                 media=await self.player_media_from_queue_item(queue_item, flow_mode),
             )
+            queue.current_index = index
+            queue.current_item = queue_item
             await asyncio.sleep(2)
             self._transitioning_players.discard(queue_id)
 
         # we set a flag to notify the update logic that we're transitioning to a new track
         self._transitioning_players.add(queue_id)
+
+        # we debounce the play_index command to handle the case where someone
+        # is spamming next/previous on the player
+        task_id = f"play_index_{queue_id}"
+        if existing_task := self.mass.get_task(task_id):
+            existing_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await existing_task
         task = self.mass.create_task(
             _play_index,
             index,
             debounce,
-            task_id=f"play_media_{queue_id}",
-            abort_existing=True,
+            task_id=task_id,
         )
-        self.signal_update(queue_id)
         await task
+        self.signal_update(queue_id)
 
     @api_command("player_queues/transfer")
     async def transfer_queue(
index ade539accdea04546857f5a0bed15142057e446d..706464fc7cf07180d9e32746e3a249e14a88286f 100644 (file)
@@ -21,7 +21,7 @@ import functools
 import time
 from collections.abc import Awaitable, Callable, Coroutine
 from contextlib import suppress
-from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast
+from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast, overload
 
 from music_assistant_models.constants import (
     PLAYER_CONTROL_FAKE,
@@ -102,46 +102,97 @@ class AnnounceData(TypedDict):
     pre_announce_url: str
 
 
+@overload
 def handle_player_command[PlayerControllerT: "PlayerController", **P, R](
     func: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]],
-) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]:
-    """Check and log commands to players."""
-
-    @functools.wraps(func)
-    async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None:
-        """Log and handle_player_command commands to players."""
-        player_id = kwargs.get("player_id") or args[0]
-        assert isinstance(player_id, str)  # for type checking
-        if (player := self._players.get(player_id)) is None or not player.available:
-            # player not existent
-            self.logger.warning(
-                "Ignoring command %s for unavailable player %s",
-                func.__name__,
-                player_id,
+) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]: ...
+
+
+@overload
+def handle_player_command[PlayerControllerT: "PlayerController", **P, R](
+    func: None = None,
+    *,
+    lock: bool = False,
+) -> Callable[
+    [Callable[Concatenate[PlayerControllerT, P], Awaitable[R]]],
+    Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]],
+]: ...
+
+
+def handle_player_command[PlayerControllerT: "PlayerController", **P, R](
+    func: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]] | None = None,
+    *,
+    lock: bool = False,
+) -> (
+    Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]
+    | Callable[
+        [Callable[Concatenate[PlayerControllerT, P], Awaitable[R]]],
+        Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]],
+    ]
+):
+    """Check and log commands to players.
+
+    :param func: The function to wrap (when used without parentheses).
+    :param lock: If True, acquire a lock per player_id and function name before executing.
+    """
+
+    def decorator(
+        fn: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]],
+    ) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]:
+        @functools.wraps(fn)
+        async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None:
+            """Log and handle_player_command commands to players."""
+            player_id = kwargs.get("player_id") or args[0]
+            assert isinstance(player_id, str)  # for type checking
+            if (player := self._players.get(player_id)) is None or not player.available:
+                # player not existent
+                self.logger.warning(
+                    "Ignoring command %s for unavailable player %s",
+                    fn.__name__,
+                    player_id,
+                )
+                return
+
+            current_user = get_current_user()
+            if (
+                current_user
+                and current_user.player_filter
+                and player.player_id not in current_user.player_filter
+            ):
+                msg = (
+                    f"{current_user.username} does not have access to player {player.display_name}"
+                )
+                raise InsufficientPermissions(msg)
+
+            self.logger.debug(
+                "Handling command %s for player %s (%s)",
+                fn.__name__,
+                player.display_name,
+                f"by user {current_user.username}" if current_user else "unauthenticated",
             )
-            return
 
-        current_user = get_current_user()
-        if (
-            current_user
-            and current_user.player_filter
-            and player.player_id not in current_user.player_filter
-        ):
-            msg = f"{current_user.username} does not have access to player {player.display_name}"
-            raise InsufficientPermissions(msg)
+            async def execute() -> None:
+                try:
+                    await fn(self, *args, **kwargs)
+                except Exception as err:
+                    raise PlayerCommandFailed(str(err)) from err
+
+            if lock:
+                # Acquire a lock specific to player_id and function name
+                lock_key = f"{fn.__name__}_{player_id}"
+                if lock_key not in self._player_command_locks:
+                    self._player_command_locks[lock_key] = asyncio.Lock()
+                async with self._player_command_locks[lock_key]:
+                    await execute()
+            else:
+                await execute()
 
-        self.logger.debug(
-            "Handling command %s for player %s (%s)",
-            func.__name__,
-            player.display_name,
-            f"by user {current_user.username}" if current_user else "unauthenticated",
-        )
-        try:
-            await func(self, *args, **kwargs)
-        except Exception as err:
-            raise PlayerCommandFailed(str(err)) from err
+        return wrapper
 
-    return wrapper
+    # Support both @handle_player_command and @handle_player_command(lock=True)
+    if func is not None:
+        return decorator(func)
+    return decorator
 
 
 class PlayerController(CoreController):
@@ -161,7 +212,7 @@ class PlayerController(CoreController):
         self.manifest.icon = "speaker-multiple"
         self._poll_task: asyncio.Task[None] | None = None
         self._player_throttlers: dict[str, Throttler] = {}
-        self._announce_locks: dict[str, asyncio.Lock] = {}
+        self._player_command_locks: dict[str, asyncio.Lock] = {}
         self._sync_groups: SyncGroupController = SyncGroupController(self)
 
     async def setup(self, config: CoreConfig) -> None:
@@ -376,7 +427,7 @@ class PlayerController(CoreController):
                 await player.stop()
 
     @api_command("players/cmd/play")
-    @handle_player_command
+    @handle_player_command(lock=True)
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY (unpause) command to given player.
 
@@ -413,7 +464,7 @@ class PlayerController(CoreController):
             await self._handle_cmd_resume(player.player_id)
 
     @api_command("players/cmd/pause")
-    @handle_player_command
+    @handle_player_command(lock=True)
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player.
 
@@ -571,7 +622,7 @@ class PlayerController(CoreController):
         raise UnsupportedFeaturedException(msg)
 
     @api_command("players/cmd/power")
-    @handle_player_command
+    @handle_player_command(lock=True)
     async def cmd_power(self, player_id: str, powered: bool) -> None:
         """Send POWER command to given player.
 
@@ -581,7 +632,7 @@ class PlayerController(CoreController):
         await self._handle_cmd_power(player_id, powered)
 
     @api_command("players/cmd/volume_set")
-    @handle_player_command
+    @handle_player_command(lock=True)
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
 
@@ -629,7 +680,7 @@ class PlayerController(CoreController):
         await self.cmd_volume_set(player_id, new_volume)
 
     @api_command("players/cmd/group_volume")
-    @handle_player_command
+    @handle_player_command(lock=True)
     async def cmd_group_volume(
         self,
         player_id: str,
@@ -743,6 +794,7 @@ class PlayerController(CoreController):
                 await player_control.mute_set(muted)
 
     @api_command("players/cmd/play_announcement")
+    @handle_player_command(lock=True)
     async def play_announcement(
         self,
         player_id: str,
@@ -770,90 +822,80 @@ class PlayerController(CoreController):
             and not validate_announcement_chime_url(pre_announce_url)
         ):
             raise PlayerCommandFailed("Invalid pre-announce chime URL specified.")
-        # prevent multiple announcements at the same time to the same player with a lock
-        if player_id not in self._announce_locks:
-            self._announce_locks[player_id] = lock = asyncio.Lock()
-        else:
-            lock = self._announce_locks[player_id]
-        async with lock:
-            try:
-                # mark announcement_in_progress on player
-                player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = True
-                # determine if the player has native announcements support
-                native_announce_support = (
-                    PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features
+        try:
+            # mark announcement_in_progress on player
+            player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = True
+            # determine if the player has native announcements support
+            native_announce_support = PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features
+            # determine pre-announce from (group)player config
+            if pre_announce is None and "tts" in url:
+                conf_pre_announce = self.mass.config.get_raw_player_config_value(
+                    player_id,
+                    CONF_ENTRY_TTS_PRE_ANNOUNCE.key,
+                    CONF_ENTRY_TTS_PRE_ANNOUNCE.default_value,
                 )
-                # determine pre-announce from (group)player config
-                if pre_announce is None and "tts" in url:
-                    conf_pre_announce = self.mass.config.get_raw_player_config_value(
-                        player_id,
-                        CONF_ENTRY_TTS_PRE_ANNOUNCE.key,
-                        CONF_ENTRY_TTS_PRE_ANNOUNCE.default_value,
-                    )
-                    pre_announce = cast("bool", conf_pre_announce)
-                if pre_announce_url is None:
-                    if conf_pre_announce_url := self.mass.config.get_raw_player_config_value(
-                        player_id,
-                        CONF_PRE_ANNOUNCE_CHIME_URL,
-                    ):
-                        # player default custom chime url
-                        pre_announce_url = cast("str", conf_pre_announce_url)
-                    else:
-                        # use global default chime url
-                        pre_announce_url = ANNOUNCE_ALERT_FILE
-                # if player type is group with all members supporting announcements,
-                # we forward the request to each individual player
-                if player.type == PlayerType.GROUP and (
-                    all(
-                        PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
-                        for x in self.iter_group_members(player)
-                    )
+                pre_announce = cast("bool", conf_pre_announce)
+            if pre_announce_url is None:
+                if conf_pre_announce_url := self.mass.config.get_raw_player_config_value(
+                    player_id,
+                    CONF_PRE_ANNOUNCE_CHIME_URL,
                 ):
-                    # forward the request to each individual player
-                    async with TaskManager(self.mass) as tg:
-                        for group_member in player.group_members:
-                            tg.create_task(
-                                self.play_announcement(
-                                    group_member,
-                                    url=url,
-                                    pre_announce=pre_announce,
-                                    volume_level=volume_level,
-                                    pre_announce_url=pre_announce_url,
-                                )
-                            )
-                    return
-                self.logger.info(
-                    "Playback announcement to player %s (with pre-announce: %s): %s",
-                    player.display_name,
-                    pre_announce,
-                    url,
-                )
-                # create a PlayerMedia object for the announcement so
-                # we can send a regular play-media call downstream
-                announce_data = AnnounceData(
-                    announcement_url=url,
-                    pre_announce=bool(pre_announce or False),
-                    pre_announce_url=pre_announce_url,
-                )
-                announcement = PlayerMedia(
-                    uri=self.mass.streams.get_announcement_url(
-                        player_id, announce_data=announce_data
-                    ),
-                    media_type=MediaType.ANNOUNCEMENT,
-                    title="Announcement",
-                    custom_data=dict(announce_data),
+                    # player default custom chime url
+                    pre_announce_url = cast("str", conf_pre_announce_url)
+                else:
+                    # use global default chime url
+                    pre_announce_url = ANNOUNCE_ALERT_FILE
+            # if player type is group with all members supporting announcements,
+            # we forward the request to each individual player
+            if player.type == PlayerType.GROUP and (
+                all(
+                    PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
+                    for x in self.iter_group_members(player)
                 )
-                # handle native announce support
-                if native_announce_support:
-                    announcement_volume = self.get_announcement_volume(player_id, volume_level)
-                    await player.play_announcement(announcement, announcement_volume)
-                    return
-                # use fallback/default implementation
-                await self._play_announcement(player, announcement, volume_level)
-            finally:
-                player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = False
+            ):
+                # forward the request to each individual player
+                async with TaskManager(self.mass) as tg:
+                    for group_member in player.group_members:
+                        tg.create_task(
+                            self.play_announcement(
+                                group_member,
+                                url=url,
+                                pre_announce=pre_announce,
+                                volume_level=volume_level,
+                                pre_announce_url=pre_announce_url,
+                            )
+                        )
+                return
+            self.logger.info(
+                "Playback announcement to player %s (with pre-announce: %s): %s",
+                player.display_name,
+                pre_announce,
+                url,
+            )
+            # create a PlayerMedia object for the announcement so
+            # we can send a regular play-media call downstream
+            announce_data = AnnounceData(
+                announcement_url=url,
+                pre_announce=bool(pre_announce or False),
+                pre_announce_url=pre_announce_url,
+            )
+            announcement = PlayerMedia(
+                uri=self.mass.streams.get_announcement_url(player_id, announce_data=announce_data),
+                media_type=MediaType.ANNOUNCEMENT,
+                title="Announcement",
+                custom_data=dict(announce_data),
+            )
+            # handle native announce support
+            if native_announce_support:
+                announcement_volume = self.get_announcement_volume(player_id, volume_level)
+                await player.play_announcement(announcement, announcement_volume)
+                return
+            # use fallback/default implementation
+            await self._play_announcement(player, announcement, volume_level)
+        finally:
+            player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = False
 
-    @handle_player_command
+    @handle_player_command(lock=True)
     async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player.
 
@@ -869,6 +911,7 @@ class PlayerController(CoreController):
         await player.play_media(media)
 
     @api_command("players/cmd/select_source")
+    @handle_player_command(lock=True)
     async def select_source(self, player_id: str, source: str | None) -> None:
         """
         Handle SELECT SOURCE command on given player.
@@ -914,6 +957,7 @@ class PlayerController(CoreController):
         # forward to player
         await player.select_source(source)
 
+    @handle_player_command(lock=True)
     async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
         """
         Handle enqueuing of a next media item on the player.
index 475c941315b3dbc5581229dc541043f46c0f44e5..584ab43b0c0126a3e1cec7c538425ae39d5eddb3 100644 (file)
@@ -499,13 +499,12 @@ class MusicAssistant:
         self._tracked_timers[task_id] = handle
         return handle
 
-    def get_task(self, task_id: str) -> asyncio.Task[Any]:
+    def get_task(self, task_id: str) -> asyncio.Task[Any] | None:
         """Get existing scheduled task."""
         if existing := self._tracked_tasks.get(task_id):
             # prevent duplicate tasks if task_id is given and already present
             return existing
-        msg = "Task does not exist"
-        raise KeyError(msg)
+        return None
 
     def cancel_task(self, task_id: str) -> None:
         """Cancel existing scheduled task."""