import time
from collections.abc import Awaitable, Callable, Coroutine
from contextlib import suppress
-from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast
+from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast, overload
from music_assistant_models.constants import (
PLAYER_CONTROL_FAKE,
pre_announce_url: str
+@overload
def handle_player_command[PlayerControllerT: "PlayerController", **P, R](
func: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]],
-) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]:
- """Check and log commands to players."""
-
- @functools.wraps(func)
- async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None:
- """Log and handle_player_command commands to players."""
- player_id = kwargs.get("player_id") or args[0]
- assert isinstance(player_id, str) # for type checking
- if (player := self._players.get(player_id)) is None or not player.available:
- # player not existent
- self.logger.warning(
- "Ignoring command %s for unavailable player %s",
- func.__name__,
- player_id,
+) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]: ...
+
+
+@overload
+def handle_player_command[PlayerControllerT: "PlayerController", **P, R](
+ func: None = None,
+ *,
+ lock: bool = False,
+) -> Callable[
+ [Callable[Concatenate[PlayerControllerT, P], Awaitable[R]]],
+ Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]],
+]: ...
+
+
+def handle_player_command[PlayerControllerT: "PlayerController", **P, R](
+ func: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]] | None = None,
+ *,
+ lock: bool = False,
+) -> (
+ Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]
+ | Callable[
+ [Callable[Concatenate[PlayerControllerT, P], Awaitable[R]]],
+ Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]],
+ ]
+):
+ """Check and log commands to players.
+
+ :param func: The function to wrap (when used without parentheses).
+ :param lock: If True, acquire a lock per player_id and function name before executing.
+ """
+
+ def decorator(
+ fn: Callable[Concatenate[PlayerControllerT, P], Awaitable[R]],
+ ) -> Callable[Concatenate[PlayerControllerT, P], Coroutine[Any, Any, R | None]]:
+ @functools.wraps(fn)
+ async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None:
+ """Log and handle_player_command commands to players."""
+ player_id = kwargs.get("player_id") or args[0]
+ assert isinstance(player_id, str) # for type checking
+ if (player := self._players.get(player_id)) is None or not player.available:
+ # player not existent
+ self.logger.warning(
+ "Ignoring command %s for unavailable player %s",
+ fn.__name__,
+ player_id,
+ )
+ return
+
+ current_user = get_current_user()
+ if (
+ current_user
+ and current_user.player_filter
+ and player.player_id not in current_user.player_filter
+ ):
+ msg = (
+ f"{current_user.username} does not have access to player {player.display_name}"
+ )
+ raise InsufficientPermissions(msg)
+
+ self.logger.debug(
+ "Handling command %s for player %s (%s)",
+ fn.__name__,
+ player.display_name,
+ f"by user {current_user.username}" if current_user else "unauthenticated",
)
- return
- current_user = get_current_user()
- if (
- current_user
- and current_user.player_filter
- and player.player_id not in current_user.player_filter
- ):
- msg = f"{current_user.username} does not have access to player {player.display_name}"
- raise InsufficientPermissions(msg)
+ async def execute() -> None:
+ try:
+ await fn(self, *args, **kwargs)
+ except Exception as err:
+ raise PlayerCommandFailed(str(err)) from err
+
+ if lock:
+ # Acquire a lock specific to player_id and function name
+ lock_key = f"{fn.__name__}_{player_id}"
+ if lock_key not in self._player_command_locks:
+ self._player_command_locks[lock_key] = asyncio.Lock()
+ async with self._player_command_locks[lock_key]:
+ await execute()
+ else:
+ await execute()
- self.logger.debug(
- "Handling command %s for player %s (%s)",
- func.__name__,
- player.display_name,
- f"by user {current_user.username}" if current_user else "unauthenticated",
- )
- try:
- await func(self, *args, **kwargs)
- except Exception as err:
- raise PlayerCommandFailed(str(err)) from err
+ return wrapper
- return wrapper
+ # Support both @handle_player_command and @handle_player_command(lock=True)
+ if func is not None:
+ return decorator(func)
+ return decorator
class PlayerController(CoreController):
self.manifest.icon = "speaker-multiple"
self._poll_task: asyncio.Task[None] | None = None
self._player_throttlers: dict[str, Throttler] = {}
- self._announce_locks: dict[str, asyncio.Lock] = {}
+ self._player_command_locks: dict[str, asyncio.Lock] = {}
self._sync_groups: SyncGroupController = SyncGroupController(self)
async def setup(self, config: CoreConfig) -> None:
await player.stop()
@api_command("players/cmd/play")
- @handle_player_command
+ @handle_player_command(lock=True)
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
await self._handle_cmd_resume(player.player_id)
@api_command("players/cmd/pause")
- @handle_player_command
+ @handle_player_command(lock=True)
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
raise UnsupportedFeaturedException(msg)
@api_command("players/cmd/power")
- @handle_player_command
+ @handle_player_command(lock=True)
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.
await self._handle_cmd_power(player_id, powered)
@api_command("players/cmd/volume_set")
- @handle_player_command
+ @handle_player_command(lock=True)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
await self.cmd_volume_set(player_id, new_volume)
@api_command("players/cmd/group_volume")
- @handle_player_command
+ @handle_player_command(lock=True)
async def cmd_group_volume(
self,
player_id: str,
await player_control.mute_set(muted)
@api_command("players/cmd/play_announcement")
+ @handle_player_command(lock=True)
async def play_announcement(
self,
player_id: str,
and not validate_announcement_chime_url(pre_announce_url)
):
raise PlayerCommandFailed("Invalid pre-announce chime URL specified.")
- # prevent multiple announcements at the same time to the same player with a lock
- if player_id not in self._announce_locks:
- self._announce_locks[player_id] = lock = asyncio.Lock()
- else:
- lock = self._announce_locks[player_id]
- async with lock:
- try:
- # mark announcement_in_progress on player
- player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = True
- # determine if the player has native announcements support
- native_announce_support = (
- PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features
+ try:
+ # mark announcement_in_progress on player
+ player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = True
+ # determine if the player has native announcements support
+ native_announce_support = PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features
+ # determine pre-announce from (group)player config
+ if pre_announce is None and "tts" in url:
+ conf_pre_announce = self.mass.config.get_raw_player_config_value(
+ player_id,
+ CONF_ENTRY_TTS_PRE_ANNOUNCE.key,
+ CONF_ENTRY_TTS_PRE_ANNOUNCE.default_value,
)
- # determine pre-announce from (group)player config
- if pre_announce is None and "tts" in url:
- conf_pre_announce = self.mass.config.get_raw_player_config_value(
- player_id,
- CONF_ENTRY_TTS_PRE_ANNOUNCE.key,
- CONF_ENTRY_TTS_PRE_ANNOUNCE.default_value,
- )
- pre_announce = cast("bool", conf_pre_announce)
- if pre_announce_url is None:
- if conf_pre_announce_url := self.mass.config.get_raw_player_config_value(
- player_id,
- CONF_PRE_ANNOUNCE_CHIME_URL,
- ):
- # player default custom chime url
- pre_announce_url = cast("str", conf_pre_announce_url)
- else:
- # use global default chime url
- pre_announce_url = ANNOUNCE_ALERT_FILE
- # if player type is group with all members supporting announcements,
- # we forward the request to each individual player
- if player.type == PlayerType.GROUP and (
- all(
- PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
- for x in self.iter_group_members(player)
- )
+ pre_announce = cast("bool", conf_pre_announce)
+ if pre_announce_url is None:
+ if conf_pre_announce_url := self.mass.config.get_raw_player_config_value(
+ player_id,
+ CONF_PRE_ANNOUNCE_CHIME_URL,
):
- # forward the request to each individual player
- async with TaskManager(self.mass) as tg:
- for group_member in player.group_members:
- tg.create_task(
- self.play_announcement(
- group_member,
- url=url,
- pre_announce=pre_announce,
- volume_level=volume_level,
- pre_announce_url=pre_announce_url,
- )
- )
- return
- self.logger.info(
- "Playback announcement to player %s (with pre-announce: %s): %s",
- player.display_name,
- pre_announce,
- url,
- )
- # create a PlayerMedia object for the announcement so
- # we can send a regular play-media call downstream
- announce_data = AnnounceData(
- announcement_url=url,
- pre_announce=bool(pre_announce or False),
- pre_announce_url=pre_announce_url,
- )
- announcement = PlayerMedia(
- uri=self.mass.streams.get_announcement_url(
- player_id, announce_data=announce_data
- ),
- media_type=MediaType.ANNOUNCEMENT,
- title="Announcement",
- custom_data=dict(announce_data),
+ # player default custom chime url
+ pre_announce_url = cast("str", conf_pre_announce_url)
+ else:
+ # use global default chime url
+ pre_announce_url = ANNOUNCE_ALERT_FILE
+ # if player type is group with all members supporting announcements,
+ # we forward the request to each individual player
+ if player.type == PlayerType.GROUP and (
+ all(
+ PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
+ for x in self.iter_group_members(player)
)
- # handle native announce support
- if native_announce_support:
- announcement_volume = self.get_announcement_volume(player_id, volume_level)
- await player.play_announcement(announcement, announcement_volume)
- return
- # use fallback/default implementation
- await self._play_announcement(player, announcement, volume_level)
- finally:
- player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = False
+ ):
+ # forward the request to each individual player
+ async with TaskManager(self.mass) as tg:
+ for group_member in player.group_members:
+ tg.create_task(
+ self.play_announcement(
+ group_member,
+ url=url,
+ pre_announce=pre_announce,
+ volume_level=volume_level,
+ pre_announce_url=pre_announce_url,
+ )
+ )
+ return
+ self.logger.info(
+ "Playback announcement to player %s (with pre-announce: %s): %s",
+ player.display_name,
+ pre_announce,
+ url,
+ )
+ # create a PlayerMedia object for the announcement so
+ # we can send a regular play-media call downstream
+ announce_data = AnnounceData(
+ announcement_url=url,
+ pre_announce=bool(pre_announce or False),
+ pre_announce_url=pre_announce_url,
+ )
+ announcement = PlayerMedia(
+ uri=self.mass.streams.get_announcement_url(player_id, announce_data=announce_data),
+ media_type=MediaType.ANNOUNCEMENT,
+ title="Announcement",
+ custom_data=dict(announce_data),
+ )
+ # handle native announce support
+ if native_announce_support:
+ announcement_volume = self.get_announcement_volume(player_id, volume_level)
+ await player.play_announcement(announcement, announcement_volume)
+ return
+ # use fallback/default implementation
+ await self._play_announcement(player, announcement, volume_level)
+ finally:
+ player.extra_data[ATTR_ANNOUNCEMENT_IN_PROGRESS] = False
- @handle_player_command
+ @handle_player_command(lock=True)
async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player.
await player.play_media(media)
@api_command("players/cmd/select_source")
+ @handle_player_command(lock=True)
async def select_source(self, player_id: str, source: str | None) -> None:
"""
Handle SELECT SOURCE command on given player.
# forward to player
await player.select_source(source)
+ @handle_player_command(lock=True)
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""
Handle enqueuing of a next media item on the player.