From 0a3d8d255169319b780200906036c19de2165c44 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 17 Jan 2025 23:35:26 +0100 Subject: [PATCH] Chore: prevent lingering tasks Always use mass helper to create tasks so they are tracked and logged --- music_assistant/controllers/streams.py | 2 +- music_assistant/controllers/webserver.py | 6 +-- music_assistant/helpers/ffmpeg.py | 8 ++-- music_assistant/helpers/webserver.py | 5 +++ music_assistant/mass.py | 43 ++++++++++++++++--- music_assistant/providers/airplay/raop.py | 15 ++++++- .../providers/player_group/ugp_stream.py | 3 ++ .../providers/siriusxm/__init__.py | 2 +- .../providers/snapcast/__init__.py | 4 +- .../providers/sonos_s1/__init__.py | 2 +- music_assistant/providers/sonos_s1/player.py | 6 +-- music_assistant/providers/spotify/__init__.py | 2 +- .../providers/spotify_connect/__init__.py | 2 +- 13 files changed, 76 insertions(+), 24 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 97c4a737..efe075ec 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -97,7 +97,7 @@ class StreamsController(CoreController): def __init__(self, *args, **kwargs) -> None: """Initialize instance.""" super().__init__(*args, **kwargs) - self._server = Webserver(self.logger, enable_dynamic_routes=True) + self._server = Webserver(self.mass, self.logger, enable_dynamic_routes=True) self.register_dynamic_route = self._server.register_dynamic_route self.unregister_dynamic_route = self._server.unregister_dynamic_route self.manifest.name = "Streamserver" diff --git a/music_assistant/controllers/webserver.py b/music_assistant/controllers/webserver.py index 29cfe9d0..77b60675 100644 --- a/music_assistant/controllers/webserver.py +++ b/music_assistant/controllers/webserver.py @@ -56,7 +56,7 @@ class WebserverController(CoreController): def __init__(self, *args, **kwargs) -> None: """Initialize instance.""" super().__init__(*args, **kwargs) - self._server = Webserver(self.logger, enable_dynamic_routes=False) + self._server = Webserver(self.mass, self.logger, enable_dynamic_routes=False) self.clients: set[WebsocketClientHandler] = set() self.manifest.name = "Web Server (frontend and api)" self.manifest.description = ( @@ -260,7 +260,7 @@ class WebsocketClientHandler: self._logger.log(VERBOSE_LOG_LEVEL, "Connection from %s", request.remote) self._handle_task = asyncio.current_task() - self._writer_task = asyncio.create_task(self._writer()) + self._writer_task = self.mass.create_task(self._writer()) # send server(version) info when client connects self._send_message(self.mass.get_server_info()) @@ -340,7 +340,7 @@ class WebsocketClientHandler: return # schedule task to handle the command - asyncio.create_task(self._run_handler(handler, msg)) + self.mass.create_task(self._run_handler(handler, msg)) async def _run_handler(self, handler: APICommandHandler, msg: CommandMessage) -> None: try: diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index b3913008..6402f530 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -7,6 +7,7 @@ import logging import time from collections import deque from collections.abc import AsyncGenerator +from contextlib import suppress from typing import TYPE_CHECKING, Final from music_assistant_models.enums import ContentType @@ -91,6 +92,8 @@ class FFMpeg(AsyncProcess): return if self._stdin_task and not self._stdin_task.done(): self._stdin_task.cancel() + with suppress(asyncio.CancelledError): + await self._stdin_task await super().close(send_signal) async def _log_reader_task(self) -> None: @@ -147,13 +150,12 @@ class FFMpeg(AsyncProcess): generator_exhausted = True except Exception as err: cancelled = isinstance(err, asyncio.CancelledError) - if cancelled: - raise self.logger.error( "Stream error: %s", str(err) or err.__class__.__name__, - exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None, ) + raise finally: if not cancelled: await self.write_eof() diff --git a/music_assistant/helpers/webserver.py b/music_assistant/helpers/webserver.py index 64a28c31..fbdc6e4e 100644 --- a/music_assistant/helpers/webserver.py +++ b/music_assistant/helpers/webserver.py @@ -11,6 +11,8 @@ if TYPE_CHECKING: import logging from collections.abc import Awaitable, Callable + from music_assistant.mass import MusicAssistant + MAX_CLIENT_SIZE: Final = 1024**2 * 16 MAX_LINE_SIZE: Final = 24570 @@ -20,10 +22,12 @@ class Webserver: def __init__( self, + mass: MusicAssistant, logger: logging.Logger, enable_dynamic_routes: bool = False, ) -> None: """Initialize instance.""" + self.mass = mass self.logger = logger # the below gets initialized in async setup self._apprunner: web.AppRunner | None = None @@ -52,6 +56,7 @@ class Webserver: "max_line_size": MAX_LINE_SIZE, "max_field_size": MAX_LINE_SIZE, }, + loop=self.mass.loop, ) self.logger.info("Starting server on %s:%s - base url: %s", bind_ip, bind_port, base_url) self._apprunner = web.AppRunner(self._webapp, access_log=None, shutdown_timeout=10) diff --git a/music_assistant/mass.py b/music_assistant/mass.py index 1e1b900e..d0295d62 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -5,8 +5,9 @@ from __future__ import annotations import asyncio import logging import os +import threading from collections.abc import Awaitable, Callable, Coroutine -from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar +from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar, cast from uuid import uuid4 import aiofiles @@ -65,7 +66,7 @@ rmfile = wrap(os.remove) listdir = wrap(os.listdir) rename = wrap(os.rename) -EventCallBackType = Callable[[MassEvent], None] +EventCallBackType = Callable[[MassEvent], None] | Coroutine[MassEvent, Any, None] EventSubscriptionType = tuple[ EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None ] @@ -289,6 +290,11 @@ class MusicAssistant: if self.closing: return + if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread): # type: ignore[attr-defined] + raise RuntimeError( + "Non-Async operation detected: This method may only be called from the eventloop." + ) + if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL): # do not log queue time updated events because that is too chatty LOGGER.getChild("event").log(VERBOSE_LOG_LEVEL, "%s %s", event.value, object_id or "") @@ -300,8 +306,12 @@ class MusicAssistant: if not (id_filter is None or object_id in id_filter): continue if asyncio.iscoroutinefunction(cb_func): - asyncio.run_coroutine_threadsafe(cb_func(event_obj), self.loop) + if TYPE_CHECKING: + cb_func = cast(Coroutine[Any, Any, None], cb_func) + self.create_task(cb_func, event_obj) else: + if TYPE_CHECKING: + cb_func = cast(Callable[[MassEvent], None], cb_func) self.loop.call_soon_threadsafe(cb_func, event_obj) def subscribe( @@ -331,12 +341,12 @@ class MusicAssistant: def create_task( self, - target: Coroutine[Any, Any, _R] | Awaitable[_R] | Callable[..., _R], + target: Coroutine[Any, Any, _R] | Awaitable[_R], *args: Any, task_id: str | None = None, abort_existing: bool = False, **kwargs: Any, - ) -> asyncio.Task[_R] | asyncio.Future[_R]: + ) -> asyncio.Task[_R]: """Create Task on (main) event loop from Coroutine(function). Tasks created by this helper will be properly cancelled on stop. @@ -347,6 +357,11 @@ class MusicAssistant: existing.cancel() else: return existing + if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread): # type: ignore[attr-defined] + raise RuntimeError( + "Non-Async operation detected: This method may only be called from the eventloop." + ) + if asyncio.iscoroutinefunction(target): # coroutine function task = self.loop.create_task(target(*args, **kwargs)) @@ -354,7 +369,7 @@ class MusicAssistant: # coroutine task = self.loop.create_task(target) elif callable(target): - task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs)) + raise RuntimeError("Function is not a coroutine or coroutine function") else: raise RuntimeError("Target is missing") @@ -401,11 +416,25 @@ class MusicAssistant: if existing := self._tracked_timers.get(task_id): existing.cancel() + if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread): # type: ignore[attr-defined] + raise RuntimeError( + "Non-Async operation detected: This method may only be called from the eventloop." + ) + def _create_task() -> None: self._tracked_timers.pop(task_id) + if TYPE_CHECKING: + target = cast(Coroutine[Any, Any, _R], target) # noqa: F823 self.create_task(target, *args, task_id=task_id, abort_existing=True, **kwargs) - handle = self.loop.call_later(delay, _create_task) + if asyncio.iscoroutinefunction(target) or asyncio.iscoroutine(target): + # coroutine function + handle = self.loop.call_later(delay, _create_task) + else: + # regular callable + if TYPE_CHECKING: + target = cast(Callable[..., _R], target) + handle = self.loop.call_later(delay, target, *args) self._tracked_timers[task_id] = handle return handle diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py index b717e471..93185373 100644 --- a/music_assistant/providers/airplay/raop.py +++ b/music_assistant/providers/airplay/raop.py @@ -89,6 +89,13 @@ class RaopStreamSession: *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream], return_exceptions=True, ) + except Exception as err: + logger = self.prov.logger + logger.error( + "Stream error: %s", + str(err) or err.__class__.__name__, + exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, + ) finally: if not generator_exhausted: await close_async_generator(self._audio_source) @@ -114,8 +121,10 @@ class RaopStreamSession: if self._stopped: return self._stopped = True - if self._audio_source_task and not self._audio_source_task.done(): + if self._audio_source_task: self._audio_source_task.cancel() + with suppress(asyncio.CancelledError): + await self._audio_source_task await asyncio.gather( *[self.remove_client(x) for x in self._sync_clients], return_exceptions=True, @@ -269,6 +278,10 @@ class RaopStream: await self._cliraop_proc.close(True) if self._ffmpeg_proc and not self._ffmpeg_proc.closed: await self._ffmpeg_proc.close(True) + if self._log_reader_task: + self._log_reader_task.cancel() + with suppress(asyncio.CancelledError): + await self._log_reader_task self._cliraop_proc = None self._ffmpeg_proc = None diff --git a/music_assistant/providers/player_group/ugp_stream.py b/music_assistant/providers/player_group/ugp_stream.py index 5b13815b..c8e02601 100644 --- a/music_assistant/providers/player_group/ugp_stream.py +++ b/music_assistant/providers/player_group/ugp_stream.py @@ -9,6 +9,7 @@ from __future__ import annotations import asyncio from collections.abc import AsyncGenerator, Awaitable, Callable +from contextlib import suppress from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -53,6 +54,8 @@ class UGPStream: return if self._task and not self._task.done(): self._task.cancel() + with suppress(asyncio.CancelledError): + await self._task self._done.set() async def subscribe_raw(self) -> AsyncGenerator[bytes, None]: diff --git a/music_assistant/providers/siriusxm/__init__.py b/music_assistant/providers/siriusxm/__init__.py index 74fa2baa..b4016526 100644 --- a/music_assistant/providers/siriusxm/__init__.py +++ b/music_assistant/providers/siriusxm/__init__.py @@ -162,7 +162,7 @@ class SiriusXMProvider(MusicProvider): self._base_url = f"{bind_ip}:{bind_port}" http_handler = sxm.http.make_http_handler(self._client) - self._sxm_server = Webserver(self.logger) + self._sxm_server = Webserver(self.mass, self.logger) await self._sxm_server.setup( bind_ip=bind_ip, diff --git a/music_assistant/providers/snapcast/__init__.py b/music_assistant/providers/snapcast/__init__.py index 916f8f4f..53d8a04e 100644 --- a/music_assistant/providers/snapcast/__init__.py +++ b/music_assistant/providers/snapcast/__init__.py @@ -565,7 +565,7 @@ class SnapCastProvider(PlayerProvider): await self._delete_current_snapstream(stream, media) # start streaming the queue (pcm) audio in a background task - self._stream_tasks[player_id] = asyncio.create_task(_streamer()) + self._stream_tasks[player_id] = self.mass.create_task(_streamer()) async def _delete_current_snapstream(self, stream: Snapstream, media: PlayerMedia) -> None: with suppress(TypeError, KeyError, AttributeError): @@ -727,7 +727,7 @@ class SnapCastProvider(PlayerProvider): """Start the built-in Snapserver.""" if self._use_builtin_server: self._snapserver_started = asyncio.Event() - self._snapserver_runner = asyncio.create_task(self._builtin_server_runner()) + self._snapserver_runner = self.mass.create_task(self._builtin_server_runner()) await asyncio.wait_for(self._snapserver_started.wait(), 10) def _handle_disconnect(self, exc: Exception) -> None: diff --git a/music_assistant/providers/sonos_s1/__init__.py b/music_assistant/providers/sonos_s1/__init__.py index 3fcfe3f2..28cc44c7 100644 --- a/music_assistant/providers/sonos_s1/__init__.py +++ b/music_assistant/providers/sonos_s1/__init__.py @@ -403,7 +403,7 @@ class SonosPlayerProvider(PlayerProvider): finally: self._discovery_running = False - await self.mass.create_task(do_discover) + await asyncio.to_thread(do_discover) def reschedule() -> None: self._discovery_reschedule_timer = None diff --git a/music_assistant/providers/sonos_s1/player.py b/music_assistant/providers/sonos_s1/player.py index 339abdfd..bb91bb53 100644 --- a/music_assistant/providers/sonos_s1/player.py +++ b/music_assistant/providers/sonos_s1/player.py @@ -301,13 +301,13 @@ class SonosPlayer: ) -> None: """Sync given players/speakers with this player.""" async with self.sonos_prov.topology_condition: - group: list[SonosPlayer] = await self.mass.create_task(self._join, members) + group: list[SonosPlayer] = await asyncio.to_thread(self._join, members) await self.wait_for_groups([group]) async def unjoin(self) -> None: """Unjoin player from all/any groups.""" async with self.sonos_prov.topology_condition: - await self.mass.create_task(self._unjoin) + await asyncio.to_thread(self._unjoin) await self.wait_for_groups([[self]]) def update_player(self, signal_update: bool = True) -> None: @@ -530,7 +530,7 @@ class SonosPlayer: if group: assert isinstance(group, str) return group.split(",") - return await self.mass.create_task(_get_soco_group) + return await asyncio.to_thread(_get_soco_group) def _regroup(group: list[str]) -> None: """Rebuild internal group layout (async safe).""" diff --git a/music_assistant/providers/spotify/__init__.py b/music_assistant/providers/spotify/__init__.py index 1c99a281..d7016769 100644 --- a/music_assistant/providers/spotify/__init__.py +++ b/music_assistant/providers/spotify/__init__.py @@ -606,7 +606,7 @@ class SpotifyProvider(MusicProvider): logger.log(VERBOSE_LOG_LEVEL, line) if stderr: - log_reader = asyncio.create_task(_read_stderr()) + log_reader = self.mass.create_task(_read_stderr()) async for chunk in librespot_proc.iter_any(chunk_size): yield chunk diff --git a/music_assistant/providers/spotify_connect/__init__.py b/music_assistant/providers/spotify_connect/__init__.py index 6b1556e0..46dc859b 100644 --- a/music_assistant/providers/spotify_connect/__init__.py +++ b/music_assistant/providers/spotify_connect/__init__.py @@ -301,7 +301,7 @@ class SpotifyConnectProvider(MusicProvider): def _setup_player_daemon(self) -> None: """Handle setup of the spotify connect daemon for a player.""" self._librespot_started.clear() - self._runner_task = asyncio.create_task(self._librespot_runner()) + self._runner_task = self.mass.create_task(self._librespot_runner()) def _on_mass_player_event(self, event: MassEvent) -> None: """Handle incoming event from linked airplay player.""" -- 2.34.1