_client: ListenBrainz = None
_currently_playing: str | None = None
_on_unload: list[Callable[[], None]] = []
+ _last_scrobbled: str | None = None
def __init__(
self,
self.logger.error("no client available during _on_mass_media_item_played")
return
- report = event.data
+ report: MediaItemPlaybackProgressReport = event.data
+
+ # poor mans attempt to detect a song on loop
+ if not report.fully_played and report.uri == self._last_scrobbled:
+ self.logger.debug(
+ "reset _last_scrobbled and _currently_playing because the song was restarted"
+ )
+ self._last_scrobbled = None
+ # reset currently playing to avoid it expiring when looping single songs
+ self._currently_playing = None
def make_listen(report: Any) -> Listen:
# album artist and track number are not available without an extra API call
listen = make_listen(report)
listen.listened_at = int(time.time())
self._client.submit_single_listen(listen)
+ self._last_scrobbled = report.uri
except Exception as err:
self.logger.exception(err)
if self.should_scrobble(report):
await asyncio.to_thread(scrobble)
- if report.fully_played:
- # reset currently playing to avoid it expiring when looping songs
- self._currently_playing = None
-
def should_scrobble(self, report: MediaItemPlaybackProgressReport) -> bool:
"""Determine if a track should be scrobbled, to be extended later."""
+ if self._last_scrobbled == report.uri:
+ self.logger.debug("skipped scrobbling due to duplicate event")
+ return False
+
# ideally we want more precise control
# but because the event is triggered every 30s
# and we don't have full queue details to determine