From ab1bc7355ca04a4cac69e7776a6a24a6cbb840d9 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 11 Dec 2025 01:10:53 +0100 Subject: [PATCH] Handle better locking of play commands --- music_assistant/controllers/player_queues.py | 27 +- .../controllers/players/player_controller.py | 286 ++++++++++-------- music_assistant/mass.py | 5 +- 3 files changed, 184 insertions(+), 134 deletions(-) diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index 41a4a3d2..60f436b8 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -876,9 +876,6 @@ class PlayerQueuesController(CoreController): self.signal_update(queue_id) queue.index_in_buffer = index queue.flow_mode_stream_log = [] - prefer_flow_mode = await self.mass.config.get_player_config_value( - queue_id, CONF_FLOW_MODE, return_type=bool - ) target_player = self.mass.players.get(queue_id) if target_player is None: raise PlayerUnavailableError(f"Player {queue_id} is not available") @@ -898,8 +895,6 @@ class PlayerQueuesController(CoreController): # NOTE that we debounce this a bit to account for someone hitting the next button # like a madman. This will prevent the player from being overloaded with requests. async def _play_index(index: int, debounce: bool) -> None: - if debounce: - await asyncio.sleep(0.25) for attempt in range(5): try: queue_item = self.get_item(queue_id, index) @@ -933,6 +928,10 @@ class PlayerQueuesController(CoreController): # all attempts to find a playable item failed raise MediaNotFoundError("No playable item found to start playback") + # work out if we need to use flow mode + prefer_flow_mode = await self.mass.config.get_player_config_value( + queue_id, CONF_FLOW_MODE, default=False + ) flow_mode = ( prefer_flow_mode or not enqueue_supported ) and queue_item.media_type not in ( @@ -940,27 +939,35 @@ class PlayerQueuesController(CoreController): MediaType.RADIO, MediaType.PLUGIN_SOURCE, ) - if debounce: - await asyncio.sleep(0.25) + await asyncio.sleep(0.5 if debounce else 0.1) queue.flow_mode = flow_mode await self.mass.players.play_media( player_id=queue_id, media=await self.player_media_from_queue_item(queue_item, flow_mode), ) + queue.current_index = index + queue.current_item = queue_item await asyncio.sleep(2) self._transitioning_players.discard(queue_id) # we set a flag to notify the update logic that we're transitioning to a new track self._transitioning_players.add(queue_id) + + # we debounce the play_index command to handle the case where someone + # is spamming next/previous on the player + task_id = f"play_index_{queue_id}" + if existing_task := self.mass.get_task(task_id): + existing_task.cancel() + with suppress(asyncio.CancelledError): + await existing_task task = self.mass.create_task( _play_index, index, debounce, - task_id=f"play_media_{queue_id}", - abort_existing=True, + task_id=task_id, ) - self.signal_update(queue_id) await task + self.signal_update(queue_id) @api_command("player_queues/transfer") async def transfer_queue( diff --git a/music_assistant/controllers/players/player_controller.py b/music_assistant/controllers/players/player_controller.py index ade539ac..706464fc 100644 --- a/music_assistant/controllers/players/player_controller.py +++ b/music_assistant/controllers/players/player_controller.py @@ -21,7 +21,7 @@ import functools import time from collections.abc import Awaitable, Callable, Coroutine from contextlib import suppress -from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast +from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast, overload from music_assistant_models.constants import ( PLAYER_CONTROL_FAKE, @@ -102,46 +102,97 @@ class AnnounceData(TypedDict): pre_announce_url: str +@overload def handle_player_command[PlayerControllerT: "PlayerController", **P, R]( func: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]], -) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]: - """Check and log commands to players.""" - - @functools.wraps(func) - async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None: - """Log and handle_player_command commands to players.""" - player_id = kwargs.get("player_id") or args[0] - assert isinstance(player_id, str) # for type checking - if (player := self._players.get(player_id)) is None or not player.available: - # player not existent - self.logger.warning( - "Ignoring command %s for unavailable player %s", - func.__name__, - player_id, +) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]: ... + + +@overload +def handle_player_command[PlayerControllerT: "PlayerController", **P, R]( + func: None = None, + *, + lock: bool = False, +) -> Callable[ + [Callable[Concatenate[PlayerControllerT, P], Awaitable[R]]], + Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]], +]: ... + + +def handle_player_command[PlayerControllerT: "PlayerController", **P, R]( + func: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]] | None = None, + *, + lock: bool = False, +) -> ( + Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]] + | Callable[ + [Callable[Concatenate[PlayerControllerT, P], Awaitable[R]]], + Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]], + ] +): + """Check and log commands to players. + + :param func: The function to wrap (when used without parentheses). + :param lock: If True, acquire a lock per player_id and function name before executing. + """ + + def decorator( + fn: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]], + ) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]: + @functools.wraps(fn) + async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None: + """Log and handle_player_command commands to players.""" + player_id = kwargs.get("player_id") or args[0] + assert isinstance(player_id, str) # for type checking + if (player := self._players.get(player_id)) is None or not player.available: + # player not existent + self.logger.warning( + "Ignoring command %s for unavailable player %s", + fn.__name__, + player_id, + ) + return + + current_user = get_current_user() + if ( + current_user + and current_user.player_filter + and player.player_id not in current_user.player_filter + ): + msg = ( + f"{current_user.username} does not have access to player {player.display_name}" + ) + raise InsufficientPermissions(msg) + + self.logger.debug( + "Handling command %s for player %s (%s)", + fn.__name__, + player.display_name, + f"by user {current_user.username}" if current_user else "unauthenticated", ) - return - current_user = get_current_user() - if ( - current_user - and current_user.player_filter - and player.player_id not in current_user.player_filter - ): - msg = f"{current_user.username} does not have access to player {player.display_name}" - raise InsufficientPermissions(msg) + async def execute() -> None: + try: + await fn(self, *args, **kwargs) + except Exception as err: + raise PlayerCommandFailed(str(err)) from err + + if lock: + # Acquire a lock specific to player_id and function name + lock_key = f"{fn.__name__}_{player_id}" + if lock_key not in self._player_command_locks: + self._player_command_locks[lock_key] = asyncio.Lock() + async with self._player_command_locks[lock_key]: + await execute() + else: + await execute() - self.logger.debug( - "Handling command %s for player %s (%s)", - func.__name__, - player.display_name, - f"by user {current_user.username}" if current_user else "unauthenticated", - ) - try: - await func(self, *args, **kwargs) - except Exception as err: - raise PlayerCommandFailed(str(err)) from err + return wrapper - return wrapper + # Support both @handle_player_command and @handle_player_command(lock=True) + if func is not None: + return decorator(func) + return decorator class PlayerController(CoreController): @@ -161,7 +212,7 @@ class PlayerController(CoreController): self.manifest.icon = "speaker-multiple" self._poll_task: asyncio.Task[None] | None = None self._player_throttlers: dict[str, Throttler] = {} - self._announce_locks: dict[str, asyncio.Lock] = {} + self._player_command_locks: dict[str, asyncio.Lock] = {} self._sync_groups: SyncGroupController = SyncGroupController(self) async def setup(self, config: CoreConfig) -> None: @@ -376,7 +427,7 @@ class PlayerController(CoreController): await player.stop() @api_command("players/cmd/play") - @handle_player_command + @handle_player_command(lock=True) async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. @@ -413,7 +464,7 @@ class PlayerController(CoreController): await self._handle_cmd_resume(player.player_id) @api_command("players/cmd/pause") - @handle_player_command + @handle_player_command(lock=True) async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player. @@ -571,7 +622,7 @@ class PlayerController(CoreController): raise UnsupportedFeaturedException(msg) @api_command("players/cmd/power") - @handle_player_command + @handle_player_command(lock=True) async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player. @@ -581,7 +632,7 @@ class PlayerController(CoreController): await self._handle_cmd_power(player_id, powered) @api_command("players/cmd/volume_set") - @handle_player_command + @handle_player_command(lock=True) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -629,7 +680,7 @@ class PlayerController(CoreController): await self.cmd_volume_set(player_id, new_volume) @api_command("players/cmd/group_volume") - @handle_player_command + @handle_player_command(lock=True) async def cmd_group_volume( self, player_id: str, @@ -743,6 +794,7 @@ class PlayerController(CoreController): await player_control.mute_set(muted) @api_command("players/cmd/play_announcement") + @handle_player_command(lock=True) async def play_announcement( self, player_id: str, @@ -770,90 +822,80 @@ class PlayerController(CoreController): and not validate_announcement_chime_url(pre_announce_url) ): raise PlayerCommandFailed("Invalid pre-announce chime URL specified.") - # prevent multiple announcements at the same time to the same player with a lock - if player_id not in self._announce_locks: - self._announce_locks[player_id] = lock = asyncio.Lock() - else: - lock = self._announce_locks[player_id] - async with lock: - try: - # mark announcement_in_progress on player - player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = True - # determine if the player has native announcements support - native_announce_support = ( - PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features + try: + # mark announcement_in_progress on player + player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = True + # determine if the player has native announcements support + native_announce_support = PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features + # determine pre-announce from (group)player config + if pre_announce is None and "tts" in url: + conf_pre_announce = self.mass.config.get_raw_player_config_value( + player_id, + CONF_ENTRY_TTS_PRE_ANNOUNCE.key, + CONF_ENTRY_TTS_PRE_ANNOUNCE.default_value, ) - # determine pre-announce from (group)player config - if pre_announce is None and "tts" in url: - conf_pre_announce = self.mass.config.get_raw_player_config_value( - player_id, - CONF_ENTRY_TTS_PRE_ANNOUNCE.key, - CONF_ENTRY_TTS_PRE_ANNOUNCE.default_value, - ) - pre_announce = cast("bool", conf_pre_announce) - if pre_announce_url is None: - if conf_pre_announce_url := self.mass.config.get_raw_player_config_value( - player_id, - CONF_PRE_ANNOUNCE_CHIME_URL, - ): - # player default custom chime url - pre_announce_url = cast("str", conf_pre_announce_url) - else: - # use global default chime url - pre_announce_url = ANNOUNCE_ALERT_FILE - # if player type is group with all members supporting announcements, - # we forward the request to each individual player - if player.type == PlayerType.GROUP and ( - all( - PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features - for x in self.iter_group_members(player) - ) + pre_announce = cast("bool", conf_pre_announce) + if pre_announce_url is None: + if conf_pre_announce_url := self.mass.config.get_raw_player_config_value( + player_id, + CONF_PRE_ANNOUNCE_CHIME_URL, ): - # forward the request to each individual player - async with TaskManager(self.mass) as tg: - for group_member in player.group_members: - tg.create_task( - self.play_announcement( - group_member, - url=url, - pre_announce=pre_announce, - volume_level=volume_level, - pre_announce_url=pre_announce_url, - ) - ) - return - self.logger.info( - "Playback announcement to player %s (with pre-announce: %s): %s", - player.display_name, - pre_announce, - url, - ) - # create a PlayerMedia object for the announcement so - # we can send a regular play-media call downstream - announce_data = AnnounceData( - announcement_url=url, - pre_announce=bool(pre_announce or False), - pre_announce_url=pre_announce_url, - ) - announcement = PlayerMedia( - uri=self.mass.streams.get_announcement_url( - player_id, announce_data=announce_data - ), - media_type=MediaType.ANNOUNCEMENT, - title="Announcement", - custom_data=dict(announce_data), + # player default custom chime url + pre_announce_url = cast("str", conf_pre_announce_url) + else: + # use global default chime url + pre_announce_url = ANNOUNCE_ALERT_FILE + # if player type is group with all members supporting announcements, + # we forward the request to each individual player + if player.type == PlayerType.GROUP and ( + all( + PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features + for x in self.iter_group_members(player) ) - # handle native announce support - if native_announce_support: - announcement_volume = self.get_announcement_volume(player_id, volume_level) - await player.play_announcement(announcement, announcement_volume) - return - # use fallback/default implementation - await self._play_announcement(player, announcement, volume_level) - finally: - player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = False + ): + # forward the request to each individual player + async with TaskManager(self.mass) as tg: + for group_member in player.group_members: + tg.create_task( + self.play_announcement( + group_member, + url=url, + pre_announce=pre_announce, + volume_level=volume_level, + pre_announce_url=pre_announce_url, + ) + ) + return + self.logger.info( + "Playback announcement to player %s (with pre-announce: %s): %s", + player.display_name, + pre_announce, + url, + ) + # create a PlayerMedia object for the announcement so + # we can send a regular play-media call downstream + announce_data = AnnounceData( + announcement_url=url, + pre_announce=bool(pre_announce or False), + pre_announce_url=pre_announce_url, + ) + announcement = PlayerMedia( + uri=self.mass.streams.get_announcement_url(player_id, announce_data=announce_data), + media_type=MediaType.ANNOUNCEMENT, + title="Announcement", + custom_data=dict(announce_data), + ) + # handle native announce support + if native_announce_support: + announcement_volume = self.get_announcement_volume(player_id, volume_level) + await player.play_announcement(announcement, announcement_volume) + return + # use fallback/default implementation + await self._play_announcement(player, announcement, volume_level) + finally: + player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = False - @handle_player_command + @handle_player_command(lock=True) async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player. @@ -869,6 +911,7 @@ class PlayerController(CoreController): await player.play_media(media) @api_command("players/cmd/select_source") + @handle_player_command(lock=True) async def select_source(self, player_id: str, source: str | None) -> None: """ Handle SELECT SOURCE command on given player. @@ -914,6 +957,7 @@ class PlayerController(CoreController): # forward to player await player.select_source(source) + @handle_player_command(lock=True) async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """ Handle enqueuing of a next media item on the player. diff --git a/music_assistant/mass.py b/music_assistant/mass.py index 475c9413..584ab43b 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -499,13 +499,12 @@ class MusicAssistant: self._tracked_timers[task_id] = handle return handle - def get_task(self, task_id: str) -> asyncio.Task[Any]: + def get_task(self, task_id: str) -> asyncio.Task[Any] | None: """Get existing scheduled task.""" if existing := self._tracked_tasks.get(task_id): # prevent duplicate tasks if task_id is given and already present return existing - msg = "Task does not exist" - raise KeyError(msg) + return None def cancel_task(self, task_id: str) -> None: """Cancel existing scheduled task.""" -- 2.34.1