from __future__ import annotations
import asyncio
+import functools
import logging
-from collections.abc import Iterator
-from typing import TYPE_CHECKING, cast
+from collections.abc import Awaitable, Callable, Coroutine, Iterator
+from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.enums import (
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.players")
+_PlayerControllerT = TypeVar("_PlayerControllerT", bound="PlayerController")
+_R = TypeVar("_R")
+_P = ParamSpec("_P")
+
+
+def debounce(
+ func: Callable[Concatenate[_PlayerControllerT, _P], Awaitable[_R]]
+) -> Callable[Concatenate[_PlayerControllerT, _P], Coroutine[Any, Any, _R | None]]:
+ """Log and debounce commands to players."""
+
+ @functools.wraps(func)
+ async def wrapper(self: _PlayerControllerT, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
+ """Log and debounce commands to players."""
+ player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
+ if (player := self._players.get(player_id)) is None or not player.available:
+ # player not existent
+ self.logger.debug(
+ "Ignoring command %s for unavailable player %s",
+ func.__name__,
+ player_id,
+ )
+ return
+ debounce_key = f"{player_id}.func.__name__"
+ # cancel any existing command to this player
+ existing_timer = self._cmd_debounce.pop(debounce_key, None)
+ if existing_timer and not existing_timer.cancelled():
+ existing_timer.cancel()
+
+ self.logger.debug(
+ "Handling command %s for player %s",
+ func.__name__,
+ player.display_name,
+ )
+
+ def run():
+ self.mass.create_task(func(self, *args, **kwargs))
+
+ # debounce command with 250ms
+ self._cmd_debounce[debounce_key] = self.mass.loop.call_later(0.25, run)
+
+ return wrapper
+
class PlayerController(CoreController):
"""Controller holding all logic to control registered players."""
"Music Assistant's core controller which manages all players from all providers."
)
self.manifest.icon = "speaker-multiple"
+ self._cmd_debounce: dict[str, asyncio.TimerHandle] = {}
self._poll_task: asyncio.Task | None = None
async def setup(self, config: CoreConfig) -> None: # noqa: ARG002
# Player commands
@api_command("players/cmd/stop")
+ @debounce
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player.
- player_id: player_id of the player to handle the command.
"""
- self.logger.debug("Processing STOP command for player %s", player_id)
player_id = self._check_redirect(player_id)
if player_provider := self.get_player_provider(player_id):
await player_provider.cmd_stop(player_id)
@api_command("players/cmd/play")
+ @debounce
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
- player_id: player_id of the player to handle the command.
"""
- self.logger.debug("Processing PLAY command for player %s", player_id)
player_id = self._check_redirect(player_id)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
+ @debounce
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
- player_id: player_id of the player to handle the command.
"""
- self.logger.debug("Processing PAUSE command for player %s", player_id)
player_id = self._check_redirect(player_id)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_pause(player_id)
self.mass.create_task(_watch_pause(player_id))
@api_command("players/cmd/play_pause")
+ @debounce
async def cmd_play_pause(self, player_id: str) -> None:
"""Toggle play/pause on given player.
await self.cmd_play(player_id)
@api_command("players/cmd/power")
+ @debounce
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.
- player_id: player_id of the player to handle the command.
- powered: bool if player should be powered on or off.
"""
- self.logger.debug(
- "Processing POWER %s command for player %s", "ON" if powered else "OFF", player_id
- )
# TODO: Implement PlayerControl
player = self.get(player_id, True)
await self.mass.player_queues.resume(player_id)
@api_command("players/cmd/volume_set")
+ @debounce
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
- player_id: player_id of the player to handle the command.
- volume_level: volume level (0..100) to set on the player.
"""
- self.logger.debug("Processing VOLUME_SET command for player %s", player_id)
# TODO: Implement PlayerControl
player = self.get(player_id, True)
if player.type == PlayerType.GROUP:
await player_provider.cmd_volume_set(player_id, volume_level)
@api_command("players/cmd/volume_up")
+ @debounce
async def cmd_volume_up(self, player_id: str) -> None:
"""Send VOLUME_UP command to given player.
await self.cmd_volume_set(player_id, new_volume)
@api_command("players/cmd/volume_down")
+ @debounce
async def cmd_volume_down(self, player_id: str) -> None:
"""Send VOLUME_DOWN command to given player.
await self.cmd_volume_set(player_id, new_volume)
@api_command("players/cmd/group_volume")
+ @debounce
async def cmd_group_volume(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given playergroup.
await asyncio.gather(*coros)
@api_command("players/cmd/volume_mute")
+ @debounce
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME_MUTE command to given player.
- player_id: player_id of the player to handle the command.
- muted: bool if player should be muted.
"""
- self.logger.debug("Processing VOLUME_MUTE command for player %s", player_id)
player = self.get(player_id, True)
assert player
if PlayerFeature.VOLUME_MUTE not in player.supported_features:
await player_provider.cmd_volume_mute(player_id, muted)
@api_command("players/cmd/sync")
+ @debounce
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup master or group player.
"""
- self.logger.debug("Processing SYNC command for player %s", player_id)
child_player = self.get(player_id, True)
parent_player = self.get(target_player, True)
assert child_player
await player_provider.cmd_sync(player_id, target_player)
@api_command("players/cmd/unsync")
+ @debounce
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
- player_id: player_id of the player to handle the command.
"""
- self.logger.debug("Processing UNSYNC command for player %s", player_id)
player = self.get(player_id, True)
if PlayerFeature.SYNC not in player.supported_features:
raise UnsupportedFeaturedException(f"Player {player.name} does not support syncing")