Chore: prevent lingering tasks
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 17 Jan 2025 22:35:26 +0000 (23:35 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 17 Jan 2025 22:35:26 +0000 (23:35 +0100)
Always use mass helper to create tasks so they are tracked and logged

13 files changed:
music_assistant/controllers/streams.py
music_assistant/controllers/webserver.py
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/webserver.py
music_assistant/mass.py
music_assistant/providers/airplay/raop.py
music_assistant/providers/player_group/ugp_stream.py
music_assistant/providers/siriusxm/__init__.py
music_assistant/providers/snapcast/__init__.py
music_assistant/providers/sonos_s1/__init__.py
music_assistant/providers/sonos_s1/player.py
music_assistant/providers/spotify/__init__.py
music_assistant/providers/spotify_connect/__init__.py

index 97c4a737e05068106ad25527ed205fe43896b262..efe075ec3e9e1d37b1c896bc64a5542b96f05beb 100644 (file)
@@ -97,7 +97,7 @@ class StreamsController(CoreController):
     def __init__(self, *args, **kwargs) -> None:
         """Initialize instance."""
         super().__init__(*args, **kwargs)
-        self._server = Webserver(self.logger, enable_dynamic_routes=True)
+        self._server = Webserver(self.mass, self.logger, enable_dynamic_routes=True)
         self.register_dynamic_route = self._server.register_dynamic_route
         self.unregister_dynamic_route = self._server.unregister_dynamic_route
         self.manifest.name = "Streamserver"
index 29cfe9d02d27bb791b59095d46c19521bb25f9c4..77b60675630f3f3f606266cfeeb0bd0aff5289ac 100644 (file)
@@ -56,7 +56,7 @@ class WebserverController(CoreController):
     def __init__(self, *args, **kwargs) -> None:
         """Initialize instance."""
         super().__init__(*args, **kwargs)
-        self._server = Webserver(self.logger, enable_dynamic_routes=False)
+        self._server = Webserver(self.mass, self.logger, enable_dynamic_routes=False)
         self.clients: set[WebsocketClientHandler] = set()
         self.manifest.name = "Web Server (frontend and api)"
         self.manifest.description = (
@@ -260,7 +260,7 @@ class WebsocketClientHandler:
 
         self._logger.log(VERBOSE_LOG_LEVEL, "Connection from %s", request.remote)
         self._handle_task = asyncio.current_task()
-        self._writer_task = asyncio.create_task(self._writer())
+        self._writer_task = self.mass.create_task(self._writer())
 
         # send server(version) info when client connects
         self._send_message(self.mass.get_server_info())
@@ -340,7 +340,7 @@ class WebsocketClientHandler:
             return
 
         # schedule task to handle the command
-        asyncio.create_task(self._run_handler(handler, msg))
+        self.mass.create_task(self._run_handler(handler, msg))
 
     async def _run_handler(self, handler: APICommandHandler, msg: CommandMessage) -> None:
         try:
index b3913008756dd17170a79260fd5d07020cfa3b9d..6402f530870804bc9aa3dd49c14e2eb390c249f1 100644 (file)
@@ -7,6 +7,7 @@ import logging
 import time
 from collections import deque
 from collections.abc import AsyncGenerator
+from contextlib import suppress
 from typing import TYPE_CHECKING, Final
 
 from music_assistant_models.enums import ContentType
@@ -91,6 +92,8 @@ class FFMpeg(AsyncProcess):
             return
         if self._stdin_task and not self._stdin_task.done():
             self._stdin_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._stdin_task
         await super().close(send_signal)
 
     async def _log_reader_task(self) -> None:
@@ -147,13 +150,12 @@ class FFMpeg(AsyncProcess):
             generator_exhausted = True
         except Exception as err:
             cancelled = isinstance(err, asyncio.CancelledError)
-            if cancelled:
-                raise
             self.logger.error(
                 "Stream error: %s",
                 str(err) or err.__class__.__name__,
-                exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+                exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
             )
+            raise
         finally:
             if not cancelled:
                 await self.write_eof()
index 64a28c31e956f4c15025695a656f87fff9d04951..fbdc6e4ee03befa25f6d18f1b29ac29736bbf52d 100644 (file)
@@ -11,6 +11,8 @@ if TYPE_CHECKING:
     import logging
     from collections.abc import Awaitable, Callable
 
+    from music_assistant.mass import MusicAssistant
+
 MAX_CLIENT_SIZE: Final = 1024**2 * 16
 MAX_LINE_SIZE: Final = 24570
 
@@ -20,10 +22,12 @@ class Webserver:
 
     def __init__(
         self,
+        mass: MusicAssistant,
         logger: logging.Logger,
         enable_dynamic_routes: bool = False,
     ) -> None:
         """Initialize instance."""
+        self.mass = mass
         self.logger = logger
         # the below gets initialized in async setup
         self._apprunner: web.AppRunner | None = None
@@ -52,6 +56,7 @@ class Webserver:
                 "max_line_size": MAX_LINE_SIZE,
                 "max_field_size": MAX_LINE_SIZE,
             },
+            loop=self.mass.loop,
         )
         self.logger.info("Starting server on  %s:%s - base url: %s", bind_ip, bind_port, base_url)
         self._apprunner = web.AppRunner(self._webapp, access_log=None, shutdown_timeout=10)
