improve eventbus
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 10 Apr 2022 21:26:54 +0000 (23:26 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 10 Apr 2022 21:26:54 +0000 (23:26 +0200)
14 files changed:
music_assistant/constants.py
music_assistant/controllers/music/__init__.py
music_assistant/controllers/music/albums.py
music_assistant/controllers/music/artists.py
music_assistant/controllers/music/playlists.py
music_assistant/controllers/music/radio.py
music_assistant/controllers/music/tracks.py
music_assistant/controllers/players.py
music_assistant/controllers/stream.py
music_assistant/helpers/audio.py
music_assistant/mass.py
music_assistant/models/player.py
music_assistant/models/player_queue.py
music_assistant/providers/qobuz.py

index 8a054cf60be42ae5fdfdfe528a473cac6cecf3e0..af2a3d50de039d35dc9092c191da3d8a049b2a1b 100755 (executable)
@@ -1,6 +1,8 @@
 """All constants for Music Assistant."""
 
+from dataclasses import dataclass
 from enum import Enum
+from typing import Any, Optional
 
 
 class EventType(Enum):
@@ -8,7 +10,7 @@ class EventType(Enum):
 
     PLAYER_ADDED = "player added"
     PLAYER_REMOVED = "player removed"
-    PLAYER_CHANGED = "player changed"
+    PLAYER_UPDATED = "player updated"
     STREAM_STARTED = "streaming started"
     STREAM_ENDED = "streaming ended"
     CONFIG_CHANGED = "config changed"
@@ -26,6 +28,15 @@ class EventType(Enum):
     PROVIDER_REGISTERED = "PROVIDER_REGISTERED"
 
 
+@dataclass
+class MassEvent:
+    """Representation of an Event emitted in/by Music Assistant."""
+
+    type: EventType
+    object_id: Optional[str] = None  # player_id, queue_id or uri
+    data: Optional[Any] = None  # optional data (such as the object)
+
+
 # player attributes
 ATTR_PLAYER_ID = "player_id"
 ATTR_PROVIDER_ID = "provider_id"
index f5378034a713a5ab9b289e6152877665a29eb3a1..ae00a0960edf86f20d352bb01b9e14d1db557d58 100755 (executable)
@@ -5,7 +5,7 @@ import asyncio
 import statistics
 from typing import Dict, List, Tuple
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.controllers.music.albums import AlbumsController
 from music_assistant.controllers.music.artists import ArtistsController
 from music_assistant.controllers.music.playlists import PlaylistController
@@ -94,7 +94,11 @@ class MusicController:
             ) from err
         else:
             self._providers[provider.id] = provider
