"""All constants for Music Assistant."""
+from dataclasses import dataclass
from enum import Enum
+from typing import Any, Optional
class EventType(Enum):
PLAYER_ADDED = "player added"
PLAYER_REMOVED = "player removed"
- PLAYER_CHANGED = "player changed"
+ PLAYER_UPDATED = "player updated"
STREAM_STARTED = "streaming started"
STREAM_ENDED = "streaming ended"
CONFIG_CHANGED = "config changed"
PROVIDER_REGISTERED = "PROVIDER_REGISTERED"
+@dataclass
+class MassEvent:
+ """Representation of an Event emitted in/by Music Assistant."""
+
+ type: EventType
+ object_id: Optional[str] = None # player_id, queue_id or uri
+ data: Optional[Any] = None # optional data (such as the object)
+
+
# player attributes
ATTR_PLAYER_ID = "player_id"
ATTR_PROVIDER_ID = "provider_id"
import statistics
from typing import Dict, List, Tuple
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.controllers.music.albums import AlbumsController
from music_assistant.controllers.music.artists import ArtistsController
from music_assistant.controllers.music.playlists import PlaylistController
) from err
else:
self._providers[provider.id] = provider
- self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider)
+ self.mass.signal_event(
+ MassEvent(
+ EventType.PROVIDER_REGISTERED, object_id=provider.id, data=provider
+ )
+ )
self.mass.create_task(self.run_provider_sync(provider.id))
async def search(
import asyncio
from typing import List
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.cache import cached
from music_assistant.helpers.compare import compare_album, compare_strings
from music_assistant.helpers.json import json_serializer
# also fetch same album on all providers
await self._match(db_item)
db_item = await self.get_db_item(db_item.item_id)
- self.mass.signal_event(EventType.ALBUM_ADDED, db_item)
+ self.mass.signal_event(
+ MassEvent(EventType.ALBUM_ADDED, object_id=db_item.uri, data=db_item)
+ )
return db_item
async def get_provider_album_tracks(
import itertools
from typing import List
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.cache import cached
from music_assistant.helpers.compare import (
compare_album,
# also fetch same artist on all providers
await self.match_artist(db_item)
db_item = await self.get_db_item(db_item.item_id)
- self.mass.signal_event(EventType.ARTIST_ADDED, db_item)
+ self.mass.signal_event(
+ MassEvent(EventType.ARTIST_ADDED, object_id=db_item.uri, data=db_item)
+ )
return db_item
async def match_artist(self, db_artist: Artist):
import time
from typing import List
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.cache import cached
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
async def add(self, item: Playlist) -> Playlist:
"""Add playlist to local db and return the new database item."""
db_item = await self.add_db_item(item)
- self.mass.signal_event(EventType.PLAYLIST_ADDED, db_item)
+ self.mass.signal_event(
+ MassEvent(EventType.PLAYLIST_ADDED, object_id=db_item.uri, data=db_item)
+ )
return db_item
async def add_playlist_tracks(
"""Manage MediaItems of type Radio."""
from __future__ import annotations
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
from music_assistant.models.media_controller import MediaControllerBase
async def add(self, item: Radio) -> Radio:
"""Add radio to local db and return the new database item."""
db_item = await self.add_db_item(item)
- self.mass.signal_event(EventType.RADIO_ADDED, db_item)
+ self.mass.signal_event(
+ MassEvent(EventType.RADIO_ADDED, object_id=db_item.uri, data=db_item)
+ )
return db_item
async def add_db_item(self, radio: Radio) -> Radio:
import asyncio
from typing import List
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.compare import (
compare_artists,
compare_strings,
# also fetch same track on all providers (will also get other quality versions)
await self._match(db_item)
db_item = await self.get_db_item(db_item.item_id)
- self.mass.signal_event(EventType.TRACK_ADDED, db_item)
+ self.mass.signal_event(
+ MassEvent(EventType.TRACK_ADDED, object_id=db_item.uri, data=db_item)
+ )
return db_item
async def versions(self, item_id: str, provider_id: str) -> List[Track]:
from typing import Dict, Tuple, Union
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.models.errors import AlreadyRegisteredError
from music_assistant.models.player import Player, PlayerGroup
volume_normalization_target INTEGER)"""
)
+ async def cleanup(self) -> None:
+ """Cleanup on exit."""
+ for player_id in set(self._players.keys()):
+ player = self._players.pop(player_id)
+ player.on_remove()
+ for queue_id in set(self._player_queues.keys()):
+ self._player_queues.pop(queue_id)
+
@property
def players(self) -> Tuple[PlayerType]:
"""Return all available players."""
player_id,
player.name,
)
- self.mass.signal_event(EventType.PLAYER_ADDED, player)
+ self.mass.signal_event(
+ MassEvent(EventType.PLAYER_ADDED, object_id=player.player_id, data=player)
+ )
from aiohttp import web
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
http_site = web.TCPSite(runner, host=None, port=self._port)
await http_site.start()
- async def on_shutdown_event(*args, **kwargs):
+ async def on_shutdown_event(*event: MassEvent):
"""Handle shutdown event."""
await http_site.stop()
await runner.cleanup()
import aiofiles
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.typing import MusicAssistant, QueueItem
from music_assistant.helpers.util import create_tempfile
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the audio stream for the given streamdetails."""
- mass.signal_event(EventType.STREAM_STARTED, streamdetails)
+ mass.signal_event(
+ MassEvent(
+ EventType.STREAM_STARTED,
+ object_id=streamdetails.provider,
+ data=streamdetails,
+ )
+ )
args = await get_sox_args(streamdetails, output_format, resample)
async with AsyncProcess(args) as sox_proc:
streamdetails.item_id, streamdetails.provider
)
finally:
- mass.signal_event(EventType.STREAM_ENDED, streamdetails)
+ mass.signal_event(
+ MassEvent(
+ EventType.STREAM_ENDED,
+ object_id=streamdetails.provider,
+ data=streamdetails,
+ )
+ )
# send analyze job to background worker
if streamdetails.loudness is None:
uri = f"{streamdetails.provider}://{streamdetails.media_type.value}/{streamdetails.item_id}"
import aiohttp
from databases import DatabaseURL
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.controllers.metadata import MetaDataController
from music_assistant.controllers.music import MusicController
from music_assistant.controllers.players import PlayerController
from music_assistant.helpers.cache import Cache
from music_assistant.helpers.database import Database
-EventCallBackType = Callable[[EventType, Any], None]
-EventSubscriptionType = Tuple[EventCallBackType, Optional[Tuple[EventType]]]
+EventCallBackType = Callable[[MassEvent], None]
+EventSubscriptionType = Tuple[
+ EventCallBackType, Optional[Tuple[EventType]], Optional[Tuple[str]]
+]
class MusicAssistant:
async def stop(self) -> None:
"""Stop running the music assistant server."""
self.logger.info("Stop called, cleaning up...")
+ await self.players.cleanup()
# cancel all running tasks
for task in self._tracked_tasks:
task.cancel()
- self.signal_event(EventType.SHUTDOWN)
+ self.signal_event(MassEvent(EventType.SHUTDOWN))
self.closed = True
if self.http_session and not self.http_session_provided:
await self.http_session.close()
- def signal_event(self, event_type: EventType, event_details: Any = None) -> None:
- """
- Signal (systemwide) event.
-
- :param event_msg: the eventmessage to signal
- :param event_details: optional details to send with the event.
- """
+ def signal_event(self, event: MassEvent) -> None:
+ """Signal event to subscribers."""
if self.closed:
return
- for cb_func, event_filter in self._listeners:
- if event_filter is None or event_type in event_filter:
- self.create_task(cb_func, event_type, event_details)
+ for cb_func, event_filter, id_filter in self._listeners:
+ if not (event_filter is None or event.type in event_filter):
+ continue
+ if not (id_filter is None or event.object_id in id_filter):
+ continue
+ self.create_task(cb_func, event)
def subscribe(
self,
cb_func: EventCallBackType,
event_filter: Union[EventType, Tuple[EventType], None] = None,
+ id_filter: Union[str, Tuple[str], None] = None,
) -> Callable:
"""
Add callback to event listeners.
Returns function to remove the listener.
:param cb_func: callback function or coroutine
:param event_filter: Optionally only listen for these events
+ :param id_filter: Optionally only listen for these id's (player_id, queue_id, uri)
"""
if isinstance(event_filter, EventType):
event_filter = (event_filter,)
- listener = (cb_func, event_filter)
+ if isinstance(id_filter, str):
+ id_filter = (id_filter,)
+ listener = (cb_func, event_filter, id_filter)
self._listeners.append(listener)
def remove_listener():
from mashumaro import DataClassDictMixin
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.helpers.util import get_changed_keys
"""Toggle power on player."""
await self.power(not self.powered)
- def on_update_state(self) -> None:
- """Call when player state is about to be updated in the player manager."""
- # this is called from `update_state` to apply some additional custom logic
-
def on_child_update(self, player_id: str, changed_keys: set) -> None:
"""Call when one of the child players of a playergroup updates."""
self.update_state(skip_forward=True)
"""Call when (one of) the parent player(s) of a grouped player updates."""
self.update_state(skip_forward=True)
+ def on_remove(self) -> None:
+ """Call when player is about to be removed (cleaned up) from player manager."""
+
# DO NOT OVERRIDE BELOW
def update_state(self, skip_forward: bool = False) -> None:
if self.mass is None or self.mass.closed:
# guard
return
- self.on_update_state()
self._attr_group_parents = self._get_attr_group_parents()
# determine active queue for player
self._attr_active_queue_id = self._get_active_queue_id()
return
self._prev_state = cur_state
- self.mass.signal_event(EventType.PLAYER_CHANGED, self)
+ if changed_keys != {"elapsed_time"}:
+ self.mass.signal_event(
+ MassEvent(EventType.PLAYER_UPDATED, object_id=self.player_id, data=self)
+ )
if skip_forward:
return
from mashumaro import DataClassDictMixin
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.audio import get_stream_details
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
async def setup(self) -> None:
"""Handle async setup of instance."""
await self._restore_saved_state()
- self.mass.signal_event(EventType.QUEUE_ADDED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self)
+ )
@property
def player(self) -> Player | PlayerGroup:
played_items = self.items[: self._current_index]
next_items = self.__shuffle_items(self.items[self._current_index + 1 :])
items = played_items + [self.current_item] + next_items
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(
+ EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self
+ )
+ )
await self.update(items)
elif self._shuffle_enabled and not enable_shuffle:
# unshuffle
next_items = self.items[self._current_index + 1 :]
next_items.sort(key=lambda x: x.sort_index, reverse=False)
items = played_items + [self.current_item] + next_items
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(
+ EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self
+ )
+ )
await self.update(items)
async def set_repeat_enabled(self, enable_repeat: bool) -> None:
"""Set the repeat mode for this queue."""
if self._repeat_enabled != enable_repeat:
self._repeat_enabled = enable_repeat
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state(False)
async def set_crossfade_duration(self, duration: int) -> None:
duration = max(duration, 10)
if self._crossfade_duration != duration:
self._crossfade_duration = duration
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state(False)
async def set_volume_normalization_enabled(self, enable: bool) -> None:
"""Set volume normalization."""
if self._repeat_enabled != enable:
self._repeat_enabled = enable
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state(False)
async def set_volume_normalization_target(self, target: int) -> None:
target = max(target, -40)
if self._volume_normalization_target != target:
self._volume_normalization_target = target
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state(False)
async def stop(self) -> None:
if self._shuffle_enabled and len(queue_items) > 5:
queue_items = self.__shuffle_items(queue_items)
self._items = queue_items
- self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+ )
await self.play_index(0)
await self._save_state()
if offset == 0:
await self.play_index(insert_at_index)
- self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state()
async def append(self, queue_items: List[QueueItem]) -> None:
await self.update(items)
return
self._items = self._items + queue_items
- self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state()
async def update(self, queue_items: List[QueueItem]) -> None:
"""Update the existing queue items, mostly caused by reordering."""
self._items = queue_items
- self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+ )
await self._save_state()
async def clear(self) -> None:
if not self.update_state():
# fire event anyway when player updated.
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
def update_state(self) -> bool:
"""Update queue details, called when player updates."""
new_item_loaded
or abs(prev_item_time - self._current_item_elapsed_time) >= 1
):
- self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+ self.mass.signal_event(
+ MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+ )
return True
return False
import aiohttp
from asyncio_throttle import Throttler
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
get_app_var,
)
raise LoginFailed(f"Login failed for user {self._username}")
# subscribe to stream events so we can report playback to Qobuz
self.mass.subscribe(
- self.on_stream_event, (EventType.STREAM_STARTED, EventType.STREAM_ENDED)
+ self.on_stream_event,
+ (EventType.STREAM_STARTED, EventType.STREAM_ENDED),
+ id_filter=self.id,
)
async def search(
details=streamdata, # we need these details for reporting playback
)
- async def on_stream_event(self, msg, msg_details):
+ async def on_stream_event(self, event: MassEvent):
"""
Received event from mass.
# TODO: need to figure out if the streamed track is purchased by user
# https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
# {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
- if msg == EventType.STREAM_STARTED and msg_details.provider == self.id:
+ if event.type == EventType.STREAM_STARTED:
# report streaming started to qobuz
device_id = self.__user_auth_info["user"]["device"]["id"]
credential_id = self.__user_auth_info["user"]["credential"]["id"]
user_id = self.__user_auth_info["user"]["id"]
- format_id = msg_details.details["format_id"]
+ format_id = event.data.details["format_id"]
timestamp = int(time.time())
events = [
{
"sample": False,
"intent": "stream",
"device_id": device_id,
- "track_id": str(msg_details.item_id),
+ "track_id": str(event.data.item_id),
"purchase": False,
"date": timestamp,
"credential_id": credential_id,
}
]
await self._post_data("track/reportStreamingStart", data=events)
- elif msg == EventType.STREAM_ENDED and msg_details.provider == self.id:
+ elif event.type == EventType.STREAM_ENDED:
# report streaming ended to qobuz
user_id = self.__user_auth_info["user"]["id"]
params = {
"user_id": user_id,
- "track_id": str(msg_details.item_id),
- "duration": try_parse_int(msg_details.seconds_played),
+ "track_id": str(event.data.item_id),
+ "duration": try_parse_int(event.data.seconds_played),
}
await self._get_data("/track/reportStreamingEnd", params)