index 1e1b900eaaf7727c7666f5e3f88e3757dbfd5de9..d0295d627b554e3fef7f6e2beefa00d9e3e26ab1 100644 (file)
@@ -5,8 +5,9 @@ from __future__ import annotations
 import asyncio
 import logging
 import os
+import threading
 from collections.abc import Awaitable, Callable, Coroutine
-from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar
+from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar, cast
 from uuid import uuid4
 
 import aiofiles
@@ -65,7 +66,7 @@ rmfile = wrap(os.remove)
 listdir = wrap(os.listdir)
 rename = wrap(os.rename)
 
-EventCallBackType = Callable[[MassEvent], None]
+EventCallBackType = Callable[[MassEvent], None] | Coroutine[MassEvent, Any, None]
 EventSubscriptionType = tuple[
     EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
 ]
@@ -289,6 +290,11 @@ class MusicAssistant:
         if self.closing:
             return
 
+        if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread):  # type: ignore[attr-defined]
+            raise RuntimeError(
+                "Non-Async operation detected: This method may only be called from the eventloop."
+            )
+
         if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
             # do not log queue time updated events because that is too chatty
             LOGGER.getChild("event").log(VERBOSE_LOG_LEVEL, "%s %s", event.value, object_id or "")
@@ -300,8 +306,12 @@ class MusicAssistant:
             if not (id_filter is None or object_id in id_filter):
                 continue
             if asyncio.iscoroutinefunction(cb_func):
-                asyncio.run_coroutine_threadsafe(cb_func(event_obj), self.loop)
+                if TYPE_CHECKING:
+                    cb_func = cast(Coroutine[Any, Any, None], cb_func)
+                self.create_task(cb_func, event_obj)
             else:
