-"""Various server-seecific utils/helpers."""
+"""Various server-specific utils/helpers."""
--- /dev/null
+"""Helper class to aid scrobblers."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from music_assistant_models.event import MassEvent
+ from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
+
+
+class ScrobblerHelper:
+ """Base class to aid scrobbling tracks."""
+
+ logger: logging.Logger
+ currently_playing: str | None = None
+ last_scrobbled: str | None = None
+
+ def __init__(self, logger: logging.Logger) -> None:
+ """Initialize."""
+ self.logger = logger
+
+ def _is_configured(self) -> bool:
+ """Override if subclass needs specific configuration."""
+ return True
+
+ def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+ """Send a Now Playing update to the scrobbling service."""
+
+ def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+ """Scrobble."""
+
+ async def _on_mass_media_item_played(self, event: MassEvent) -> None:
+ """Media item has finished playing, we'll scrobble the track."""
+ if not self._is_configured():
+ return
+
+ report: MediaItemPlaybackProgressReport = event.data
+
+ # poor mans attempt to detect a song on loop
+ if not report.fully_played and report.uri == self.last_scrobbled:
+ self.logger.debug(
+ "reset _last_scrobbled and _currently_playing because the song was restarted"
+ )
+ self.last_scrobbled = None
+ # reset currently playing to avoid it expiring when looping single songs
+ self.currently_playing = None
+
+ def update_now_playing() -> None:
+ try:
+ self._update_now_playing(report)
+ self.logger.debug(f"track {report.uri} marked as 'now playing'")
+ self.currently_playing = report.uri
+ except Exception as err:
+ self.logger.exception(err)
+
+ def scrobble() -> None:
+ try:
+ self._scrobble(report)
+ self.last_scrobbled = report.uri
+ except Exception as err:
+ self.logger.exception(err)
+
+ # update now playing if needed
+ if report.is_playing and (
+ self.currently_playing is None or self.currently_playing != report.uri
+ ):
+ await asyncio.to_thread(update_now_playing)
+
+ if self.should_scrobble(report):
+ await asyncio.to_thread(scrobble)
+
+ def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
+ """Determine if a track should be scrobbled, to be extended later."""
+ if self.last_scrobbled == report.uri:
+ self.logger.debug("skipped scrobbling due to duplicate event")
+ return False
+
+ # ideally we want more precise control
+ # but because the event is triggered every 30s
+ # and we don't have full queue details to determine
+ # the exact context in which the event was fired
+ # we can only rely on fully_played for now
+ return bool(report.fully_played)
"""Allows scrobbling of tracks with the help of PyLast."""
-import asyncio
import logging
import time
from collections.abc import Callable
from music_assistant_models.constants import SECURE_STRING_SUBSTITUTE
from music_assistant_models.enums import ConfigEntryType, EventType
from music_assistant_models.errors import LoginFailed, SetupFailedError
-from music_assistant_models.event import MassEvent
from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
from music_assistant_models.provider import ProviderManifest
from music_assistant.constants import MASS_LOGGER_NAME
from music_assistant.helpers.auth import AuthenticationHelper
+from music_assistant.helpers.scrobbler import ScrobblerHelper
from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
from music_assistant.models.plugin import PluginProvider
unload_cb()
-class LastFMEventHandler:
+class LastFMEventHandler(ScrobblerHelper):
"""Handles the event handling."""
- logger: logging.Logger
network: pylast._Network
- currently_playing: str | None = None
- last_scrobbled: str | None = None
def __init__(self, network: pylast._Network, logger: logging.Logger) -> None:
"""Initialize."""
+ super().__init__(logger)
self.network = network
- self.logger = logger
- async def _on_mass_media_item_played(self, event: MassEvent) -> None:
- """Media item has finished playing, we'll scrobble the track."""
+ def _is_configured(self) -> bool:
if self.network is None:
self.logger.error("no network available during _on_mass_media_item_played")
- return
+ return False
- report: MediaItemPlaybackProgressReport = event.data
+ return True
- # poor mans attempt to detect a song on loop
- if not report.fully_played and report.uri == self.last_scrobbled:
- self.logger.debug(
- "reset _last_scrobbled and _currently_playing because the song was restarted"
- )
- self.last_scrobbled = None
- # reset currently playing to avoid it expiring when looping single songs
- self.currently_playing = None
-
- def update_now_playing() -> None:
- try:
- self.network.update_now_playing(
- report.artist,
- report.name,
- report.album,
- duration=report.duration,
- mbid=report.mbid,
- )
- self.logger.debug(f"track {report.uri} marked as 'now playing'")
- self.currently_playing = report.uri
- except Exception as err:
- self.logger.exception(err)
-
- def scrobble() -> None:
- try:
- # album artist and track number are not available without an extra API call
- # so they won't be scrobbled
- self.network.scrobble(
- report.artist,
- report.name,
- time.time(),
- report.album,
- duration=report.duration,
- mbid=report.mbid,
- )
- self.last_scrobbled = report.uri
- except Exception as err:
- self.logger.exception(err)
-
- # update now playing if needed
- if report.is_playing and (
- self.currently_playing is None or self.currently_playing != report.uri
- ):
- await asyncio.to_thread(update_now_playing)
-
- if self.should_scrobble(report):
- await asyncio.to_thread(scrobble)
-
- def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
- """Determine if a track should be scrobbled, to be extended later."""
- if self.last_scrobbled == report.uri:
- self.logger.debug("skipped scrobbling due to duplicate event")
- return False
+ def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+ self.network.update_now_playing(
+ report.artist,
+ report.name,
+ report.album,
+ duration=report.duration,
+ mbid=report.mbid,
+ )
- # ideally we want more precise control
- # but because the event is triggered every 30s
- # and we don't have full queue details to determine
- # the exact context in which the event was fired
- # we can only rely on fully_played for now
- return bool(report.fully_played)
+ def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+ # album artist and track number are not available without an extra API call
+ # so they won't be scrobbled
+ self.network.scrobble(
+ report.artist,
+ report.name,
+ time.time(),
+ report.album,
+ duration=report.duration,
+ mbid=report.mbid,
+ )
# configuration keys
# released under the Creative Commons Attribution-ShareAlike(BY-SA) 4.0 license.
# https://creativecommons.org/licenses/by-sa/4.0/
-import asyncio
+import logging
import time
from collections.abc import Callable
from typing import Any
from music_assistant_models.constants import SECURE_STRING_SUBSTITUTE
from music_assistant_models.enums import ConfigEntryType, EventType
from music_assistant_models.errors import SetupFailedError
-from music_assistant_models.event import MassEvent
from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
from music_assistant_models.provider import ProviderManifest
+from music_assistant.helpers.scrobbler import ScrobblerHelper
from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
from music_assistant.models.plugin import PluginProvider
"""Plugin provider to support scrobbling of tracks."""
_client: ListenBrainz = None
- _currently_playing: str | None = None
_on_unload: list[Callable[[], None]] = []
- _last_scrobbled: str | None = None
def __init__(
self,
"""Call after the provider has been loaded."""
await super().loaded_in_mass()
+ handler = ListenBrainzEventHandler(self._client, self.logger)
+
# subscribe to internal event
self._on_unload.append(
- self.mass.subscribe(self._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED)
+ self.mass.subscribe(handler._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED)
)
async def unload(self, is_removed: bool = False) -> None:
for unload_cb in self._on_unload:
unload_cb()
- async def _on_mass_media_item_played(self, event: MassEvent) -> None:
- """Media item has finished playing, we'll scrobble the track."""
+
+class ListenBrainzEventHandler(ScrobblerHelper):
+ """Handles the event handling."""
+
+ _client: ListenBrainz = None
+
+ def __init__(self, client: ListenBrainz, logger: logging.Logger) -> None:
+ """Initialize."""
+ super().__init__(logger)
+ self._client = client
+
+ def _is_configured(self) -> bool:
+ """Check that we are configured."""
if self._client is None:
self.logger.error("no client available during _on_mass_media_item_played")
- return
-
- report: MediaItemPlaybackProgressReport = event.data
-
- # poor mans attempt to detect a song on loop
- if not report.fully_played and report.uri == self._last_scrobbled:
- self.logger.debug(
- "reset _last_scrobbled and _currently_playing because the song was restarted"
- )
- self._last_scrobbled = None
- # reset currently playing to avoid it expiring when looping single songs
- self._currently_playing = None
-
- def make_listen(report: Any) -> Listen:
- # album artist and track number are not available without an extra API call
- # so they won't be scrobbled
-
- # https://pylistenbrainz.readthedocs.io/en/latest/api_ref.html#class-listen
- return Listen(
- track_name=report.name,
- artist_name=report.artist,
- release_name=report.album,
- recording_mbid=report.mbid,
- listening_from="music-assistant",
- )
-
- def update_now_playing() -> None:
- try:
- listen = make_listen(report)
- self._client.submit_playing_now(listen)
- self.logger.debug(f"track {report.uri} marked as 'now playing'")
- self._currently_playing = report.uri
- except Exception as err:
- self.logger.exception(err)
-
- def scrobble() -> None:
- try:
- listen = make_listen(report)
- listen.listened_at = int(time.time())
- self._client.submit_single_listen(listen)
- self._last_scrobbled = report.uri
- except Exception as err:
- self.logger.exception(err)
-
- # update now playing if needed
- if self._currently_playing is None or self._currently_playing != report.uri:
- await asyncio.to_thread(update_now_playing)
-
- if self.should_scrobble(report):
- await asyncio.to_thread(scrobble)
-
- def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
- """Determine if a track should be scrobbled, to be extended later."""
- if self._last_scrobbled == report.uri:
- self.logger.debug("skipped scrobbling due to duplicate event")
return False
+ return True
+
+ def _make_listen(self, report: Any) -> Listen:
+ # album artist and track number are not available without an extra API call
+ # so they won't be scrobbled
+
+ # https://pylistenbrainz.readthedocs.io/en/latest/api_ref.html#class-listen
+ return Listen(
+ track_name=report.name,
+ artist_name=report.artist,
+ release_name=report.album,
+ recording_mbid=report.mbid,
+ listening_from="music-assistant",
+ )
- # ideally we want more precise control
- # but because the event is triggered every 30s
- # and we don't have full queue details to determine
- # the exact context in which the event was fired
- # we can only rely on fully_played for now
- return bool(report.fully_played)
+ def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+ try:
+ listen = self._make_listen(report)
+ self._client.submit_playing_now(listen)
+ self.logger.debug(f"track {report.uri} marked as 'now playing'")
+ self._currently_playing = report.uri
+ except Exception as err:
+ self.logger.exception(err)
+
+ def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+ try:
+ listen = self._make_listen(report)
+ listen.listened_at = int(time.time())
+ self._client.submit_single_listen(listen)
+ self._last_scrobbled = report.uri
+ except Exception as err:
+ self.logger.exception(err)
async def get_config_entries(
--- /dev/null
+"""Tests the event handling for LastFM Plugin Provider."""
+
+import logging
+
+from music_assistant_models.enums import EventType, MediaType
+from music_assistant_models.event import MassEvent
+from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
+
+from music_assistant.helpers.scrobbler import ScrobblerHelper
+
+
+class DummyHandler(ScrobblerHelper):
+ """Spy version of a ScrobblerHelper to allow easy testing."""
+
+ _tracked = 0
+ _now_playing = 0
+
+ def __init__(self, logger: logging.Logger) -> None:
+ """Initialize."""
+ super().__init__(logger)
+
+ def _is_configured(self) -> bool:
+ return True
+
+ def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+ self._now_playing += 1
+
+ def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+ self._tracked += 1
+
+
+async def test_it_does_not_scrobble_the_same_track_twice() -> None:
+ """While songs are playing we get updates every 30 seconds.
+
+ Here we test that songs only get scrobbled once during each play.
+ """
+ handler = DummyHandler(logging.getLogger())
+
+ # not fully played yet
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+ assert handler._tracked == 0
+
+ # fully played near the end
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=176))
+ assert handler._tracked == 1
+
+ # fully played on track change should not scrobble again
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
+ assert handler._tracked == 1
+
+ # single song is on repeat and started playing again
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+ assert handler._tracked == 1
+
+ # fully played for the second time
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=179))
+ assert handler._tracked == 2
+
+
+async def test_it_resets_now_playing_when_songs_are_on_loop() -> None:
+ """When a song starts playing we update the 'now playing' endpoint.
+
+ This ends automatically, so if a single song is on repeat, we need to send the request again
+ """
+ handler = DummyHandler(logging.getLogger())
+
+ # started playing, should update now_playing
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+ assert handler._now_playing == 1
+
+ # fully played on track change should not update again
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
+ assert handler._now_playing == 1
+
+ # restarted same song, should scrobble again
+ await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+ assert handler._now_playing == 2
+
+
+async def test_it_does_not_update_now_playing_on_pause() -> None:
+ """Don't update now_playing when pausing the player early in the song."""
+ handler = DummyHandler(logging.getLogger())
+
+ await handler._on_mass_media_item_played(
+ create_report(duration=180, seconds_played=20, is_playing=False)
+ )
+ assert handler._now_playing == 0
+
+
+def create_report(
+ duration: int, seconds_played: int, is_playing: bool = True, uri: str = "filesystem://track/1"
+) -> MassEvent:
+ """Create the MediaItemPlaybackProgressReport and wrap it in a MassEvent."""
+ return wrap_event(
+ MediaItemPlaybackProgressReport(
+ uri=uri,
+ media_type=MediaType.TRACK,
+ name="track",
+ artist=None,
+ album=None,
+ image_url=None,
+ duration=duration,
+ mbid="",
+ seconds_played=seconds_played,
+ fully_played=duration - seconds_played < 5,
+ is_playing=is_playing,
+ )
+ )
+
+
+def wrap_event(data: MediaItemPlaybackProgressReport) -> MassEvent:
+ """Create a MEDIA_ITEM_PLAYED event."""
+ return MassEvent(EventType.MEDIA_ITEM_PLAYED, data.uri, data)
+++ /dev/null
-"""Tests the event handling for LastFM Plugin Provider."""
-
-import logging
-
-from music_assistant_models.enums import EventType, MediaType
-from music_assistant_models.event import MassEvent
-from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
-from pylast import _Network
-
-from music_assistant.providers.lastfm_scrobble import LastFMEventHandler
-
-
-async def test_it_does_not_scrobble_the_same_track_twice() -> None:
- """While songs are playing we get updates every 30 seconds.
-
- Here we test that songs only get scrobbled once during each play.
- """
- handler = LastFMEventHandler(DummyNetwork(), logging.getLogger())
-
- # not fully played yet
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
- assert handler.network._tracked == 0
-
- # fully played near the end
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=176))
- assert handler.network._tracked == 1
-
- # fully played on track change should not scrobble again
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
- assert handler.network._tracked == 1
-
- # single song is on repeat and started playing again
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
- assert handler.network._tracked == 1
-
- # fully played for the second time
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=179))
- assert handler.network._tracked == 2
-
-
-async def test_it_resets_now_playing_when_songs_are_on_loop() -> None:
- """When a song starts playing we update the 'now playing' endpoint.
-
- This ends automatically, so if a single song is on repeat, we need to send the request again
- """
- handler = LastFMEventHandler(DummyNetwork(), logging.getLogger())
-
- # started playing, should update now_playing
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
- assert handler.network._now_playing == 1
-
- # fully played on track change should not update again
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
- assert handler.network._now_playing == 1
-
- # restarted same song, should scrobble again
- await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
- assert handler.network._now_playing == 2
-
-
-async def test_it_does_not_update_now_playing_on_pause() -> None:
- """Don't update now_playing when pausing the player early in the song."""
- handler = LastFMEventHandler(DummyNetwork(), logging.getLogger())
-
- await handler._on_mass_media_item_played(
- create_report(duration=180, seconds_played=20, is_playing=False)
- )
- assert handler.network._now_playing == 0
-
-
-def create_report(
- duration: int, seconds_played: int, is_playing: bool = True, uri: str = "filesystem://track/1"
-) -> MassEvent:
- """Create the MediaItemPlaybackProgressReport and wrap it in a MassEvent."""
- return wrap_event(
- MediaItemPlaybackProgressReport(
- uri=uri,
- media_type=MediaType.TRACK,
- name="track",
- artist=None,
- album=None,
- image_url=None,
- duration=duration,
- mbid="",
- seconds_played=seconds_played,
- fully_played=duration - seconds_played < 5,
- is_playing=is_playing,
- )
- )
-
-
-def wrap_event(data: MediaItemPlaybackProgressReport) -> MassEvent:
- """Create a MEDIA_ITEM_PLAYED event."""
- return MassEvent(EventType.MEDIA_ITEM_PLAYED, data.uri, data)
-
-
-class DummyNetwork(_Network): # type: ignore[misc]
- """Spy version of a pylast._Network to allow easy testing."""
-
- _tracked = 0
- _now_playing = 0
-
- def __init__(self) -> None:
- """Initialize."""
- super().__init__(
- name="Dummy",
- homepage="",
- ws_server="",
- api_key="",
- api_secret="",
- session_key="",
- username="",
- password_hash="",
- token="",
- domain_names={},
- urls={},
- )
-
- def update_now_playing(
- self,
- artist: str,
- title: str,
- album: str | None = None,
- album_artist: str | None = None,
- duration: str | None = None,
- track_number: str | None = None,
- mbid: str | None = None,
- context: str | None = None,
- ) -> None:
- """Track call count."""
- self._now_playing += 1
-
- def scrobble(
- self,
- artist: str,
- title: str,
- timestamp: int,
- album: str | None = None,
- album_artist: str | None = None,
- track_number: int | None = None,
- duration: int | None = None,
- stream_id: str | None = None,
- context: str | None = None,
- mbid: str | None = None,
- ) -> None:
- """Track call count."""
- self._tracked += 1