-            self.mass.signal_event(EventType.PROVIDER_REGISTERED, provider)
+            self.mass.signal_event(
+                MassEvent(
+                    EventType.PROVIDER_REGISTERED, object_id=provider.id, data=provider
+                )
+            )
             self.mass.create_task(self.run_provider_sync(provider.id))
 
     async def search(
index 6e0abe291caadfddb091ee9a6b30dd0c2d679702..0d5f33610d6129b3b91d7fe3824cc04b8be456dd 100644 (file)
@@ -4,7 +4,7 @@ from __future__ import annotations
 import asyncio
 from typing import List
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.cache import cached
 from music_assistant.helpers.compare import compare_album, compare_strings
 from music_assistant.helpers.json import json_serializer
@@ -92,7 +92,9 @@ class AlbumsController(MediaControllerBase[Album]):
         # also fetch same album on all providers
         await self._match(db_item)
         db_item = await self.get_db_item(db_item.item_id)
-        self.mass.signal_event(EventType.ALBUM_ADDED, db_item)
+        self.mass.signal_event(
+            MassEvent(EventType.ALBUM_ADDED, object_id=db_item.uri, data=db_item)
+        )
         return db_item
 
     async def get_provider_album_tracks(
index f03cf9574c51d6671cd38f985f1e9a0486294a9f..77a5bc377b533ae9d9e60bf7bc8eea9baba59ce1 100644 (file)
@@ -4,7 +4,7 @@ import asyncio
 import itertools
 from typing import List
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.cache import cached
 from music_assistant.helpers.compare import (
     compare_album,
@@ -88,7 +88,9 @@ class ArtistsController(MediaControllerBase[Artist]):
         # also fetch same artist on all providers
         await self.match_artist(db_item)
         db_item = await self.get_db_item(db_item.item_id)
-        self.mass.signal_event(EventType.ARTIST_ADDED, db_item)
+        self.mass.signal_event(
+            MassEvent(EventType.ARTIST_ADDED, object_id=db_item.uri, data=db_item)
+        )
         return db_item
 
     async def match_artist(self, db_artist: Artist):
index 9b1570520f04e170ad9dfeb4838caefee28f6604..190a6444ab1c78060d303c12d2fe56bead5b2335 100644 (file)
@@ -4,7 +4,7 @@ from __future__ import annotations
 import time
 from typing import List
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.cache import cached
 from music_assistant.helpers.json import json_serializer
 from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
@@ -68,7 +68,9 @@ class PlaylistController(MediaControllerBase[Playlist]):
     async def add(self, item: Playlist) -> Playlist:
         """Add playlist to local db and return the new database item."""
         db_item = await self.add_db_item(item)
-        self.mass.signal_event(EventType.PLAYLIST_ADDED, db_item)
+        self.mass.signal_event(
+            MassEvent(EventType.PLAYLIST_ADDED, object_id=db_item.uri, data=db_item)
+        )
         return db_item
 
     async def add_playlist_tracks(
index bdb8ddd11f968b8998f8bf7b0f1b30fcc00b3537..84b629673825fc1fd2ce2772338fd02828fc41eb 100644 (file)
@@ -1,7 +1,7 @@
 """Manage MediaItems of type Radio."""
 from __future__ import annotations
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.json import json_serializer
 from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
 from music_assistant.models.media_controller import MediaControllerBase
@@ -37,7 +37,9 @@ class RadioController(MediaControllerBase[Radio]):
     async def add(self, item: Radio) -> Radio:
         """Add radio to local db and return the new database item."""
         db_item = await self.add_db_item(item)
-        self.mass.signal_event(EventType.RADIO_ADDED, db_item)
+        self.mass.signal_event(
+            MassEvent(EventType.RADIO_ADDED, object_id=db_item.uri, data=db_item)
+        )
         return db_item
 
     async def add_db_item(self, radio: Radio) -> Radio:
index c9d37e0053e16b7d02ef32900c86a9333421ebc2..4330e383d1aab1004cfa1899716542d42e179b13 100644 (file)
@@ -4,7 +4,7 @@ from __future__ import annotations
 import asyncio
 from typing import List
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.compare import (
     compare_artists,
     compare_strings,
@@ -50,7 +50,9 @@ class TracksController(MediaControllerBase[Track]):
         # also fetch same track on all providers (will also get other quality versions)
         await self._match(db_item)
         db_item = await self.get_db_item(db_item.item_id)
-        self.mass.signal_event(EventType.TRACK_ADDED, db_item)
+        self.mass.signal_event(
+            MassEvent(EventType.TRACK_ADDED, object_id=db_item.uri, data=db_item)
+        )
         return db_item
 
     async def versions(self, item_id: str, provider_id: str) -> List[Track]:
index a7c777916a96c412e2353f2853ee5e7dc324e4fa..07b4f48976c69ebf6c7e8cd2a7e31a873e7dc834 100755 (executable)
@@ -3,7 +3,7 @@ from __future__ import annotations
 
 from typing import Dict, Tuple, Union
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.models.errors import AlreadyRegisteredError
 from music_assistant.models.player import Player, PlayerGroup
@@ -37,6 +37,14 @@ class PlayerController:
                     volume_normalization_target INTEGER)"""
             )
 
+    async def cleanup(self) -> None:
+        """Cleanup on exit."""
+        for player_id in set(self._players.keys()):
+            player = self._players.pop(player_id)
+            player.on_remove()
+        for queue_id in set(self._player_queues.keys()):
+            self._player_queues.pop(queue_id)
+
     @property
     def players(self) -> Tuple[PlayerType]:
         """Return all available players."""
@@ -92,4 +100,6 @@ class PlayerController:
             player_id,
             player.name,
         )
-        self.mass.signal_event(EventType.PLAYER_ADDED, player)
+        self.mass.signal_event(
+            MassEvent(EventType.PLAYER_ADDED, object_id=player.player_id, data=player)
+        )
index 7bf941e05aad7cd814238dbdc4e165e13946a7d7..2f9897d8120236b6bbd1f3191451ebe35525eb3d 100644 (file)
@@ -8,7 +8,7 @@ from typing import AsyncGenerator, Dict, Optional, Set
 
 from aiohttp import web
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
@@ -64,7 +64,7 @@ class StreamController:
         http_site = web.TCPSite(runner, host=None, port=self._port)
         await http_site.start()
 
-        async def on_shutdown_event(*args, **kwargs):
+        async def on_shutdown_event(*event: MassEvent):
             """Handle shutdown event."""
             await http_site.stop()
             await runner.cleanup()
index 9438c70a096ceca5b5a7b21841f85e60c85864b8..ca07938f774f3f4287a355313f8ea46191f6dc3c 100644 (file)
@@ -9,7 +9,7 @@ from typing import AsyncGenerator, List, Optional, Tuple
 
 import aiofiles
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.process import AsyncProcess, check_output
 from music_assistant.helpers.typing import MusicAssistant, QueueItem
 from music_assistant.helpers.util import create_tempfile
@@ -419,7 +419,13 @@ async def get_media_stream(
 ) -> AsyncGenerator[Tuple[bool, bytes], None]:
     """Get the audio stream for the given streamdetails."""
 
-    mass.signal_event(EventType.STREAM_STARTED, streamdetails)
+    mass.signal_event(
+        MassEvent(
+            EventType.STREAM_STARTED,
+            object_id=streamdetails.provider,
+            data=streamdetails,
+        )
+    )
     args = await get_sox_args(streamdetails, output_format, resample)
     async with AsyncProcess(args) as sox_proc:
 
@@ -457,7 +463,13 @@ async def get_media_stream(
                 streamdetails.item_id, streamdetails.provider
             )
         finally:
-            mass.signal_event(EventType.STREAM_ENDED, streamdetails)
+            mass.signal_event(
+                MassEvent(
+                    EventType.STREAM_ENDED,
+                    object_id=streamdetails.provider,
+                    data=streamdetails,
+                )
+            )
             # send analyze job to background worker
             if streamdetails.loudness is None:
                 uri = f"{streamdetails.provider}://{streamdetails.media_type.value}/{streamdetails.item_id}"
index d98342f91024bb61c01322ea59f92576f407c31f..599db0b2be8c4c120cec426c431efb65513134e1 100644 (file)
@@ -12,7 +12,7 @@ from typing import Any, Callable, Coroutine, List, Optional, Tuple, Type, Union
 import aiohttp
 from databases import DatabaseURL
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.controllers.metadata import MetaDataController
 from music_assistant.controllers.music import MusicController
 from music_assistant.controllers.players import PlayerController
@@ -20,8 +20,10 @@ from music_assistant.controllers.stream import StreamController
 from music_assistant.helpers.cache import Cache
 from music_assistant.helpers.database import Database
 
-EventCallBackType = Callable[[EventType, Any], None]
-EventSubscriptionType = Tuple[EventCallBackType, Optional[Tuple[EventType]]]
+EventCallBackType = Callable[[MassEvent], None]
+EventSubscriptionType = Tuple[
+    EventCallBackType, Optional[Tuple[EventType]], Optional[Tuple[str]]
+]
 
 
 class MusicAssistant:
@@ -80,31 +82,31 @@ class MusicAssistant:
     async def stop(self) -> None:
         """Stop running the music assistant server."""
         self.logger.info("Stop called, cleaning up...")
+        await self.players.cleanup()
         # cancel all running tasks
         for task in self._tracked_tasks:
             task.cancel()
-        self.signal_event(EventType.SHUTDOWN)
+        self.signal_event(MassEvent(EventType.SHUTDOWN))
         self.closed = True
         if self.http_session and not self.http_session_provided:
             await self.http_session.close()
 
-    def signal_event(self, event_type: EventType, event_details: Any = None) -> None:
-        """
-        Signal (systemwide) event.
-
-            :param event_msg: the eventmessage to signal
-            :param event_details: optional details to send with the event.
-        """
+    def signal_event(self, event: MassEvent) -> None:
+        """Signal event to subscribers."""
         if self.closed:
             return
-        for cb_func, event_filter in self._listeners:
-            if event_filter is None or event_type in event_filter:
-                self.create_task(cb_func, event_type, event_details)
+        for cb_func, event_filter, id_filter in self._listeners:
+            if not (event_filter is None or event.type in event_filter):
+                continue
+            if not (id_filter is None or event.object_id in id_filter):
+                continue
+            self.create_task(cb_func, event)
 
     def subscribe(
         self,
         cb_func: EventCallBackType,
         event_filter: Union[EventType, Tuple[EventType], None] = None,
+        id_filter: Union[str, Tuple[str], None] = None,
     ) -> Callable:
         """
         Add callback to event listeners.
@@ -112,10 +114,13 @@ class MusicAssistant:
         Returns function to remove the listener.
             :param cb_func: callback function or coroutine
             :param event_filter: Optionally only listen for these events
+            :param id_filter: Optionally only listen for these id's (player_id, queue_id, uri)
         """
         if isinstance(event_filter, EventType):
             event_filter = (event_filter,)
-        listener = (cb_func, event_filter)
+        if isinstance(id_filter, str):
+            id_filter = (id_filter,)
+        listener = (cb_func, event_filter, id_filter)
         self._listeners.append(listener)
 
         def remove_listener():
index 0095b6d5a0e0384c242e515b1b9df10552a20d7b..d15d58d49dbc0c02f9829a297c704910a5aa20a2 100755 (executable)
@@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any, Dict, List
 
 from mashumaro import DataClassDictMixin
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.helpers.util import get_changed_keys
 
@@ -199,10 +199,6 @@ class Player(ABC):
         """Toggle power on player."""
         await self.power(not self.powered)
 
-    def on_update_state(self) -> None:
-        """Call when player state is about to be updated in the player manager."""
-        # this is called from `update_state` to apply some additional custom logic
-
     def on_child_update(self, player_id: str, changed_keys: set) -> None:
         """Call when one of the child players of a playergroup updates."""
         self.update_state(skip_forward=True)
@@ -211,6 +207,9 @@ class Player(ABC):
         """Call when (one of) the parent player(s) of a grouped player updates."""
         self.update_state(skip_forward=True)
 
+    def on_remove(self) -> None:
+        """Call when player is about to be removed (cleaned up) from player manager."""
+
     # DO NOT OVERRIDE BELOW
 
     def update_state(self, skip_forward: bool = False) -> None:
@@ -218,7 +217,6 @@ class Player(ABC):
         if self.mass is None or self.mass.closed:
             # guard
             return
-        self.on_update_state()
         self._attr_group_parents = self._get_attr_group_parents()
         # determine active queue for player
         self._attr_active_queue_id = self._get_active_queue_id()
@@ -233,7 +231,10 @@ class Player(ABC):
             return
 
         self._prev_state = cur_state
-        self.mass.signal_event(EventType.PLAYER_CHANGED, self)
+        if changed_keys != {"elapsed_time"}:
+            self.mass.signal_event(
+                MassEvent(EventType.PLAYER_UPDATED, object_id=self.player_id, data=self)
+            )
 
         if skip_forward:
             return
index 4ebd4b388b56f0cf2b9ebd8fbccacba9ae5eaaa3..8c47ccf2c04f8322a7b41b27e646ff2da11eefa8 100644 (file)
@@ -11,7 +11,7 @@ from uuid import uuid4
 
 from mashumaro import DataClassDictMixin
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.audio import get_stream_details
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
@@ -95,7 +95,9 @@ class PlayerQueue:
     async def setup(self) -> None:
         """Handle async setup of instance."""
         await self._restore_saved_state()
-        self.mass.signal_event(EventType.QUEUE_ADDED, self)
+        self.mass.signal_event(
+            MassEvent(EventType.QUEUE_ADDED, object_id=self.queue_id, data=self)
+        )
 
     @property
     def player(self) -> Player | PlayerGroup:
@@ -277,7 +279,11 @@ class PlayerQueue:
                 played_items = self.items[: self._current_index]
                 next_items = self.__shuffle_items(self.items[self._current_index + 1 :])
                 items = played_items + [self.current_item] + next_items
-                self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+                self.mass.signal_event(
+                    MassEvent(
+                        EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self
+                    )
+                )
                 await self.update(items)
         elif self._shuffle_enabled and not enable_shuffle:
             # unshuffle
@@ -287,14 +293,20 @@ class PlayerQueue:
                 next_items = self.items[self._current_index + 1 :]
                 next_items.sort(key=lambda x: x.sort_index, reverse=False)
                 items = played_items + [self.current_item] + next_items
-                self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+                self.mass.signal_event(
+                    MassEvent(
+                        EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self
+                    )
+                )
                 await self.update(items)
 
     async def set_repeat_enabled(self, enable_repeat: bool) -> None:
         """Set the repeat mode for this queue."""
         if self._repeat_enabled != enable_repeat:
             self._repeat_enabled = enable_repeat
-            self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+            self.mass.signal_event(
+                MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+            )
             await self._save_state(False)
 
     async def set_crossfade_duration(self, duration: int) -> None:
@@ -302,14 +314,18 @@ class PlayerQueue:
         duration = max(duration, 10)
         if self._crossfade_duration != duration:
             self._crossfade_duration = duration
-            self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+            self.mass.signal_event(
+                MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+            )
             await self._save_state(False)
 
     async def set_volume_normalization_enabled(self, enable: bool) -> None:
         """Set volume normalization."""
         if self._repeat_enabled != enable:
             self._repeat_enabled = enable
-            self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+            self.mass.signal_event(
+                MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+            )
             await self._save_state(False)
 
     async def set_volume_normalization_target(self, target: int) -> None:
@@ -318,7 +334,9 @@ class PlayerQueue:
         target = max(target, -40)
         if self._volume_normalization_target != target:
             self._volume_normalization_target = target
-            self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+            self.mass.signal_event(
+                MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+            )
             await self._save_state(False)
 
     async def stop(self) -> None:
@@ -427,7 +445,9 @@ class PlayerQueue:
         if self._shuffle_enabled and len(queue_items) > 5:
             queue_items = self.__shuffle_items(queue_items)
         self._items = queue_items
-        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+        self.mass.signal_event(
+            MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+        )
         await self.play_index(0)
         await self._save_state()
 
@@ -464,7 +484,9 @@ class PlayerQueue:
         if offset == 0:
             await self.play_index(insert_at_index)
 
-        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+        self.mass.signal_event(
+            MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+        )
         await self._save_state()
 
     async def append(self, queue_items: List[QueueItem]) -> None:
@@ -479,13 +501,17 @@ class PlayerQueue:
             await self.update(items)
             return
         self._items = self._items + queue_items
-        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+        self.mass.signal_event(
+            MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+        )
         await self._save_state()
 
     async def update(self, queue_items: List[QueueItem]) -> None:
         """Update the existing queue items, mostly caused by reordering."""
         self._items = queue_items
-        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, self)
+        self.mass.signal_event(
+            MassEvent(EventType.QUEUE_ITEMS_UPDATED, object_id=self.queue_id, data=self)
+        )
         await self._save_state()
 
     async def clear(self) -> None:
@@ -518,7 +544,9 @@ class PlayerQueue:
 
         if not self.update_state():
             # fire event anyway when player updated.
-            self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+            self.mass.signal_event(
+                MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+            )
 
     def update_state(self) -> bool:
         """Update queue details, called when player updates."""
@@ -556,7 +584,9 @@ class PlayerQueue:
             new_item_loaded
             or abs(prev_item_time - self._current_item_elapsed_time) >= 1
         ):
-            self.mass.signal_event(EventType.QUEUE_UPDATED, self)
+            self.mass.signal_event(
+                MassEvent(EventType.QUEUE_UPDATED, object_id=self.queue_id, data=self)
+            )
             return True
         return False
 
index aeadba29b5b86c868cf9e588fc71ea1920ed66a1..29927424221621d468ec42582453faeb57a0c7a6 100644 (file)
@@ -10,7 +10,7 @@ from typing import List, Optional
 import aiohttp
 from asyncio_throttle import Throttler
 
-from music_assistant.constants import EventType
+from music_assistant.constants import EventType, MassEvent
 from music_assistant.helpers.app_vars import (  # pylint: disable=no-name-in-module
     get_app_var,
 )
@@ -59,7 +59,9 @@ class QobuzProvider(MusicProvider):
             raise LoginFailed(f"Login failed for user {self._username}")
         # subscribe to stream events so we can report playback to Qobuz
         self.mass.subscribe(
-            self.on_stream_event, (EventType.STREAM_STARTED, EventType.STREAM_ENDED)
+            self.on_stream_event,
+            (EventType.STREAM_STARTED, EventType.STREAM_ENDED),
+            id_filter=self.id,
         )
 
     async def search(
@@ -355,7 +357,7 @@ class QobuzProvider(MusicProvider):
             details=streamdata,  # we need these details for reporting playback
         )
 
-    async def on_stream_event(self, msg, msg_details):
+    async def on_stream_event(self, event: MassEvent):
         """
         Received event from mass.
 
@@ -366,12 +368,12 @@ class QobuzProvider(MusicProvider):
         # TODO: need to figure out if the streamed track is purchased by user
         # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
         # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
-        if msg == EventType.STREAM_STARTED and msg_details.provider == self.id:
+        if event.type == EventType.STREAM_STARTED:
             # report streaming started to qobuz
             device_id = self.__user_auth_info["user"]["device"]["id"]
             credential_id = self.__user_auth_info["user"]["credential"]["id"]
             user_id = self.__user_auth_info["user"]["id"]
-            format_id = msg_details.details["format_id"]
+            format_id = event.data.details["format_id"]
             timestamp = int(time.time())
             events = [
                 {
@@ -379,7 +381,7 @@ class QobuzProvider(MusicProvider):
                     "sample": False,
                     "intent": "stream",
                     "device_id": device_id,
-                    "track_id": str(msg_details.item_id),
+                    "track_id": str(event.data.item_id),
                     "purchase": False,
                     "date": timestamp,
                     "credential_id": credential_id,
@@ -389,13 +391,13 @@ class QobuzProvider(MusicProvider):
                 }
             ]
             await self._post_data("track/reportStreamingStart", data=events)
-        elif msg == EventType.STREAM_ENDED and msg_details.provider == self.id:
+        elif event.type == EventType.STREAM_ENDED:
             # report streaming ended to qobuz
             user_id = self.__user_auth_info["user"]["id"]
             params = {
                 "user_id": user_id,
-                "track_id": str(msg_details.item_id),
-                "duration": try_parse_int(msg_details.seconds_played),
+                "track_id": str(event.data.item_id),
+                "duration": try_parse_int(event.data.seconds_played),
             }
             await self._get_data("/track/reportStreamingEnd", params)