+                if TYPE_CHECKING:
+                    cb_func = cast(Callable[[MassEvent], None], cb_func)
                 self.loop.call_soon_threadsafe(cb_func, event_obj)
 
     def subscribe(
@@ -331,12 +341,12 @@ class MusicAssistant:
 
     def create_task(
         self,
-        target: Coroutine[Any, Any, _R] | Awaitable[_R] | Callable[..., _R],
+        target: Coroutine[Any, Any, _R] | Awaitable[_R],
         *args: Any,
         task_id: str | None = None,
         abort_existing: bool = False,
         **kwargs: Any,
-    ) -> asyncio.Task[_R] | asyncio.Future[_R]:
+    ) -> asyncio.Task[_R]:
         """Create Task on (main) event loop from Coroutine(function).
 
         Tasks created by this helper will be properly cancelled on stop.
@@ -347,6 +357,11 @@ class MusicAssistant:
                 existing.cancel()
             else:
                 return existing
+        if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread):  # type: ignore[attr-defined]
+            raise RuntimeError(
+                "Non-Async operation detected: This method may only be called from the eventloop."
+            )
+
         if asyncio.iscoroutinefunction(target):
             # coroutine function
             task = self.loop.create_task(target(*args, **kwargs))
@@ -354,7 +369,7 @@ class MusicAssistant:
             # coroutine
             task = self.loop.create_task(target)
         elif callable(target):
-            task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
+            raise RuntimeError("Function is not a coroutine or coroutine function")
         else:
             raise RuntimeError("Target is missing")
 
@@ -401,11 +416,25 @@ class MusicAssistant:
         if existing := self._tracked_timers.get(task_id):
             existing.cancel()
 
+        if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread):  # type: ignore[attr-defined]
+            raise RuntimeError(
+                "Non-Async operation detected: This method may only be called from the eventloop."
+            )
+
         def _create_task() -> None:
             self._tracked_timers.pop(task_id)
+            if TYPE_CHECKING:
+                target = cast(Coroutine[Any, Any, _R], target)  # noqa: F823
             self.create_task(target, *args, task_id=task_id, abort_existing=True, **kwargs)
 
-        handle = self.loop.call_later(delay, _create_task)
+        if asyncio.iscoroutinefunction(target) or asyncio.iscoroutine(target):
+            # coroutine function
+            handle = self.loop.call_later(delay, _create_task)
+        else:
+            # regular callable
+            if TYPE_CHECKING:
+                target = cast(Callable[..., _R], target)
+            handle = self.loop.call_later(delay, target, *args)
         self._tracked_timers[task_id] = handle
         return handle
 
index b717e471fe4b99539536de4f52546cacc3ca4b8b..93185373431e83850d732b464b0cfd2b57f657ee 100644 (file)
@@ -89,6 +89,13 @@ class RaopStreamSession:
                         *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
                         return_exceptions=True,
                     )
+            except Exception as err:
+                logger = self.prov.logger
+                logger.error(
+                    "Stream error: %s",
+                    str(err) or err.__class__.__name__,
+                    exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+                )
             finally:
                 if not generator_exhausted:
                     await close_async_generator(self._audio_source)
@@ -114,8 +121,10 @@ class RaopStreamSession:
         if self._stopped:
             return
         self._stopped = True
-        if self._audio_source_task and not self._audio_source_task.done():
+        if self._audio_source_task:
             self._audio_source_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._audio_source_task
         await asyncio.gather(
             *[self.remove_client(x) for x in self._sync_clients],
             return_exceptions=True,
@@ -269,6 +278,10 @@ class RaopStream:
             await self._cliraop_proc.close(True)
         if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
             await self._ffmpeg_proc.close(True)
+        if self._log_reader_task:
+            self._log_reader_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._log_reader_task
         self._cliraop_proc = None
         self._ffmpeg_proc = None
 
index 5b13815bdb71f54f48a90f4d82d27ee23b48d78b..c8e02601cb8b53a080872dc23f5567524b169a46 100644 (file)
@@ -9,6 +9,7 @@ from __future__ import annotations
 
 import asyncio
 from collections.abc import AsyncGenerator, Awaitable, Callable
+from contextlib import suppress
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
@@ -53,6 +54,8 @@ class UGPStream:
             return
         if self._task and not self._task.done():
             self._task.cancel()
+        with suppress(asyncio.CancelledError):
+            await self._task
         self._done.set()
 
     async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
index 74fa2baab8e65dd1c7a5b7b5fa4edd9567ae95d3..b4016526fac8934bc28db664dde1eb7b6b0f9119 100644 (file)
@@ -162,7 +162,7 @@ class SiriusXMProvider(MusicProvider):
         self._base_url = f"{bind_ip}:{bind_port}"
         http_handler = sxm.http.make_http_handler(self._client)
 
-        self._sxm_server = Webserver(self.logger)
+        self._sxm_server = Webserver(self.mass, self.logger)
 
         await self._sxm_server.setup(
             bind_ip=bind_ip,
index 916f8f4f7ee96457315db89aabdd9b0730ea9efb..53d8a04e3e0439cc34ed8415a07de4263806ae74 100644 (file)
@@ -565,7 +565,7 @@ class SnapCastProvider(PlayerProvider):
                 await self._delete_current_snapstream(stream, media)
 
         # start streaming the queue (pcm) audio in a background task
-        self._stream_tasks[player_id] = asyncio.create_task(_streamer())
+        self._stream_tasks[player_id] = self.mass.create_task(_streamer())
 
     async def _delete_current_snapstream(self, stream: Snapstream, media: PlayerMedia) -> None:
         with suppress(TypeError, KeyError, AttributeError):
@@ -727,7 +727,7 @@ class SnapCastProvider(PlayerProvider):
         """Start the built-in Snapserver."""
         if self._use_builtin_server:
             self._snapserver_started = asyncio.Event()
-            self._snapserver_runner = asyncio.create_task(self._builtin_server_runner())
+            self._snapserver_runner = self.mass.create_task(self._builtin_server_runner())
             await asyncio.wait_for(self._snapserver_started.wait(), 10)
 
     def _handle_disconnect(self, exc: Exception) -> None:
index 3fcfe3f2d13430e259079cd45a7e151c4b71448f..28cc44c74625a8fc875a3b479b71c5f5e8471a74 100644 (file)
@@ -403,7 +403,7 @@ class SonosPlayerProvider(PlayerProvider):
             finally:
                 self._discovery_running = False
 
-        await self.mass.create_task(do_discover)
+        await asyncio.to_thread(do_discover)
 
         def reschedule() -> None:
             self._discovery_reschedule_timer = None
index 339abdfdb46dd924d9f2f1217cd39b12c6af1a41..bb91bb53005db478d3f02c11b3a83fe564a4fefa 100644 (file)
@@ -301,13 +301,13 @@ class SonosPlayer:
     ) -> None:
         """Sync given players/speakers with this player."""
         async with self.sonos_prov.topology_condition:
-            group: list[SonosPlayer] = await self.mass.create_task(self._join, members)
+            group: list[SonosPlayer] = await asyncio.to_thread(self._join, members)
             await self.wait_for_groups([group])
 
     async def unjoin(self) -> None:
         """Unjoin player from all/any groups."""
         async with self.sonos_prov.topology_condition:
-            await self.mass.create_task(self._unjoin)
+            await asyncio.to_thread(self._unjoin)
             await self.wait_for_groups([[self]])
 
     def update_player(self, signal_update: bool = True) -> None:
@@ -530,7 +530,7 @@ class SonosPlayer:
             if group:
                 assert isinstance(group, str)
                 return group.split(",")
-            return await self.mass.create_task(_get_soco_group)
+            return await asyncio.to_thread(_get_soco_group)
 
         def _regroup(group: list[str]) -> None:
             """Rebuild internal group layout (async safe)."""
index 1c99a2811f00e69b2acbf25dd6cb0fe59fb0a722..d701676951cf261fffd515c7c2371d90bf89148c 100644 (file)
@@ -606,7 +606,7 @@ class SpotifyProvider(MusicProvider):
                     logger.log(VERBOSE_LOG_LEVEL, line)
 
             if stderr:
-                log_reader = asyncio.create_task(_read_stderr())
+                log_reader = self.mass.create_task(_read_stderr())
 
             async for chunk in librespot_proc.iter_any(chunk_size):
                 yield chunk
index 6b1556e0ae6a398bf068c38bbe18817c930b7894..46dc859bba0b1c75bd3cfda5d3262914e8d94e01 100644 (file)
@@ -301,7 +301,7 @@ class SpotifyConnectProvider(MusicProvider):
     def _setup_player_daemon(self) -> None:
         """Handle setup of the spotify connect daemon for a player."""
         self._librespot_started.clear()
-        self._runner_task = asyncio.create_task(self._librespot_runner())
+        self._runner_task = self.mass.create_task(self._librespot_runner())
 
     def _on_mass_player_event(self, event: MassEvent) -> None:
         """Handle incoming event from linked airplay player."""