import asyncio
import functools
import time
+from collections.abc import Awaitable, Callable, Coroutine
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Concatenate, TypedDict, cast
from .sync_groups import SyncGroupController, SyncGroupPlayer
if TYPE_CHECKING:
- from collections.abc import Awaitable, Callable, Coroutine, Iterator
+ from collections.abc import Iterator
from music_assistant_models.config_entries import CoreConfig, PlayerConfig
from music_assistant_models.player_queue import PlayerQueue
"""Check and log commands to players."""
@functools.wraps(func)
- async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> R | None:
+ async def wrapper(self: PlayerControllerT, *args: P.args, **kwargs: P.kwargs) -> None:
"""Log and handle_player_command commands to players."""
- player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
+ player_id = kwargs.get("player_id") or args[0]
+ assert isinstance(player_id, str) # for type checking
if (player := self._players.get(player_id)) is None or not player.available:
# player not existent
self.logger.warning(
domain: str = "players"
- def __init__(self, *args, **kwargs) -> None:
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize core controller."""
super().__init__(*args, **kwargs)
self._players: dict[str, Player] = {}
"Music Assistant's core controller which manages all players from all providers."
)
self.manifest.icon = "speaker-multiple"
- self._poll_task: asyncio.Task | None = None
+ self._poll_task: asyncio.Task[None] | None = None
self._player_throttlers: dict[str, Throttler] = {}
self._announce_locks: dict[str, asyncio.Lock] = {}
self._sync_groups: SyncGroupController = SyncGroupController(self)
@property
def providers(self) -> list[PlayerProvider]:
"""Return all loaded/running MusicProviders."""
- return self.mass.get_providers(ProviderType.PLAYER) # type: ignore=return-value
+ return cast("list[PlayerProvider]", self.mass.get_providers(ProviderType.PLAYER))
def all(
self,
# we can send a regular play-media call downstream
announce_data = AnnounceData(
announcement_url=url,
- pre_announce=pre_announce,
+ pre_announce=bool(pre_announce or False),
pre_announce_url=pre_announce_url,
)
announcement = PlayerMedia(
),
media_type=MediaType.ANNOUNCEMENT,
title="Announcement",
- custom_data=announce_data,
+ custom_data=dict(announce_data),
)
# handle native announce support
if native_announce_support:
# check if source is a pluginsource
# in that case the source id is the instance_id of the plugin provider
if plugin_prov := self.mass.get_provider(source):
- await self._handle_select_plugin_source(player, plugin_prov)
+ await self._handle_select_plugin_source(player, cast("PluginProvider", plugin_prov))
return
# check if source is a mass queue
# this can be used to restore the queue after a source switch
@api_command("players/create_group_player")
async def create_group_player(
self, provider: str, name: str, members: list[str], dynamic: bool = True
- ):
+ ) -> Player:
"""
Create a new (permanent) Group Player.
return None
volume_level = volume_override
if volume_level is None and volume_strategy == "absolute":
- volume_level = volume_strategy_volume
+ volume_level = int(cast("float", volume_strategy_volume))
elif volume_level is None and volume_strategy == "relative":
- player = self.get(player_id)
- volume_level = player.volume_level + volume_strategy_volume
+ if (player := self.get(player_id)) and player.volume_level is not None:
+ volume_level = int(player.volume_level + cast("float", volume_strategy_volume))
elif volume_level is None and volume_strategy == "percentual":
- player = self.get(player_id)
- percentual = (player.volume_level / 100) * volume_strategy_volume
- volume_level = player.volume_level + percentual
+ if (player := self.get(player_id)) and player.volume_level is not None:
+ percentual = (player.volume_level / 100) * cast("float", volume_strategy_volume)
+ volume_level = int(player.volume_level + percentual)
if volume_level is not None:
- announce_volume_min = self.mass.config.get_raw_player_config_value(
- player_id,
- CONF_ENTRY_ANNOUNCE_VOLUME_MIN.key,
- CONF_ENTRY_ANNOUNCE_VOLUME_MIN.default_value,
+ announce_volume_min = cast(
+ "float",
+ self.mass.config.get_raw_player_config_value(
+ player_id,
+ CONF_ENTRY_ANNOUNCE_VOLUME_MIN.key,
+ CONF_ENTRY_ANNOUNCE_VOLUME_MIN.default_value,
+ ),
)
- volume_level = max(announce_volume_min, volume_level)
- announce_volume_max = self.mass.config.get_raw_player_config_value(
- player_id,
- CONF_ENTRY_ANNOUNCE_VOLUME_MAX.key,
- CONF_ENTRY_ANNOUNCE_VOLUME_MAX.default_value,
+ volume_level = max(int(announce_volume_min), volume_level)
+ announce_volume_max = cast(
+ "float",
+ self.mass.config.get_raw_player_config_value(
+ player_id,
+ CONF_ENTRY_ANNOUNCE_VOLUME_MAX.key,
+ CONF_ENTRY_ANNOUNCE_VOLUME_MAX.default_value,
+ ),
)
- volume_level = min(announce_volume_max, volume_level)
- # ensure the result is an integer
+ volume_level = min(int(announce_volume_max), volume_level)
return None if volume_level is None else int(volume_level)
def iter_group_members(
await self.wait_for_state(player, PlaybackState.PLAYING, 10, minimal_time=0.1)
# wait for the player to stop playing
if not announcement.duration:
+ if not announcement.custom_data:
+ raise ValueError("Announcement missing duration and custom_data")
media_info = await async_parse_tags(
announcement.custom_data["announcement_url"], require_duration=True
)
- announcement.duration = media_info.duration
+ announcement.duration = int(media_info.duration) if media_info.duration else None
+
+ if announcement.duration is None:
+ raise ValueError("Announcement duration could not be determined")
+
await self.wait_for_state(
player,
PlaybackState.IDLE,
timeout=announcement.duration + 6,
- minimal_time=announcement.duration,
+ minimal_time=float(announcement.duration),
)
self.logger.debug(
"Announcement to player %s - restore previous state...", player.display_name
self.mass.loop.call_soon(
self.mass.player_queues.on_player_update,
player,
- {"corrected_elapsed_time": player.corrected_elapsed_time},
+ {"corrected_elapsed_time": (None, player.corrected_elapsed_time)},
)
# Poll player;
if not player.needs_poll: