From: Marcel van der Veldt Date: Sun, 10 Apr 2022 21:26:54 +0000 (+0200) Subject: improve eventbus X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=f8f384434a6ee63207e79d0a1bc2e1591106751d;p=music-assistant-server.git improve eventbus --- diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 8a054cf6..af2a3d50 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -1,6 +1,8 @@ """All constants for Music Assistant.""" +from dataclasses import dataclass from enum import Enum +from typing import Any, Optional class EventType(Enum): @@ -8,7 +10,7 @@ class EventType(Enum): PLAYER_ADDED = "player added" PLAYER_REMOVED = "player removed" - PLAYER_CHANGED = "player changed" + PLAYER_UPDATED = "player updated" STREAM_STARTED = "streaming started" STREAM_ENDED = "streaming ended" CONFIG_CHANGED = "config changed" @@ -26,6 +28,15 @@ class EventType(Enum): PROVIDER_REGISTERED = "PROVIDER_REGISTERED" +@dataclass +class MassEvent: + """Representation of an Event emitted in/by Music Assistant.""" + + type: EventType + object_id: Optional[str] = None # player_id, queue_id or uri + data: Optional[Any] = None # optional data (such as the object) + + # player attributes ATTR_PLAYER_ID = "player_id" ATTR_PROVIDER_ID = "provider_id" diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index f5378034..ae00a096 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -5,7 +5,7 @@ import asyncio import statistics from typing import Dict, List, Tuple -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.controllers.music.albums import AlbumsController from music_assistant.controllers.music.artists import ArtistsController from music_assistant.controllers.music.playlists import PlaylistController @@ -94,7 +94,11 @@ class MusicController: ) from err else: self._providers[provider.id] = provider - self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider) + self.mass.signal_event( + MassEvent( + EventType.PROVIDER_REGISTERED, object_id=provider.id, data=provider + ) + ) self.mass.create_task(self.run_provider_sync(provider.id)) async def search( diff --git a/music_assistant/controllers/music/albums.py b/music_assistant/controllers/music/albums.py index 6e0abe29..0d5f3361 100644 --- a/music_assistant/controllers/music/albums.py +++ b/music_assistant/controllers/music/albums.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio from typing import List -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.cache import cached from music_assistant.helpers.compare import compare_album, compare_strings from music_assistant.helpers.json import json_serializer @@ -92,7 +92,9 @@ class AlbumsController(MediaControllerBase[Album]): # also fetch same album on all providers await self._match(db_item) db_item = await self.get_db_item(db_item.item_id) - self.mass.signal_event(EventType.ALBUM_ADDED, db_item) + self.mass.signal_event( + MassEvent(EventType.ALBUM_ADDED, object_id=db_item.uri, data=db_item) + ) return db_item async def get_provider_album_tracks( diff --git a/music_assistant/controllers/music/artists.py b/music_assistant/controllers/music/artists.py index f03cf957..77a5bc37 100644 --- a/music_assistant/controllers/music/artists.py +++ b/music_assistant/controllers/music/artists.py @@ -4,7 +4,7 @@ import asyncio import itertools from typing import List -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.cache import cached from music_assistant.helpers.compare import ( compare_album, @@ -88,7 +88,9 @@ class ArtistsController(MediaControllerBase[Artist]): # also fetch same artist on all providers await self.match_artist(db_item) db_item = await self.get_db_item(db_item.item_id) - self.mass.signal_event(EventType.ARTIST_ADDED, db_item) + self.mass.signal_event( + MassEvent(EventType.ARTIST_ADDED, object_id=db_item.uri, data=db_item) + ) return db_item async def match_artist(self, db_artist: Artist): diff --git a/music_assistant/controllers/music/playlists.py b/music_assistant/controllers/music/playlists.py index 9b157052..190a6444 100644 --- a/music_assistant/controllers/music/playlists.py +++ b/music_assistant/controllers/music/playlists.py @@ -4,7 +4,7 @@ from __future__ import annotations import time from typing import List -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.cache import cached from music_assistant.helpers.json import json_serializer from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list @@ -68,7 +68,9 @@ class PlaylistController(MediaControllerBase[Playlist]): async def add(self, item: Playlist) -> Playlist: """Add playlist to local db and return the new database item.""" db_item = await self.add_db_item(item) - self.mass.signal_event(EventType.PLAYLIST_ADDED, db_item) + self.mass.signal_event( + MassEvent(EventType.PLAYLIST_ADDED, object_id=db_item.uri, data=db_item) + ) return db_item async def add_playlist_tracks( diff --git a/music_assistant/controllers/music/radio.py b/music_assistant/controllers/music/radio.py index bdb8ddd1..84b62967 100644 --- a/music_assistant/controllers/music/radio.py +++ b/music_assistant/controllers/music/radio.py @@ -1,7 +1,7 @@ """Manage MediaItems of type Radio.""" from __future__ import annotations -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.json import json_serializer from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list from music_assistant.models.media_controller import MediaControllerBase @@ -37,7 +37,9 @@ class RadioController(MediaControllerBase[Radio]): async def add(self, item: Radio) -> Radio: """Add radio to local db and return the new database item.""" db_item = await self.add_db_item(item) - self.mass.signal_event(EventType.RADIO_ADDED, db_item) + self.mass.signal_event( + MassEvent(EventType.RADIO_ADDED, object_id=db_item.uri, data=db_item) + ) return db_item async def add_db_item(self, radio: Radio) -> Radio: diff --git a/music_assistant/controllers/music/tracks.py b/music_assistant/controllers/music/tracks.py index c9d37e00..4330e383 100644 --- a/music_assistant/controllers/music/tracks.py +++ b/music_assistant/controllers/music/tracks.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio from typing import List -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.compare import ( compare_artists, compare_strings, @@ -50,7 +50,9 @@ class TracksController(MediaControllerBase[Track]): # also fetch same track on all providers (will also get other quality versions) await self._match(db_item) db_item = await self.get_db_item(db_item.item_id) - self.mass.signal_event(EventType.TRACK_ADDED, db_item) + self.mass.signal_event( + MassEvent(EventType.TRACK_ADDED, object_id=db_item.uri, data=db_item) + ) return db_item async def versions(self, item_id: str, provider_id: str) -> List[Track]: diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index a7c77791..07b4f489 100755 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import Dict, Tuple, Union -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.typing import MusicAssistant from music_assistant.models.errors import AlreadyRegisteredError from music_assistant.models.player import Player, PlayerGroup @@ -37,6 +37,14 @@ class PlayerController: volume_normalization_target INTEGER)""" ) + async def cleanup(self) -> None: + """Cleanup on exit.""" + for player_id in set(self._players.keys()): + player = self._players.pop(player_id) + player.on_remove() + for queue_id in set(self._player_queues.keys()): + self._player_queues.pop(queue_id) + @property def players(self) -> Tuple[PlayerType]: """Return all available players.""" @@ -92,4 +100,6 @@ class PlayerController: player_id, player.name, ) - self.mass.signal_event(EventType.PLAYER_ADDED, player) + self.mass.signal_event( + MassEvent(EventType.PLAYER_ADDED, object_id=player.player_id, data=player) + ) diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index 7bf941e0..2f9897d8 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -8,7 +8,7 @@ from typing import AsyncGenerator, Dict, Optional, Set from aiohttp import web -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.audio import ( check_audio_support, crossfade_pcm_parts, @@ -64,7 +64,7 @@ class StreamController: http_site = web.TCPSite(runner, host=None, port=self._port) await http_site.start() - async def on_shutdown_event(*args, **kwargs): + async def on_shutdown_event(*event: MassEvent): """Handle shutdown event.""" await http_site.stop() await runner.cleanup() diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 9438c70a..ca07938f 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -9,7 +9,7 @@ from typing import AsyncGenerator, List, Optional, Tuple import aiofiles -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.process import AsyncProcess, check_output from music_assistant.helpers.typing import MusicAssistant, QueueItem from music_assistant.helpers.util import create_tempfile @@ -419,7 +419,13 @@ async def get_media_stream( ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the audio stream for the given streamdetails.""" - mass.signal_event(EventType.STREAM_STARTED, streamdetails) + mass.signal_event( + MassEvent( + EventType.STREAM_STARTED, + object_id=streamdetails.provider, + data=streamdetails, + ) + ) args = await get_sox_args(streamdetails, output_format, resample) async with AsyncProcess(args) as sox_proc: @@ -457,7 +463,13 @@ async def get_media_stream( streamdetails.item_id, streamdetails.provider ) finally: - mass.signal_event(EventType.STREAM_ENDED, streamdetails) + mass.signal_event( + MassEvent( + EventType.STREAM_ENDED, + object_id=streamdetails.provider, + data=streamdetails, + ) + ) # send analyze job to background worker if streamdetails.loudness is None: uri = f"{streamdetails.provider}://{streamdetails.media_type.value}/{streamdetails.item_id}" diff --git a/music_assistant/mass.py b/music_assistant/mass.py index d98342f9..599db0b2 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -12,7 +12,7 @@ from typing import Any, Callable, Coroutine, List, Optional, Tuple, Type, Union import aiohttp from databases import DatabaseURL -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.controllers.metadata import MetaDataController from music_assistant.controllers.music import MusicController from music_assistant.controllers.players import PlayerController @@ -20,8 +20,10 @@ from music_assistant.controllers.stream import StreamController from music_assistant.helpers.cache import Cache from music_assistant.helpers.database import Database -EventCallBackType = Callable[[EventType, Any], None] -EventSubscriptionType = Tuple[EventCallBackType, Optional[Tuple[EventType]]] +EventCallBackType = Callable[[MassEvent], None] +EventSubscriptionType = Tuple[ + EventCallBackType, Optional[Tuple[EventType]], Optional[Tuple[str]] +] class MusicAssistant: @@ -80,31 +82,31 @@ class MusicAssistant: async def stop(self) -> None: """Stop running the music assistant server.""" self.logger.info("Stop called, cleaning up...") + await self.players.cleanup() # cancel all running tasks for task in self._tracked_tasks: task.cancel() - self.signal_event(EventType.SHUTDOWN) + self.signal_event(MassEvent(EventType.SHUTDOWN)) self.closed = True if self.http_session and not self.http_session_provided: await self.http_session.close() - def signal_event(self, event_type: EventType, event_details: Any = None) -> None: - """ - Signal (systemwide) event. - - :param event_msg: the eventmessage to signal - :param event_details: optional details to send with the event. - """ + def signal_event(self, event: MassEvent) -> None: + """Signal event to subscribers.""" if self.closed: return - for cb_func, event_filter in self._listeners: - if event_filter is None or event_type in event_filter: - self.create_task(cb_func, event_type, event_details) + for cb_func, event_filter, id_filter in self._listeners: + if not (event_filter is None or event.type in event_filter): + continue + if not (id_filter is None or event.object_id in id_filter): + continue + self.create_task(cb_func, event) def subscribe( self, cb_func: EventCallBackType, event_filter: Union[EventType, Tuple[EventType], None] = None, + id_filter: Union[str, Tuple[str], None] = None, ) -> Callable: """ Add callback to event listeners. @@ -112,10 +114,13 @@ class MusicAssistant: Returns function to remove the listener. :param cb_func: callback function or coroutine :param event_filter: Optionally only listen for these events + :param id_filter: Optionally only listen for these id's (player_id, queue_id, uri) """ if isinstance(event_filter, EventType): event_filter = (event_filter,) - listener = (cb_func, event_filter) + if isinstance(id_filter, str): + id_filter = (id_filter,) + listener = (cb_func, event_filter, id_filter) self._listeners.append(listener) def remove_listener(): diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index 0095b6d5..d15d58d4 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Dict, List from mashumaro import DataClassDictMixin -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import get_changed_keys @@ -199,10 +199,6 @@ class Player(ABC): """Toggle power on player.""" await self.power(not self.powered) - def on_update_state(self) -> None: - """Call when player state is about to be updated in the player manager.""" - # this is called from `update_state` to apply some additional custom logic - def on_child_update(self, player_id: str, changed_keys: set) -> None: """Call when one of the child players of a playergroup updates.""" self.update_state(skip_forward=True) @@ -211,6 +207,9 @@ class Player(ABC): """Call when (one of) the parent player(s) of a grouped player updates.""" self.update_state(skip_forward=True) + def on_remove(self) -> None: + """Call when player is about to be removed (cleaned up) from player manager.""" + # DO NOT OVERRIDE BELOW def update_state(self, skip_forward: bool = False) -> None: @@ -218,7 +217,6 @@ class Player(ABC): if self.mass is None or self.mass.closed: # guard return - self.on_update_state() self._attr_group_parents = self._get_attr_group_parents() # determine active queue for player self._attr_active_queue_id = self._get_active_queue_id() @@ -233,7 +231,10 @@ class Player(ABC): return self._prev_state = cur_state - self.mass.signal_event(EventType.PLAYER_CHANGED, self) + if changed_keys != {"elapsed_time"}: + self.mass.signal_event( + MassEvent(EventType.PLAYER_UPDATED, object_id=self.player_id, data=self) + ) if skip_forward: return diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 4ebd4b38..8c47ccf2 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -11,7 +11,7 @@ from uuid import uuid4 from mashumaro import DataClassDictMixin -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.audio import get_stream_details from music_assistant.helpers.typing import MusicAssistant from music_assistant.models.errors import MediaNotFoundError, QueueEmpty @@ -95,7 +95,9 @@ class PlayerQueue: async def setup(self) -> None: """Handle async setup of instance.""" await self._restore_saved_state() - self.mass.signal_event(EventType.QUEUE_ADDED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self) + ) @property def player(self) -> Player | PlayerGroup: @@ -277,7 +279,11 @@ class PlayerQueue: played_items = self.items[: self._current_index] next_items = self.__shuffle_items(self.items[self._current_index + 1 :]) items = played_items + [self.current_item] + next_items - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent( + EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self + ) + ) await self.update(items) elif self._shuffle_enabled and not enable_shuffle: # unshuffle @@ -287,14 +293,20 @@ class PlayerQueue: next_items = self.items[self._current_index + 1 :] next_items.sort(key=lambda x: x.sort_index, reverse=False) items = played_items + [self.current_item] + next_items - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent( + EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self + ) + ) await self.update(items) async def set_repeat_enabled(self, enable_repeat: bool) -> None: """Set the repeat mode for this queue.""" if self._repeat_enabled != enable_repeat: self._repeat_enabled = enable_repeat - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state(False) async def set_crossfade_duration(self, duration: int) -> None: @@ -302,14 +314,18 @@ class PlayerQueue: duration = max(duration, 10) if self._crossfade_duration != duration: self._crossfade_duration = duration - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state(False) async def set_volume_normalization_enabled(self, enable: bool) -> None: """Set volume normalization.""" if self._repeat_enabled != enable: self._repeat_enabled = enable - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state(False) async def set_volume_normalization_target(self, target: int) -> None: @@ -318,7 +334,9 @@ class PlayerQueue: target = max(target, -40) if self._volume_normalization_target != target: self._volume_normalization_target = target - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state(False) async def stop(self) -> None: @@ -427,7 +445,9 @@ class PlayerQueue: if self._shuffle_enabled and len(queue_items) > 5: queue_items = self.__shuffle_items(queue_items) self._items = queue_items - self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) + ) await self.play_index(0) await self._save_state() @@ -464,7 +484,9 @@ class PlayerQueue: if offset == 0: await self.play_index(insert_at_index) - self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state() async def append(self, queue_items: List[QueueItem]) -> None: @@ -479,13 +501,17 @@ class PlayerQueue: await self.update(items) return self._items = self._items + queue_items - self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state() async def update(self, queue_items: List[QueueItem]) -> None: """Update the existing queue items, mostly caused by reordering.""" self._items = queue_items - self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self) + ) await self._save_state() async def clear(self) -> None: @@ -518,7 +544,9 @@ class PlayerQueue: if not self.update_state(): # fire event anyway when player updated. - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) def update_state(self) -> bool: """Update queue details, called when player updates.""" @@ -556,7 +584,9 @@ class PlayerQueue: new_item_loaded or abs(prev_item_time - self._current_item_elapsed_time) >= 1 ): - self.mass.signal_event(EventType.QUEUE_UPDATED, self) + self.mass.signal_event( + MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self) + ) return True return False diff --git a/music_assistant/providers/qobuz.py b/music_assistant/providers/qobuz.py index aeadba29..29927424 100644 --- a/music_assistant/providers/qobuz.py +++ b/music_assistant/providers/qobuz.py @@ -10,7 +10,7 @@ from typing import List, Optional import aiohttp from asyncio_throttle import Throttler -from music_assistant.constants import EventType +from music_assistant.constants import EventType, MassEvent from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module get_app_var, ) @@ -59,7 +59,9 @@ class QobuzProvider(MusicProvider): raise LoginFailed(f"Login failed for user {self._username}") # subscribe to stream events so we can report playback to Qobuz self.mass.subscribe( - self.on_stream_event, (EventType.STREAM_STARTED, EventType.STREAM_ENDED) + self.on_stream_event, + (EventType.STREAM_STARTED, EventType.STREAM_ENDED), + id_filter=self.id, ) async def search( @@ -355,7 +357,7 @@ class QobuzProvider(MusicProvider): details=streamdata, # we need these details for reporting playback ) - async def on_stream_event(self, msg, msg_details): + async def on_stream_event(self, event: MassEvent): """ Received event from mass. @@ -366,12 +368,12 @@ class QobuzProvider(MusicProvider): # TODO: need to figure out if the streamed track is purchased by user # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}} - if msg == EventType.STREAM_STARTED and msg_details.provider == self.id: + if event.type == EventType.STREAM_STARTED: # report streaming started to qobuz device_id = self.__user_auth_info["user"]["device"]["id"] credential_id = self.__user_auth_info["user"]["credential"]["id"] user_id = self.__user_auth_info["user"]["id"] - format_id = msg_details.details["format_id"] + format_id = event.data.details["format_id"] timestamp = int(time.time()) events = [ { @@ -379,7 +381,7 @@ class QobuzProvider(MusicProvider): "sample": False, "intent": "stream", "device_id": device_id, - "track_id": str(msg_details.item_id), + "track_id": str(event.data.item_id), "purchase": False, "date": timestamp, "credential_id": credential_id, @@ -389,13 +391,13 @@ class QobuzProvider(MusicProvider): } ] await self._post_data("track/reportStreamingStart", data=events) - elif msg == EventType.STREAM_ENDED and msg_details.provider == self.id: + elif event.type == EventType.STREAM_ENDED: # report streaming ended to qobuz user_id = self.__user_auth_info["user"]["id"] params = { "user_id": user_id, - "track_id": str(msg_details.item_id), - "duration": try_parse_int(msg_details.seconds_played), + "track_id": str(event.data.item_id), + "duration": try_parse_int(event.data.seconds_played), } await self._get_data("/track/reportStreamingEnd", params)