import logging
import os
-from aiorun import run
-
from music_assistant.mass import MusicAssistant
from music_assistant.models.player import Player, PlayerState
from music_assistant.providers.filesystem import FileSystemProvider
os.makedirs(data_dir)
db_file = os.path.join(data_dir, "music_assistant.db")
-mass = MusicAssistant(f"sqlite:///{db_file}")
-
providers = []
if args.spotify_username and args.spotify_password:
self.update_state()
-def main():
+async def main():
"""Handle main execution."""
- async def async_main():
- """Async main routine."""
- asyncio.get_event_loop().set_debug(args.debug)
+ asyncio.get_event_loop().set_debug(args.debug)
+
+ async with MusicAssistant(f"sqlite:///{db_file}") as mass:
- await mass.setup()
# register music provider(s)
for prov in providers:
await mass.music.register_provider(prov)
if len(playlists) > 0:
await test_player.active_queue.play_media(playlists[0].uri)
- def on_shutdown(loop):
- loop.run_until_complete(mass.stop())
-
- run(
- async_main(),
- use_uvloop=True,
- shutdown_callback=on_shutdown,
- executor_workers=64,
- )
+ await asyncio.sleep(3600)
if __name__ == "__main__":
- main()
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ pass
import logging
import os
-from aiorun import run
from music_assistant.mass import MusicAssistant
from music_assistant.providers.spotify import SpotifyProvider
spotify = SpotifyProvider(args.username, args.password)
-def main():
+async def main():
"""Handle main execution."""
- async def async_main():
- """Async main routine."""
- asyncio.get_event_loop().set_debug(args.debug)
- await mass.setup()
- # register music provider(s)
- await mass.music.register_provider(spotify)
- # get some data
- await mass.music.artists.library()
- await mass.music.tracks.library()
- await mass.music.radio.library()
-
- def on_shutdown(loop):
- loop.run_until_complete(mass.stop())
-
- run(
- async_main(),
- use_uvloop=True,
- shutdown_callback=on_shutdown,
- executor_workers=64,
- )
+ asyncio.get_event_loop().set_debug(args.debug)
+
+ # without contextmanager we need to call the async setup
+ await mass.setup()
+ # register music provider(s)
+ await mass.music.register_provider(spotify)
+ # get some data
+ await mass.music.artists.library()
+ await mass.music.tracks.library()
+ await mass.music.radio.library()
+
+ # run for an hour until someone hits CTRL+C
+ await asyncio.sleep(3600)
+
+ # without contextmanager we need to call the stop
+ await mass.stop()
if __name__ == "__main__":
- main()
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ pass
from music_assistant.helpers.cache import cached
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task, run_periodic
+from music_assistant.helpers.util import run_periodic
from music_assistant.models.errors import (
AlreadyRegisteredError,
MusicAssistantError,
await self.tracks.setup()
await self.radio.setup()
await self.playlists.setup()
- create_task(self.__periodic_sync)
+ self.mass.create_task(self.__periodic_sync)
@property
def provider_count(self) -> int:
else:
self._providers[provider.id] = provider
self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider)
- create_task(self.run_provider_sync(provider.id))
+ self.mass.create_task(self.run_provider_sync(provider.id))
async def search(
self, search_query, media_types: List[MediaType], limit: int = 10
)
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task, get_ip
+from music_assistant.helpers.util import get_ip
from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.media_items import ContentType
from music_assistant.models.player_queue import PlayerQueue
# write eof when last packet is received
sox_proc.write_eof()
- create_task(writer)
+ self.mass.create_task(writer)
# read bytes from final output
chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
from typing import Awaitable
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task
DB_TABLE = "cache"
async def cached(
- cache,
+ cache: Cache,
cache_key: str,
coro_func: Awaitable,
*args,
result = await coro_func
else:
result = await coro_func(*args)
- create_task(cache.set(cache_key, result, checksum, expires))
+ cache.mass.create_task(cache.set(cache_key, result, checksum, expires))
return result
from __future__ import annotations
import asyncio
-import functools
import os
import platform
import socket
import tempfile
-import threading
-from asyncio.events import AbstractEventLoop
-from typing import Any, Callable, Dict, List, Optional, Set, TypeVar, Union
+from typing import Any, Callable, Dict, List, Optional, Set, TypeVar
import memory_tempfile
CALLBACK_TYPE = Callable[[], None]
# pylint: enable=invalid-name
-DEFAULT_LOOP = None
-
-
-def create_task(
- target: Callable[..., Any],
- *args: Any,
- loop: AbstractEventLoop = None,
- **kwargs: Any,
-) -> Union[asyncio.Task, asyncio.Future]:
- """Create Task on (main) event loop from Callable or awaitable.
-
- target: target to call.
- loop: Running (main) event loop, defaults to loop in current thread
- args/kwargs: parameters for method to call.
- """
- try:
- loop = loop or asyncio.get_running_loop()
- except RuntimeError:
- # try to fetch the default loop from global variable
- loop = DEFAULT_LOOP
-
- # Check for partials to properly determine if coroutine function
- check_target = target
- while isinstance(check_target, functools.partial):
- check_target = check_target.func
-
- async def executor_wrapper(_target: Callable, *_args, **_kwargs):
- return await loop.run_in_executor(None, _target, *_args, **_kwargs)
-
- # called from other thread
- if threading.current_thread() is not threading.main_thread():
- if asyncio.iscoroutine(check_target):
- return asyncio.run_coroutine_threadsafe(target, loop)
- if asyncio.iscoroutinefunction(check_target):
- return asyncio.run_coroutine_threadsafe(target(*args), loop)
- return asyncio.run_coroutine_threadsafe(
- executor_wrapper(target, *args, **kwargs), loop
- )
-
- if asyncio.iscoroutine(check_target):
- return loop.create_task(target)
- if asyncio.iscoroutinefunction(check_target):
- return loop.create_task(target(*args))
- return loop.create_task(executor_wrapper(target, *args, **kwargs))
-
def run_periodic(delay: float, later: bool = False):
"""Run a coroutine at interval."""
from __future__ import annotations
import asyncio
+import functools
import logging
+import threading
from time import time
-from typing import Any, Callable, Coroutine, Optional, Tuple, Union
+from types import TracebackType
+from typing import Any, Callable, Coroutine, List, Optional, Tuple, Type, Union
import aiohttp
from databases import DatabaseURL
from music_assistant.controllers.metadata import MetaDataController
from music_assistant.controllers.music import MusicController
from music_assistant.controllers.players import PlayerController
-from music_assistant.helpers import util
from music_assistant.helpers.cache import Cache
from music_assistant.helpers.database import Database
-from music_assistant.helpers.util import create_task
EventCallBackType = Callable[[EventType, Any], None]
EventSubscriptionType = Tuple[EventCallBackType, Optional[Tuple[EventType]]]
self.metadata = MetaDataController(self)
self.music = MusicController(self)
self.players = PlayerController(self, stream_port)
- self._jobs_task: asyncio.Task = None
+ self._tracked_tasks: List[asyncio.Task] = []
async def setup(self) -> None:
"""Async setup of music assistant."""
# initialize loop
self.loop = asyncio.get_event_loop()
- util.DEFAULT_LOOP = self.loop
# create shared aiohttp ClientSession
if not self.http_session:
self.http_session = aiohttp.ClientSession(
await self.music.setup()
await self.metadata.setup()
await self.players.setup()
- self._jobs_task = create_task(self.__process_jobs())
+ self.create_task(self.__process_jobs())
async def stop(self) -> None:
"""Stop running the music assistant server."""
- self.logger.info("Application shutdown")
+ self.logger.info("Stop called, cleaning up...")
+ # cancel any running tasks
+ for task in self._tracked_tasks:
+ task.cancel()
self.signal_event(EventType.SHUTDOWN)
- if self._jobs_task is not None:
- self._jobs_task.cancel()
+ # wait for any remaining tasks launched by the shutdown event
+ await asyncio.wait_for(asyncio.wait(self._tracked_tasks), 2)
if self.http_session and not self.http_session_provided:
await self.http_session.connector.close()
- self.http_session.detach()
+ self.http_session.detach()
def signal_event(self, event_type: EventType, event_details: Any = None) -> None:
"""
:param event_details: optional details to send with the event.
"""
for cb_func, event_filter in self._listeners:
- if not event_filter or event_type in event_filter:
- create_task(cb_func, event_type, event_details)
+ if event_filter is None or event_type in event_filter:
+ self.create_task(cb_func, event_type, event_details)
def subscribe(
self,
"""
if isinstance(event_filter, EventType):
event_filter = (event_filter,)
- elif event_filter is None:
- event_filter = tuple()
listener = (cb_func, event_filter)
self._listeners.append(listener)
name = job.__qualname__ or job.__name__
self._jobs.put_nowait((name, job))
+ def create_task(
+ self,
+ target: Callable[..., Any],
+ *args: Any,
+ **kwargs: Any,
+ ) -> Union[asyncio.Task, asyncio.Future]:
+ """
+ Create Task on (main) event loop from Callable or awaitable.
+
+ Tasks create dby this helper will be properly cancelled on stop.
+ """
+
+ # Check for partials to properly determine if coroutine function
+ check_target = target
+ while isinstance(check_target, functools.partial):
+ check_target = check_target.func
+
+ async def executor_wrapper(_target: Callable, *_args, **_kwargs):
+ return await self.loop.run_in_executor(None, _target, *_args, **_kwargs)
+
+ # called from other thread
+ if threading.current_thread() is not threading.main_thread():
+ if asyncio.iscoroutine(check_target):
+ task = asyncio.run_coroutine_threadsafe(target, self.loop)
+ elif asyncio.iscoroutinefunction(check_target):
+ task = asyncio.run_coroutine_threadsafe(target(*args), self.loop)
+ else:
+ task = asyncio.run_coroutine_threadsafe(
+ executor_wrapper(target, *args, **kwargs), self.loop
+ )
+ else:
+ if asyncio.iscoroutine(check_target):
+ task = self.loop.create_task(target)
+ elif asyncio.iscoroutinefunction(check_target):
+ task = self.loop.create_task(target(*args))
+ else:
+ task = self.loop.create_task(executor_wrapper(target, *args, **kwargs))
+
+ def task_done_callback(*args, **kwargs):
+ self.logger.debug("task finished %s", task.get_name())
+ self._tracked_tasks.remove(task)
+
+ self._tracked_tasks.append(task)
+ task.add_done_callback(task_done_callback)
+ self.logger.debug("spawned task %s", task.get_name())
+ return task
+
async def __process_jobs(self):
"""Process jobs in the background."""
while True:
self.logger.debug("Start processing job [%s].", name)
try:
# await job
- task = asyncio.create_task(job, name=name)
+ task = self.create_task(job, name=name)
await task
except Exception as err: # pylint: disable=broad-except
self.logger.error(
else:
duration = round(time() - time_start, 2)
self.logger.info("Finished job [%s] in %s seconds.", name, duration)
+
+ async def __aenter__(self) -> "MusicAssistant":
+ """Return Context manager."""
+ await self.setup()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Type[BaseException],
+ exc_val: BaseException,
+ exc_tb: TracebackType,
+ ) -> Optional[bool]:
+ """Exit context manager."""
+ await self.stop()
+ if exc_val:
+ raise exc_val
+ return exc_type
from mashumaro import DataClassDictMixin
from music_assistant.constants import EventType
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task
if TYPE_CHECKING:
from .player_queue import PlayerQueue
# update group player childs when parent updates
for child_player_id in self.group_childs:
if player := self.mass.players.get_player(child_player_id):
- create_task(player.update_state)
+ self.mass.create_task(player.update_state)
else:
# update group player when child updates
for group_player_id in self.get_group_parents():
if player := self.mass.players.get_player(group_player_id):
- create_task(player.update_state)
+ self.mass.create_task(player.update_state)
def get_group_parents(self) -> List[str]:
"""Get any/all group player id's this player belongs to."""
from music_assistant.constants import EventType
from music_assistant.helpers.audio import get_stream_details
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import create_task
from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
from music_assistant.models.media_items import MediaType, StreamDetails
# handle case where stream stopped on purpose and we need to restart it
if self.player.state != PlayerState.PLAYING and self._signal_next:
self._signal_next = False
- create_task(self.play())
+ self.mass.create_task(self.play())
# start updater task if needed
if self.player.state == PlayerState.PLAYING:
if not self._update_task:
- self._update_task = create_task(self.__update_task())
+ self._update_task = self.mass.create_task(self.__update_task())
else:
if self._update_task:
self._update_task.cancel()
if self._save_task and not self._save_task.cancelled():
return
- self._save_task = self.mass.loop.call_later(60, create_task, cache_items)
+ self._save_task = self.mass.loop.call_later(
+ 60, self.mass.create_task, cache_items
+ )
aiosqlite>=0.13,<=0.17
python-slugify>=4.0,<=6.1.1
memory-tempfile<=2.2.3
-aiorun>=2021.10,<=2021.10.1
pillow>=8.0,<9.1.1
unidecode>=1.0,<=1.3.4
ujson>=4.0,<=5.1.0