"""Deezer music provider support for MusicAssistant."""
+import datetime
import hashlib
+import uuid
from asyncio import TaskGroup
from collections.abc import AsyncGenerator
from math import ceil
content_type=ContentType.try_parse(url_details["format"].split("_")[0])
),
duration=int(song_data["DURATION"]),
- data=url,
+ data={"url": url, "format": url_details["format"]},
expires=url_details["exp"],
size=int(song_data[f"FILESIZE_{url_details['format']}"]),
+ callback=self.log_listen_cb,
)
async def get_audio_stream(
headers["Range"] = f"bytes={skip_bytes}-"
buffer = bytearray()
+ streamdetails.data["start_ts"] = datetime.datetime.utcnow().timestamp()
+ streamdetails.data["stream_id"] = uuid.uuid1()
+ self.mass.create_task(self.gw_client.log_listen(next_track=streamdetails.item_id))
async with self.mass.http_session.get(
- streamdetails.data, headers=headers, timeout=timeout
+ streamdetails.data["url"], headers=headers, timeout=timeout
) as resp:
async for chunk in resp.content.iter_chunked(2048):
buffer += chunk
del buffer[:2048]
yield bytes(buffer)
+ async def log_listen_cb(self, stream_details):
+ """Log the end of a track playback."""
+ await self.gw_client.log_listen(last_track=stream_details)
+
### PARSING METADATA FUNCTIONS ###
def parse_metadata_track(self, track: deezer.Track) -> MediaItemMetadata:
from aiohttp import ClientSession
from yarl import URL
+from music_assistant.common.models.media_items import StreamDetails
+
USER_AGENT_HEADER = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/79.0.3945.130 Safari/537.36"
headers={"User-Agent": USER_AGENT_HEADER},
)
result_json = await result.json()
+
if result_json["error"]:
if retry:
await self._update_user_data()
raise DeezerGWError("Received an error from API", error)
return result_json["data"][0]["media"][0], song_data["results"]
+
+ async def log_listen(
+ self, next_track: str | None = None, last_track: StreamDetails | None = None
+ ):
+ """Log the next and/or previous track of the current playback queue."""
+ if not (next_track or last_track):
+ raise DeezerGWError("last or current track information must be provided.")
+
+ payload = {}
+
+ if next_track:
+ payload["next_media"] = {"media": {"id": next_track, "type": "song"}}
+
+ if last_track:
+ seconds_streamed = min(
+ datetime.datetime.utcnow().timestamp() - last_track.data["start_ts"],
+ last_track.seconds_streamed,
+ )
+
+ payload["params"] = {
+ "media": {
+ "id": last_track.item_id,
+ "type": "song",
+ "format": last_track.data["format"],
+ },
+ "type": 1,
+ "stat": {
+ "seek": 1 if last_track.seconds_skipped else 0,
+ "pause": 0,
+ "sync": 0,
+ "next": bool(next_track),
+ },
+ "lt": int(seconds_streamed),
+ "ctxt": {"t": "search_page", "id": last_track.item_id},
+ "dev": {"v": "10020230525142740", "t": 0},
+ "ls": [],
+ "ts_listen": int(last_track.data["start_ts"]),
+ "is_shuffle": False,
+ "stream_id": str(last_track.data["stream_id"]),
+ }
+
+ await self._gw_api_call("log.listen", args=payload)