From 4b38671996ea26a4b63ae026711e8291c840fd08 Mon Sep 17 00:00:00 2001 From: Willem-Jan Zijderveld Date: Wed, 12 Mar 2025 00:49:51 +0100 Subject: [PATCH] [LastFM Scrobbler] Add some basic unit tests and fix a bug (#2028) * Extract logic to a handler and added a couple unit tests * Bugfix: now_playing was triggered when pausing the player early in the song --- .../providers/lastfm_scrobble/__init__.py | 45 ++++-- tests/providers/lastfm_scrobble/__init__.py | 1 + .../providers/lastfm_scrobble/test_events.py | 147 ++++++++++++++++++ 3 files changed, 178 insertions(+), 15 deletions(-) create mode 100644 tests/providers/lastfm_scrobble/__init__.py create mode 100644 tests/providers/lastfm_scrobble/test_events.py diff --git a/music_assistant/providers/lastfm_scrobble/__init__.py b/music_assistant/providers/lastfm_scrobble/__init__.py index e6a408c6..fe376b50 100644 --- a/music_assistant/providers/lastfm_scrobble/__init__.py +++ b/music_assistant/providers/lastfm_scrobble/__init__.py @@ -45,10 +45,7 @@ async def setup( class LastFMScrobbleProvider(PluginProvider): """Plugin provider to support scrobbling of tracks.""" - _network: pylast._Network = None - _currently_playing: str | None = None _on_unload: list[Callable[[], None]] = [] - _last_scrobbled: str | None = None def _get_network_config(self) -> dict[str, ConfigValueType]: return { @@ -67,11 +64,11 @@ class LastFMScrobbleProvider(PluginProvider): self.logger.info("No session key available, don't forget to authenticate!") return - self._network = _get_network(self._get_network_config()) + handler = LastFMEventHandler(_get_network(self._get_network_config()), self.logger) # subscribe to internal event self._on_unload.append( - self.mass.subscribe(self._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED) + self.mass.subscribe(handler._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED) ) async def unload(self, is_removed: bool = False) -> None: @@ -83,26 +80,40 @@ class LastFMScrobbleProvider(PluginProvider): for unload_cb in self._on_unload: unload_cb() + +class LastFMEventHandler: + """Handles the event handling.""" + + logger: logging.Logger + network: pylast._Network + currently_playing: str | None = None + last_scrobbled: str | None = None + + def __init__(self, network: pylast._Network, logger: logging.Logger) -> None: + """Initialize.""" + self.network = network + self.logger = logger + async def _on_mass_media_item_played(self, event: MassEvent) -> None: """Media item has finished playing, we'll scrobble the track.""" - if self._network is None: + if self.network is None: self.logger.error("no network available during _on_mass_media_item_played") return report: MediaItemPlaybackProgressReport = event.data # poor mans attempt to detect a song on loop - if not report.fully_played and report.uri == self._last_scrobbled: + if not report.fully_played and report.uri == self.last_scrobbled: self.logger.debug( "reset _last_scrobbled and _currently_playing because the song was restarted" ) - self._last_scrobbled = None + self.last_scrobbled = None # reset currently playing to avoid it expiring when looping single songs - self._currently_playing = None + self.currently_playing = None def update_now_playing() -> None: try: - self._network.update_now_playing( + self.network.update_now_playing( report.artist, report.name, report.album, @@ -110,7 +121,7 @@ class LastFMScrobbleProvider(PluginProvider): mbid=report.mbid, ) self.logger.debug(f"track {report.uri} marked as 'now playing'") - self._currently_playing = report.uri + self.currently_playing = report.uri except Exception as err: self.logger.exception(err) @@ -118,7 +129,7 @@ class LastFMScrobbleProvider(PluginProvider): try: # album artist and track number are not available without an extra API call # so they won't be scrobbled - self._network.scrobble( + self.network.scrobble( report.artist, report.name, time.time(), @@ -126,12 +137,14 @@ class LastFMScrobbleProvider(PluginProvider): duration=report.duration, mbid=report.mbid, ) - self._last_scrobbled = report.uri + self.last_scrobbled = report.uri except Exception as err: self.logger.exception(err) # update now playing if needed - if self._currently_playing is None or self._currently_playing != report.uri: + if report.is_playing and ( + self.currently_playing is None or self.currently_playing != report.uri + ): await asyncio.to_thread(update_now_playing) if self.should_scrobble(report): @@ -139,7 +152,7 @@ class LastFMScrobbleProvider(PluginProvider): def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool: """Determine if a track should be scrobbled, to be extended later.""" - if self._last_scrobbled == report.uri: + if self.last_scrobbled == report.uri: self.logger.debug("skipped scrobbling due to duplicate event") return False @@ -317,3 +330,5 @@ def _get_network(config: dict[str, ConfigValueType]) -> pylast._Network: return pylast.LibreFMNetwork( key, secret, username=config.get(CONF_USERNAME), session_key=session_key ) + case _: + raise SetupFailedError(f"unknown provider {provider} configured") diff --git a/tests/providers/lastfm_scrobble/__init__.py b/tests/providers/lastfm_scrobble/__init__.py new file mode 100644 index 00000000..a224e98c --- /dev/null +++ b/tests/providers/lastfm_scrobble/__init__.py @@ -0,0 +1 @@ +"""Tests for the lastfm_scrobble plugin provider.""" diff --git a/tests/providers/lastfm_scrobble/test_events.py b/tests/providers/lastfm_scrobble/test_events.py new file mode 100644 index 00000000..71a6c512 --- /dev/null +++ b/tests/providers/lastfm_scrobble/test_events.py @@ -0,0 +1,147 @@ +"""Tests the event handling for LastFM Plugin Provider.""" + +import logging + +from music_assistant_models.enums import EventType, MediaType +from music_assistant_models.event import MassEvent +from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport +from pylast import _Network + +from music_assistant.providers.lastfm_scrobble import LastFMEventHandler + + +async def test_it_does_not_scrobble_the_same_track_twice() -> None: + """While songs are playing we get updates every 30 seconds. + + Here we test that songs only get scrobbled once during each play. + """ + handler = LastFMEventHandler(DummyNetwork(), logging.getLogger()) + + # not fully played yet + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30)) + assert handler.network._tracked == 0 + + # fully played near the end + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=176)) + assert handler.network._tracked == 1 + + # fully played on track change should not scrobble again + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180)) + assert handler.network._tracked == 1 + + # single song is on repeat and started playing again + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30)) + assert handler.network._tracked == 1 + + # fully played for the second time + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=179)) + assert handler.network._tracked == 2 + + +async def test_it_resets_now_playing_when_songs_are_on_loop() -> None: + """When a song starts playing we update the 'now playing' endpoint. + + This ends automatically, so if a single song is on repeat, we need to send the request again + """ + handler = LastFMEventHandler(DummyNetwork(), logging.getLogger()) + + # started playing, should update now_playing + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30)) + assert handler.network._now_playing == 1 + + # fully played on track change should not update again + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180)) + assert handler.network._now_playing == 1 + + # restarted same song, should scrobble again + await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30)) + assert handler.network._now_playing == 2 + + +async def test_it_does_not_update_now_playing_on_pause() -> None: + """Don't update now_playing when pausing the player early in the song.""" + handler = LastFMEventHandler(DummyNetwork(), logging.getLogger()) + + await handler._on_mass_media_item_played( + create_report(duration=180, seconds_played=20, is_playing=False) + ) + assert handler.network._now_playing == 0 + + +def create_report( + duration: int, seconds_played: int, is_playing: bool = True, uri: str = "filesystem://track/1" +) -> MassEvent: + """Create the MediaItemPlaybackProgressReport and wrap it in a MassEvent.""" + return wrap_event( + MediaItemPlaybackProgressReport( + uri=uri, + media_type=MediaType.TRACK, + name="track", + artist=None, + album=None, + image_url=None, + duration=duration, + mbid="", + seconds_played=seconds_played, + fully_played=duration - seconds_played < 5, + is_playing=is_playing, + ) + ) + + +def wrap_event(data: MediaItemPlaybackProgressReport) -> MassEvent: + """Create a MEDIA_ITEM_PLAYED event.""" + return MassEvent(EventType.MEDIA_ITEM_PLAYED, data.uri, data) + + +class DummyNetwork(_Network): # type: ignore[misc] + """Spy version of a pylast._Network to allow easy testing.""" + + _tracked = 0 + _now_playing = 0 + + def __init__(self) -> None: + """Initialize.""" + super().__init__( + name="Dummy", + homepage="", + ws_server="", + api_key="", + api_secret="", + session_key="", + username="", + password_hash="", + token="", + domain_names={}, + urls={}, + ) + + def update_now_playing( + self, + artist: str, + title: str, + album: str | None = None, + album_artist: str | None = None, + duration: str | None = None, + track_number: str | None = None, + mbid: str | None = None, + context: str | None = None, + ) -> None: + """Track call count.""" + self._now_playing += 1 + + def scrobble( + self, + artist: str, + title: str, + timestamp: int, + album: str | None = None, + album_artist: str | None = None, + track_number: int | None = None, + duration: int | None = None, + stream_id: str | None = None, + context: str | None = None, + mbid: str | None = None, + ) -> None: + """Track call count.""" + self._tracked += 1 -- 2.34.1