Refactor common code from listenbrainz and lastfm scrobblers (#2042)
authorIan Campbell <ijc@users.noreply.github.com>
Sun, 16 Mar 2025 10:40:55 +0000 (10:40 +0000)
committerGitHub <noreply@github.com>
Sun, 16 Mar 2025 10:40:55 +0000 (11:40 +0100)
Pull the common code from `LastFMEventHandler` into a common base class and use
it for `ListenBrainzEventHandler` too.

This puts all the logic for deciding when to scrobble into a single place.

music_assistant/helpers/__init__.py
music_assistant/helpers/scrobbler.py [new file with mode: 0644]
music_assistant/providers/lastfm_scrobble/__init__.py
music_assistant/providers/listenbrainz_scrobble/__init__.py
tests/core/test_scrobbler.py [new file with mode: 0644]
tests/providers/lastfm_scrobble/test_events.py [deleted file]

index ca6813943c1f41b7cf74f50e59855f026317e6a3..2434cc20345e2a10d84166597a5accad4c106900 100644 (file)
@@ -1 +1 @@
-"""Various server-seecific utils/helpers."""
+"""Various server-specific utils/helpers."""
diff --git a/music_assistant/helpers/scrobbler.py b/music_assistant/helpers/scrobbler.py
new file mode 100644 (file)
index 0000000..b2995c7
--- /dev/null
@@ -0,0 +1,86 @@
+"""Helper class to aid scrobblers."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from music_assistant_models.event import MassEvent
+    from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
+
+
+class ScrobblerHelper:
+    """Base class to aid scrobbling tracks."""
+
+    logger: logging.Logger
+    currently_playing: str | None = None
+    last_scrobbled: str | None = None
+
+    def __init__(self, logger: logging.Logger) -> None:
+        """Initialize."""
+        self.logger = logger
+
+    def _is_configured(self) -> bool:
+        """Override if subclass needs specific configuration."""
+        return True
+
+    def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+        """Send a Now Playing update to the scrobbling service."""
+
+    def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+        """Scrobble."""
+
+    async def _on_mass_media_item_played(self, event: MassEvent) -> None:
+        """Media item has finished playing, we'll scrobble the track."""
+        if not self._is_configured():
+            return
+
+        report: MediaItemPlaybackProgressReport = event.data
+
+        # poor mans attempt to detect a song on loop
+        if not report.fully_played and report.uri == self.last_scrobbled:
+            self.logger.debug(
+                "reset _last_scrobbled and _currently_playing because the song was restarted"
+            )
+            self.last_scrobbled = None
+            # reset currently playing to avoid it expiring when looping single songs
+            self.currently_playing = None
+
+        def update_now_playing() -> None:
+            try:
+                self._update_now_playing(report)
+                self.logger.debug(f"track {report.uri} marked as 'now playing'")
+                self.currently_playing = report.uri
+            except Exception as err:
+                self.logger.exception(err)
+
+        def scrobble() -> None:
+            try:
+                self._scrobble(report)
+                self.last_scrobbled = report.uri
+            except Exception as err:
+                self.logger.exception(err)
+
+        # update now playing if needed
+        if report.is_playing and (
+            self.currently_playing is None or self.currently_playing != report.uri
+        ):
+            await asyncio.to_thread(update_now_playing)
+
+        if self.should_scrobble(report):
+            await asyncio.to_thread(scrobble)
+
+    def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
+        """Determine if a track should be scrobbled, to be extended later."""
+        if self.last_scrobbled == report.uri:
+            self.logger.debug("skipped scrobbling due to duplicate event")
+            return False
+
+        # ideally we want more precise control
+        # but because the event is triggered every 30s
+        # and we don't have full queue details to determine
+        # the exact context in which the event was fired
+        # we can only rely on fully_played for now
+        return bool(report.fully_played)
index fe376b50751136c4a6f0c9801fce459e48899025..3ff359312303884067b0be843778b06d5b00dcd5 100644 (file)
@@ -1,6 +1,5 @@
 """Allows scrobbling of tracks with the help of PyLast."""
 
-import asyncio
 import logging
 import time
 from collections.abc import Callable
@@ -15,12 +14,12 @@ from music_assistant_models.config_entries import (
 from music_assistant_models.constants import SECURE_STRING_SUBSTITUTE
 from music_assistant_models.enums import ConfigEntryType, EventType
 from music_assistant_models.errors import LoginFailed, SetupFailedError
-from music_assistant_models.event import MassEvent
 from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
 from music_assistant_models.provider import ProviderManifest
 
 from music_assistant.constants import MASS_LOGGER_NAME
 from music_assistant.helpers.auth import AuthenticationHelper
+from music_assistant.helpers.scrobbler import ScrobblerHelper
 from music_assistant.mass import MusicAssistant
 from music_assistant.models import ProviderInstanceType
 from music_assistant.models.plugin import PluginProvider
@@ -81,87 +80,43 @@ class LastFMScrobbleProvider(PluginProvider):
             unload_cb()
 
 
-class LastFMEventHandler:
+class LastFMEventHandler(ScrobblerHelper):
     """Handles the event handling."""
 
-    logger: logging.Logger
     network: pylast._Network
-    currently_playing: str | None = None
-    last_scrobbled: str | None = None
 
     def __init__(self, network: pylast._Network, logger: logging.Logger) -> None:
         """Initialize."""
+        super().__init__(logger)
         self.network = network
-        self.logger = logger
 
-    async def _on_mass_media_item_played(self, event: MassEvent) -> None:
-        """Media item has finished playing, we'll scrobble the track."""
+    def _is_configured(self) -> bool:
         if self.network is None:
             self.logger.error("no network available during _on_mass_media_item_played")
-            return
+            return False
 
-        report: MediaItemPlaybackProgressReport = event.data
+        return True
 
-        # poor mans attempt to detect a song on loop
-        if not report.fully_played and report.uri == self.last_scrobbled:
-            self.logger.debug(
-                "reset _last_scrobbled and _currently_playing because the song was restarted"
-            )
-            self.last_scrobbled = None
-            # reset currently playing to avoid it expiring when looping single songs
-            self.currently_playing = None
-
-        def update_now_playing() -> None:
-            try:
-                self.network.update_now_playing(
-                    report.artist,
-                    report.name,
-                    report.album,
-                    duration=report.duration,
-                    mbid=report.mbid,
-                )
-                self.logger.debug(f"track {report.uri} marked as 'now playing'")
-                self.currently_playing = report.uri
-            except Exception as err:
-                self.logger.exception(err)
-
-        def scrobble() -> None:
-            try:
-                # album artist and track number are not available without an extra API call
-                # so they won't be scrobbled
-                self.network.scrobble(
-                    report.artist,
-                    report.name,
-                    time.time(),
-                    report.album,
-                    duration=report.duration,
-                    mbid=report.mbid,
-                )
-                self.last_scrobbled = report.uri
-            except Exception as err:
-                self.logger.exception(err)
-
-        # update now playing if needed
-        if report.is_playing and (
-            self.currently_playing is None or self.currently_playing != report.uri
-        ):
-            await asyncio.to_thread(update_now_playing)
-
-        if self.should_scrobble(report):
-            await asyncio.to_thread(scrobble)
-
-    def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
-        """Determine if a track should be scrobbled, to be extended later."""
-        if self.last_scrobbled == report.uri:
-            self.logger.debug("skipped scrobbling due to duplicate event")
-            return False
+    def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+        self.network.update_now_playing(
+            report.artist,
+            report.name,
+            report.album,
+            duration=report.duration,
+            mbid=report.mbid,
+        )
 
-        # ideally we want more precise control
-        # but because the event is triggered every 30s
-        # and we don't have full queue details to determine
-        # the exact context in which the event was fired
-        # we can only rely on fully_played for now
-        return bool(report.fully_played)
+    def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+        # album artist and track number are not available without an extra API call
+        # so they won't be scrobbled
+        self.network.scrobble(
+            report.artist,
+            report.name,
+            time.time(),
+            report.album,
+            duration=report.duration,
+            mbid=report.mbid,
+        )
 
 
 # configuration keys
index 153b3047a1e164848973f35183e1ee908d46cc75..c3823f5d2724c41dff214688e61654f0f75b32f9 100644 (file)
@@ -4,7 +4,7 @@
 # released under the Creative Commons Attribution-ShareAlike(BY-SA) 4.0 license.
 # https://creativecommons.org/licenses/by-sa/4.0/
 
-import asyncio
+import logging
 import time
 from collections.abc import Callable
 from typing import Any
@@ -18,10 +18,10 @@ from music_assistant_models.config_entries import (
 from music_assistant_models.constants import SECURE_STRING_SUBSTITUTE
 from music_assistant_models.enums import ConfigEntryType, EventType
 from music_assistant_models.errors import SetupFailedError
-from music_assistant_models.event import MassEvent
 from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
 from music_assistant_models.provider import ProviderManifest
 
+from music_assistant.helpers.scrobbler import ScrobblerHelper
 from music_assistant.mass import MusicAssistant
 from music_assistant.models import ProviderInstanceType
 from music_assistant.models.plugin import PluginProvider
@@ -49,9 +49,7 @@ class ListenBrainzScrobbleProvider(PluginProvider):
     """Plugin provider to support scrobbling of tracks."""
 
     _client: ListenBrainz = None
-    _currently_playing: str | None = None
     _on_unload: list[Callable[[], None]] = []
-    _last_scrobbled: str | None = None
 
     def __init__(
         self,
@@ -68,9 +66,11 @@ class ListenBrainzScrobbleProvider(PluginProvider):
         """Call after the provider has been loaded."""
         await super().loaded_in_mass()
 
+        handler = ListenBrainzEventHandler(self._client, self.logger)
+
         # subscribe to internal event
         self._on_unload.append(
-            self.mass.subscribe(self._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED)
+            self.mass.subscribe(handler._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED)
         )
 
     async def unload(self, is_removed: bool = False) -> None:
@@ -82,73 +82,54 @@ class ListenBrainzScrobbleProvider(PluginProvider):
         for unload_cb in self._on_unload:
             unload_cb()
 
-    async def _on_mass_media_item_played(self, event: MassEvent) -> None:
-        """Media item has finished playing, we'll scrobble the track."""
+
+class ListenBrainzEventHandler(ScrobblerHelper):
+    """Handles the event handling."""
+
+    _client: ListenBrainz = None
+
+    def __init__(self, client: ListenBrainz, logger: logging.Logger) -> None:
+        """Initialize."""
+        super().__init__(logger)
+        self._client = client
+
+    def _is_configured(self) -> bool:
+        """Check that we are configured."""
         if self._client is None:
             self.logger.error("no client available during _on_mass_media_item_played")
-            return
-
-        report: MediaItemPlaybackProgressReport = event.data
-
-        # poor mans attempt to detect a song on loop
-        if not report.fully_played and report.uri == self._last_scrobbled:
-            self.logger.debug(
-                "reset _last_scrobbled and _currently_playing because the song was restarted"
-            )
-            self._last_scrobbled = None
-            # reset currently playing to avoid it expiring when looping single songs
-            self._currently_playing = None
-
-        def make_listen(report: Any) -> Listen:
-            # album artist and track number are not available without an extra API call
-            # so they won't be scrobbled
-
-            # https://pylistenbrainz.readthedocs.io/en/latest/api_ref.html#class-listen
-            return Listen(
-                track_name=report.name,
-                artist_name=report.artist,
-                release_name=report.album,
-                recording_mbid=report.mbid,
-                listening_from="music-assistant",
-            )
-
-        def update_now_playing() -> None:
-            try:
-                listen = make_listen(report)
-                self._client.submit_playing_now(listen)
-                self.logger.debug(f"track {report.uri} marked as 'now playing'")
-                self._currently_playing = report.uri
-            except Exception as err:
-                self.logger.exception(err)
-
-        def scrobble() -> None:
-            try:
-                listen = make_listen(report)
-                listen.listened_at = int(time.time())
-                self._client.submit_single_listen(listen)
-                self._last_scrobbled = report.uri
-            except Exception as err:
-                self.logger.exception(err)
-
-        # update now playing if needed
-        if self._currently_playing is None or self._currently_playing != report.uri:
-            await asyncio.to_thread(update_now_playing)
-
-        if self.should_scrobble(report):
-            await asyncio.to_thread(scrobble)
-
-    def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
-        """Determine if a track should be scrobbled, to be extended later."""
-        if self._last_scrobbled == report.uri:
-            self.logger.debug("skipped scrobbling due to duplicate event")
             return False
+        return True
+
+    def _make_listen(self, report: Any) -> Listen:
+        # album artist and track number are not available without an extra API call
+        # so they won't be scrobbled
+
+        # https://pylistenbrainz.readthedocs.io/en/latest/api_ref.html#class-listen
+        return Listen(
+            track_name=report.name,
+            artist_name=report.artist,
+            release_name=report.album,
+            recording_mbid=report.mbid,
+            listening_from="music-assistant",
+        )
 
-        # ideally we want more precise control
-        # but because the event is triggered every 30s
-        # and we don't have full queue details to determine
-        # the exact context in which the event was fired
-        # we can only rely on fully_played for now
-        return bool(report.fully_played)
+    def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+        try:
+            listen = self._make_listen(report)
+            self._client.submit_playing_now(listen)
+            self.logger.debug(f"track {report.uri} marked as 'now playing'")
+            self._currently_playing = report.uri
+        except Exception as err:
+            self.logger.exception(err)
+
+    def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+        try:
+            listen = self._make_listen(report)
+            listen.listened_at = int(time.time())
+            self._client.submit_single_listen(listen)
+            self._last_scrobbled = report.uri
+        except Exception as err:
+            self.logger.exception(err)
 
 
 async def get_config_entries(
diff --git a/tests/core/test_scrobbler.py b/tests/core/test_scrobbler.py
new file mode 100644 (file)
index 0000000..21d8d6e
--- /dev/null
@@ -0,0 +1,113 @@
+"""Tests the event handling for LastFM Plugin Provider."""
+
+import logging
+
+from music_assistant_models.enums import EventType, MediaType
+from music_assistant_models.event import MassEvent
+from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
+
+from music_assistant.helpers.scrobbler import ScrobblerHelper
+
+
+class DummyHandler(ScrobblerHelper):
+    """Spy version of a ScrobblerHelper to allow easy testing."""
+
+    _tracked = 0
+    _now_playing = 0
+
+    def __init__(self, logger: logging.Logger) -> None:
+        """Initialize."""
+        super().__init__(logger)
+
+    def _is_configured(self) -> bool:
+        return True
+
+    def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
+        self._now_playing += 1
+
+    def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
+        self._tracked += 1
+
+
+async def test_it_does_not_scrobble_the_same_track_twice() -> None:
+    """While songs are playing we get updates every 30 seconds.
+
+    Here we test that songs only get scrobbled once during each play.
+    """
+    handler = DummyHandler(logging.getLogger())
+
+    # not fully played yet
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+    assert handler._tracked == 0
+
+    # fully played near the end
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=176))
+    assert handler._tracked == 1
+
+    # fully played on track change should not scrobble again
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
+    assert handler._tracked == 1
+
+    # single song is on repeat and started playing again
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+    assert handler._tracked == 1
+
+    # fully played for the second time
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=179))
+    assert handler._tracked == 2
+
+
+async def test_it_resets_now_playing_when_songs_are_on_loop() -> None:
+    """When a song starts playing we update the 'now playing' endpoint.
+
+    This ends automatically, so if a single song is on repeat, we need to send the request again
+    """
+    handler = DummyHandler(logging.getLogger())
+
+    # started playing, should update now_playing
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+    assert handler._now_playing == 1
+
+    # fully played on track change should not update again
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
+    assert handler._now_playing == 1
+
+    # restarted same song, should scrobble again
+    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
+    assert handler._now_playing == 2
+
+
+async def test_it_does_not_update_now_playing_on_pause() -> None:
+    """Don't update now_playing when pausing the player early in the song."""
+    handler = DummyHandler(logging.getLogger())
+
+    await handler._on_mass_media_item_played(
+        create_report(duration=180, seconds_played=20, is_playing=False)
+    )
+    assert handler._now_playing == 0
+
+
+def create_report(
+    duration: int, seconds_played: int, is_playing: bool = True, uri: str = "filesystem://track/1"
+) -> MassEvent:
+    """Create the MediaItemPlaybackProgressReport and wrap it in a MassEvent."""
+    return wrap_event(
+        MediaItemPlaybackProgressReport(
+            uri=uri,
+            media_type=MediaType.TRACK,
+            name="track",
+            artist=None,
+            album=None,
+            image_url=None,
+            duration=duration,
+            mbid="",
+            seconds_played=seconds_played,
+            fully_played=duration - seconds_played < 5,
+            is_playing=is_playing,
+        )
+    )
+
+
+def wrap_event(data: MediaItemPlaybackProgressReport) -> MassEvent:
+    """Create a MEDIA_ITEM_PLAYED event."""
+    return MassEvent(EventType.MEDIA_ITEM_PLAYED, data.uri, data)
diff --git a/tests/providers/lastfm_scrobble/test_events.py b/tests/providers/lastfm_scrobble/test_events.py
deleted file mode 100644 (file)
index 71a6c51..0000000
+++ /dev/null
@@ -1,147 +0,0 @@
-"""Tests the event handling for LastFM Plugin Provider."""
-
-import logging
-
-from music_assistant_models.enums import EventType, MediaType
-from music_assistant_models.event import MassEvent
-from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
-from pylast import _Network
-
-from music_assistant.providers.lastfm_scrobble import LastFMEventHandler
-
-
-async def test_it_does_not_scrobble_the_same_track_twice() -> None:
-    """While songs are playing we get updates every 30 seconds.
-
-    Here we test that songs only get scrobbled once during each play.
-    """
-    handler = LastFMEventHandler(DummyNetwork(), logging.getLogger())
-
-    # not fully played yet
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
-    assert handler.network._tracked == 0
-
-    # fully played near the end
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=176))
-    assert handler.network._tracked == 1
-
-    # fully played on track change should not scrobble again
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
-    assert handler.network._tracked == 1
-
-    # single song is on repeat and started playing again
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
-    assert handler.network._tracked == 1
-
-    # fully played for the second time
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=179))
-    assert handler.network._tracked == 2
-
-
-async def test_it_resets_now_playing_when_songs_are_on_loop() -> None:
-    """When a song starts playing we update the 'now playing' endpoint.
-
-    This ends automatically, so if a single song is on repeat, we need to send the request again
-    """
-    handler = LastFMEventHandler(DummyNetwork(), logging.getLogger())
-
-    # started playing, should update now_playing
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
-    assert handler.network._now_playing == 1
-
-    # fully played on track change should not update again
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=180))
-    assert handler.network._now_playing == 1
-
-    # restarted same song, should scrobble again
-    await handler._on_mass_media_item_played(create_report(duration=180, seconds_played=30))
-    assert handler.network._now_playing == 2
-
-
-async def test_it_does_not_update_now_playing_on_pause() -> None:
-    """Don't update now_playing when pausing the player early in the song."""
-    handler = LastFMEventHandler(DummyNetwork(), logging.getLogger())
-
-    await handler._on_mass_media_item_played(
-        create_report(duration=180, seconds_played=20, is_playing=False)
-    )
-    assert handler.network._now_playing == 0
-
-
-def create_report(
-    duration: int, seconds_played: int, is_playing: bool = True, uri: str = "filesystem://track/1"
-) -> MassEvent:
-    """Create the MediaItemPlaybackProgressReport and wrap it in a MassEvent."""
-    return wrap_event(
-        MediaItemPlaybackProgressReport(
-            uri=uri,
-            media_type=MediaType.TRACK,
-            name="track",
-            artist=None,
-            album=None,
-            image_url=None,
-            duration=duration,
-            mbid="",
-            seconds_played=seconds_played,
-            fully_played=duration - seconds_played < 5,
-            is_playing=is_playing,
-        )
-    )
-
-
-def wrap_event(data: MediaItemPlaybackProgressReport) -> MassEvent:
-    """Create a MEDIA_ITEM_PLAYED event."""
-    return MassEvent(EventType.MEDIA_ITEM_PLAYED, data.uri, data)
-
-
-class DummyNetwork(_Network):  # type: ignore[misc]
-    """Spy version of a pylast._Network to allow easy testing."""
-
-    _tracked = 0
-    _now_playing = 0
-
-    def __init__(self) -> None:
-        """Initialize."""
-        super().__init__(
-            name="Dummy",
-            homepage="",
-            ws_server="",
-            api_key="",
-            api_secret="",
-            session_key="",
-            username="",
-            password_hash="",
-            token="",
-            domain_names={},
-            urls={},
-        )
-
-    def update_now_playing(
-        self,
-        artist: str,
-        title: str,
-        album: str | None = None,
-        album_artist: str | None = None,
-        duration: str | None = None,
-        track_number: str | None = None,
-        mbid: str | None = None,
-        context: str | None = None,
-    ) -> None:
-        """Track call count."""
-        self._now_playing += 1
-
-    def scrobble(
-        self,
-        artist: str,
-        title: str,
-        timestamp: int,
-        album: str | None = None,
-        album_artist: str | None = None,
-        track_number: int | None = None,
-        duration: int | None = None,
-        stream_id: str | None = None,
-        context: str | None = None,
-        mbid: str | None = None,
-    ) -> None:
-        """Track call count."""
-        self._tracked += 1