for operations where we don't want to stop other tasks when one of the tasks fails
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.tags import parse_tags
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.core_controller import CoreController
from music_assistant.server.models.player_provider import PlayerProvider
if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
await self.cmd_stop(player_id)
- async with asyncio.TaskGroup() as tg:
- any_member_powered = False
+ any_member_powered = False
+ async with TaskManager(self.mass) as tg:
for member in self.iter_group_members(group_player, only_powered=True):
any_member_powered = True
if power:
"while one or more individual players are playing. "
"This announcement will be redirected to the individual players."
)
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for group_member in player.group_childs:
tg.create_task(
self.play_announcement(
# adjust volume if needed
# in case of a (sync) group, we need to do this for all child players
prev_volumes: dict[str, int] = {}
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for volume_player_id in player.group_childs or (player.player_id,):
if not (volume_player := self.get(volume_player_id)):
continue
"Announcement to player %s - restore previous state...", player.display_name
)
# restore volume
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for volume_player_id, prev_volume in prev_volumes.items():
tg.create_task(self.cmd_volume_set(volume_player_id, prev_volume))
from music_assistant.common.helpers.util import empty_queue
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.server.helpers.audio import get_ffmpeg_stream
+from music_assistant.server.helpers.util import TaskManager
LOGGER = logging.getLogger(__name__)
*[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
)
# EOF: send empty chunk
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for sub in list(self.subscribers):
tg.create_task(sub.put(b""))
import urllib.error
import urllib.parse
import urllib.request
+from collections.abc import Coroutine
from functools import lru_cache
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
-from typing import TYPE_CHECKING
+from types import TracebackType
+from typing import TYPE_CHECKING, Self
import ifaddr
import memory_tempfile
if TYPE_CHECKING:
from collections.abc import Iterator
+ from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderModuleType
LOGGER = logging.getLogger(__name__)
"""Chunk bytes data into smaller chunks."""
for i in range(0, len(data), chunk_size):
yield data[i : i + chunk_size]
+
+
+class TaskManager:
+ """
+ Helper class to run many tasks at once.
+
+ This is basically an alternative to asyncio.TaskGroup but this will not
+ cancel all operations when one of the tasks fails.
+ Logging of exceptions is done by the mass.create_task helper.
+ """
+
+ def __init__(self, mass: MusicAssistant):
+ """Initialize the TaskManager."""
+ self.mass = mass
+ self._tasks: list[asyncio.Task] = []
+
+ def create_task(self, coro: Coroutine) -> None:
+ """Create a new task and add it to the manager."""
+ task = self.mass.create_task(coro)
+ self._tasks.append(task)
+
+ async def __aenter__(self) -> Self:
+ """Enter context manager."""
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ """Exit context manager."""
+ await asyncio.wait(self._tasks)
+ self._tasks.clear()
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
- player_id: player_id of the player to handle the command.
"""
# forward command to player and any connected sync members
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream:
tg.create_task(airplay_player.active_stream.stop())
- player_id: player_id of the player to handle the command.
"""
# forward command to player and any connected sync members
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
# prefer interactive command to our streamer
# should not happen, but just in case
raise RuntimeError("Player is synced")
# always stop existing stream first
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream:
tg.create_task(airplay_player.active_stream.stop())
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_ENFORCE_MP3, CONF_PLAYERS, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.didl_lite import create_didl_metadata
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
from .helpers import DLNANotifyServer
Called when provider is deregistered (e.g. MA exiting or config reloading).
"""
self.mass.streams.unregister_dynamic_route("/notify", "NOTIFY")
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for dlna_player in self.dlnaplayers.values():
tg.create_task(self._device_disconnect(dlna_player))
)
from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.multi_client_stream import MultiClientStream
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.server.providers.ugp import UniversalGroupProvider
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
# forward command to player and any connected sync members
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for slimplayer in self._get_sync_clients(player_id):
tg.create_task(slimplayer.stop())
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
# forward command to player and any connected sync members
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for slimplayer in self._get_sync_clients(player_id):
tg.create_task(slimplayer.play())
base_url = f"{self.mass.streams.base_url}/slimproto/multi?player_id={player_id}&fmt=flac"
# forward to downstream play_media commands
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for slimplayer in self._get_sync_clients(player_id):
url = f"{base_url}&child_player_id={slimplayer.player_id}"
if self.mass.config.get_raw_player_config_value(
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
# forward command to player and any connected sync members
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for slimplayer in self._get_sync_clients(player_id):
tg.create_task(slimplayer.pause())
break
# all child's ready (or timeout) - start play
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for _client in self._get_sync_clients(player.player_id):
self._sync_playpoints.setdefault(
_client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_CROSSFADE, SYNCGROUP_PREFIX, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.didl_lite import create_didl_metadata
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
from .player import SonosPlayer
"""Handle (provider native) playback of an announcement on given player."""
if player_id.startswith(SYNCGROUP_PREFIX):
# handle syncgroup, unwrap to all underlying child's
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
if group_player := self.mass.players.get(player_id):
# execute on all child players
for child_player_id in group_player.group_childs:
from __future__ import annotations
-import asyncio
from time import time
from typing import TYPE_CHECKING
from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.multi_client_stream import MultiClientStream
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
group_player.state = PlayerState.IDLE
self.mass.players.update(player_id)
# forward command to player and any connected sync child's
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for member in self.mass.players.iter_group_members(group_player, only_powered=True):
if member.state == PlayerState.IDLE:
continue
base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
# forward to downstream play_media commands
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self.mass) as tg:
for member in self.mass.players.iter_group_members(group_player, only_powered=True):
if member.player_id.startswith(SYNCGROUP_PREFIX):
member = self.mass.players.get_sync_leader(member) # noqa: PLW2901
from music_assistant.server.helpers.api import APICommandHandler, api_command
from music_assistant.server.helpers.images import get_icon_string
from music_assistant.server.helpers.util import (
+ TaskManager,
get_package_version,
is_hass_supervisor,
load_provider_module,
for task in self._tracked_tasks.values():
task.cancel()
# cleanup all providers
- async with asyncio.TaskGroup() as tg:
- for prov_id in list(self._providers.keys()):
- tg.create_task(self.unload_provider(prov_id))
+ await asyncio.gather(
+ *[self.unload_provider(prov_id) for prov_id in list(self._providers.keys())],
+ return_exceptions=True,
+ )
# stop core controllers
await self.streams.close()
await self.webserver.close()
exc_info=exc,
)
- async with asyncio.TaskGroup() as tg:
+ async with TaskManager(self) as tg:
for dir_str in os.listdir(PROVIDERS_PATH):
dir_path = os.path.join(PROVIDERS_PATH, dir_str)
if dir_str == "test" and not ENABLE_DEBUG: