from __future__ import annotations
import asyncio
+import functools
import random
import time
+from collections.abc import Awaitable, Callable, Coroutine
from contextlib import suppress
from types import NoneType
-from typing import TYPE_CHECKING, Any, TypedDict, cast
+from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast
import shortuuid
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant.constants import (
ATTR_ACTIVE_PLAYLIST,
ATTR_ANNOUNCEMENT_IN_PROGRESS,
+ ATTR_PLAY_ACTION_IN_PROGRESS,
MASS_LOGO_ONLINE,
PLAYBACK_REPORT_INTERVAL_SECONDS,
PLAYLIST_MEDIA_TYPES,
CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
+def handle_play_action[PlayerQueuesControllerT: "PlayerQueuesController", **P, R](
+ func: Callable[Concatenate[PlayerQueuesControllerT, P], Awaitable[R]],
+) -> Callable[Concatenate[PlayerQueuesControllerT, P], Coroutine[Any, Any, R]]:
+ """
+ Decorator to mark a play action in progress on the queue.
+
+ Fetches the queue based on queue_id, sets ATTR_PLAY_ACTION_IN_PROGRESS
+ to True before calling the function, and removes it after the function completes.
+
+ :param func: The function to wrap.
+ """ # noqa: D401
+
+ @functools.wraps(func)
+ async def wrapper(self: PlayerQueuesControllerT, *args: P.args, **kwargs: P.kwargs) -> R:
+ """Execute function with play action flag set."""
+ queue_id = kwargs.get("queue_id") or args[0]
+ assert isinstance(queue_id, str) # for type checking
+ queue = self._queues.get(queue_id)
+ if queue is None:
+ # Queue not found, just call the function and let it handle the error
+ return await func(self, *args, **kwargs)
+ try:
+ has_flag = bool(queue.extra_attributes.get(ATTR_PLAY_ACTION_IN_PROGRESS))
+ if not has_flag:
+ queue.extra_attributes[ATTR_PLAY_ACTION_IN_PROGRESS] = True
+ self.signal_update(queue_id)
+ return await func(self, *args, **kwargs)
+ finally:
+ if not has_flag:
+ queue.extra_attributes.pop(ATTR_PLAY_ACTION_IN_PROGRESS, None)
+ self.signal_update(queue_id)
+
+ return wrapper
+
+
class CompareState(TypedDict):
"""Simple object where we store the (previous) state of a queue.
await self.resume(queue_id)
@api_command("player_queues/play_media")
+ @handle_play_action
async def play_media(
self,
queue_id: str,
raise QueueEmpty(msg)
@api_command("player_queues/play_index")
+ @handle_play_action
async def play_index(
self,
queue_id: str,