def __init__(self, *args, **kwargs) -> None:
"""Initialize instance."""
super().__init__(*args, **kwargs)
- self._server = Webserver(self.logger, enable_dynamic_routes=True)
+ self._server = Webserver(self.mass, self.logger, enable_dynamic_routes=True)
self.register_dynamic_route = self._server.register_dynamic_route
self.unregister_dynamic_route = self._server.unregister_dynamic_route
self.manifest.name = "Streamserver"
def __init__(self, *args, **kwargs) -> None:
"""Initialize instance."""
super().__init__(*args, **kwargs)
- self._server = Webserver(self.logger, enable_dynamic_routes=False)
+ self._server = Webserver(self.mass, self.logger, enable_dynamic_routes=False)
self.clients: set[WebsocketClientHandler] = set()
self.manifest.name = "Web Server (frontend and api)"
self.manifest.description = (
self._logger.log(VERBOSE_LOG_LEVEL, "Connection from %s", request.remote)
self._handle_task = asyncio.current_task()
- self._writer_task = asyncio.create_task(self._writer())
+ self._writer_task = self.mass.create_task(self._writer())
# send server(version) info when client connects
self._send_message(self.mass.get_server_info())
return
# schedule task to handle the command
- asyncio.create_task(self._run_handler(handler, msg))
+ self.mass.create_task(self._run_handler(handler, msg))
async def _run_handler(self, handler: APICommandHandler, msg: CommandMessage) -> None:
try:
import time
from collections import deque
from collections.abc import AsyncGenerator
+from contextlib import suppress
from typing import TYPE_CHECKING, Final
from music_assistant_models.enums import ContentType
return
if self._stdin_task and not self._stdin_task.done():
self._stdin_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._stdin_task
await super().close(send_signal)
async def _log_reader_task(self) -> None:
generator_exhausted = True
except Exception as err:
cancelled = isinstance(err, asyncio.CancelledError)
- if cancelled:
- raise
self.logger.error(
"Stream error: %s",
str(err) or err.__class__.__name__,
- exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+ exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
)
+ raise
finally:
if not cancelled:
await self.write_eof()
import logging
from collections.abc import Awaitable, Callable
+ from music_assistant.mass import MusicAssistant
+
MAX_CLIENT_SIZE: Final = 1024**2 * 16
MAX_LINE_SIZE: Final = 24570
def __init__(
self,
+ mass: MusicAssistant,
logger: logging.Logger,
enable_dynamic_routes: bool = False,
) -> None:
"""Initialize instance."""
+ self.mass = mass
self.logger = logger
# the below gets initialized in async setup
self._apprunner: web.AppRunner | None = None
"max_line_size": MAX_LINE_SIZE,
"max_field_size": MAX_LINE_SIZE,
},
+ loop=self.mass.loop,
)
self.logger.info("Starting server on %s:%s - base url: %s", bind_ip, bind_port, base_url)
self._apprunner = web.AppRunner(self._webapp, access_log=None, shutdown_timeout=10)
import asyncio
import logging
import os
+import threading
from collections.abc import Awaitable, Callable, Coroutine
-from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar
+from typing import TYPE_CHECKING, Any, Self, TypeGuard, TypeVar, cast
from uuid import uuid4
import aiofiles
listdir = wrap(os.listdir)
rename = wrap(os.rename)
-EventCallBackType = Callable[[MassEvent], None]
+EventCallBackType = Callable[[MassEvent], None] | Coroutine[MassEvent, Any, None]
EventSubscriptionType = tuple[
EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
]
if self.closing:
return
+ if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread): # type: ignore[attr-defined]
+ raise RuntimeError(
+ "Non-Async operation detected: This method may only be called from the eventloop."
+ )
+
if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
# do not log queue time updated events because that is too chatty
LOGGER.getChild("event").log(VERBOSE_LOG_LEVEL, "%s %s", event.value, object_id or "")
if not (id_filter is None or object_id in id_filter):
continue
if asyncio.iscoroutinefunction(cb_func):
- asyncio.run_coroutine_threadsafe(cb_func(event_obj), self.loop)
+ if TYPE_CHECKING:
+ cb_func = cast(Coroutine[Any, Any, None], cb_func)
+ self.create_task(cb_func, event_obj)
else:
+ if TYPE_CHECKING:
+ cb_func = cast(Callable[[MassEvent], None], cb_func)
self.loop.call_soon_threadsafe(cb_func, event_obj)
def subscribe(
def create_task(
self,
- target: Coroutine[Any, Any, _R] | Awaitable[_R] | Callable[..., _R],
+ target: Coroutine[Any, Any, _R] | Awaitable[_R],
*args: Any,
task_id: str | None = None,
abort_existing: bool = False,
**kwargs: Any,
- ) -> asyncio.Task[_R] | asyncio.Future[_R]:
+ ) -> asyncio.Task[_R]:
"""Create Task on (main) event loop from Coroutine(function).
Tasks created by this helper will be properly cancelled on stop.
existing.cancel()
else:
return existing
+ if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread): # type: ignore[attr-defined]
+ raise RuntimeError(
+ "Non-Async operation detected: This method may only be called from the eventloop."
+ )
+
if asyncio.iscoroutinefunction(target):
# coroutine function
task = self.loop.create_task(target(*args, **kwargs))
# coroutine
task = self.loop.create_task(target)
elif callable(target):
- task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs))
+ raise RuntimeError("Function is not a coroutine or coroutine function")
else:
raise RuntimeError("Target is missing")
if existing := self._tracked_timers.get(task_id):
existing.cancel()
+ if ENABLE_DEBUG and not isinstance(threading.current_thread(), threading._MainThread): # type: ignore[attr-defined]
+ raise RuntimeError(
+ "Non-Async operation detected: This method may only be called from the eventloop."
+ )
+
def _create_task() -> None:
self._tracked_timers.pop(task_id)
+ if TYPE_CHECKING:
+ target = cast(Coroutine[Any, Any, _R], target) # noqa: F823
self.create_task(target, *args, task_id=task_id, abort_existing=True, **kwargs)
- handle = self.loop.call_later(delay, _create_task)
+ if asyncio.iscoroutinefunction(target) or asyncio.iscoroutine(target):
+ # coroutine function
+ handle = self.loop.call_later(delay, _create_task)
+ else:
+ # regular callable
+ if TYPE_CHECKING:
+ target = cast(Callable[..., _R], target)
+ handle = self.loop.call_later(delay, target, *args)
self._tracked_timers[task_id] = handle
return handle
*[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
return_exceptions=True,
)
+ except Exception as err:
+ logger = self.prov.logger
+ logger.error(
+ "Stream error: %s",
+ str(err) or err.__class__.__name__,
+ exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+ )
finally:
if not generator_exhausted:
await close_async_generator(self._audio_source)
if self._stopped:
return
self._stopped = True
- if self._audio_source_task and not self._audio_source_task.done():
+ if self._audio_source_task:
self._audio_source_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._audio_source_task
await asyncio.gather(
*[self.remove_client(x) for x in self._sync_clients],
return_exceptions=True,
await self._cliraop_proc.close(True)
if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
await self._ffmpeg_proc.close(True)
+ if self._log_reader_task:
+ self._log_reader_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._log_reader_task
self._cliraop_proc = None
self._ffmpeg_proc = None
import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable
+from contextlib import suppress
from typing import TYPE_CHECKING
if TYPE_CHECKING:
return
if self._task and not self._task.done():
self._task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._task
self._done.set()
async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
self._base_url = f"{bind_ip}:{bind_port}"
http_handler = sxm.http.make_http_handler(self._client)
- self._sxm_server = Webserver(self.logger)
+ self._sxm_server = Webserver(self.mass, self.logger)
await self._sxm_server.setup(
bind_ip=bind_ip,
await self._delete_current_snapstream(stream, media)
# start streaming the queue (pcm) audio in a background task
- self._stream_tasks[player_id] = asyncio.create_task(_streamer())
+ self._stream_tasks[player_id] = self.mass.create_task(_streamer())
async def _delete_current_snapstream(self, stream: Snapstream, media: PlayerMedia) -> None:
with suppress(TypeError, KeyError, AttributeError):
"""Start the built-in Snapserver."""
if self._use_builtin_server:
self._snapserver_started = asyncio.Event()
- self._snapserver_runner = asyncio.create_task(self._builtin_server_runner())
+ self._snapserver_runner = self.mass.create_task(self._builtin_server_runner())
await asyncio.wait_for(self._snapserver_started.wait(), 10)
def _handle_disconnect(self, exc: Exception) -> None:
finally:
self._discovery_running = False
- await self.mass.create_task(do_discover)
+ await asyncio.to_thread(do_discover)
def reschedule() -> None:
self._discovery_reschedule_timer = None
) -> None:
"""Sync given players/speakers with this player."""
async with self.sonos_prov.topology_condition:
- group: list[SonosPlayer] = await self.mass.create_task(self._join, members)
+ group: list[SonosPlayer] = await asyncio.to_thread(self._join, members)
await self.wait_for_groups([group])
async def unjoin(self) -> None:
"""Unjoin player from all/any groups."""
async with self.sonos_prov.topology_condition:
- await self.mass.create_task(self._unjoin)
+ await asyncio.to_thread(self._unjoin)
await self.wait_for_groups([[self]])
def update_player(self, signal_update: bool = True) -> None:
if group:
assert isinstance(group, str)
return group.split(",")
- return await self.mass.create_task(_get_soco_group)
+ return await asyncio.to_thread(_get_soco_group)
def _regroup(group: list[str]) -> None:
"""Rebuild internal group layout (async safe)."""
logger.log(VERBOSE_LOG_LEVEL, line)
if stderr:
- log_reader = asyncio.create_task(_read_stderr())
+ log_reader = self.mass.create_task(_read_stderr())
async for chunk in librespot_proc.iter_any(chunk_size):
yield chunk
def _setup_player_daemon(self) -> None:
"""Handle setup of the spotify connect daemon for a player."""
self._librespot_started.clear()
- self._runner_task = asyncio.create_task(self._librespot_runner())
+ self._runner_task = self.mass.create_task(self._librespot_runner())
def _on_mass_player_event(self, event: MassEvent) -> None:
"""Handle incoming event from linked airplay player."""