"""Deezer music provider support for MusicAssistant."""
-import datetime
import hashlib
import uuid
from asyncio import TaskGroup
import deezer
from aiohttp import ClientSession, ClientTimeout
from Crypto.Cipher import Blowfish
+from deezer import exceptions as deezer_exceptions
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueType,
return self.parse_artist(
artist=await self.client.get_artist(artist_id=int(prov_artist_id))
)
- except deezer.exceptions.DeezerErrorResponse as error:
+ except deezer_exceptions.DeezerErrorResponse as error:
self.logger.warning("Failed getting artist: %s", error)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
try:
return self.parse_album(album=await self.client.get_album(album_id=int(prov_album_id)))
- except deezer.exceptions.DeezerErrorResponse as error:
+ except deezer_exceptions.DeezerErrorResponse as error:
self.logger.warning("Failed getting album: %s", error)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
return self.parse_playlist(
playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
)
- except deezer.exceptions.DeezerErrorResponse as error:
+ except deezer_exceptions.DeezerErrorResponse as error:
self.logger.warning("Failed getting playlist: %s", error)
async def get_track(self, prov_track_id: str) -> Track:
track=await self.client.get_track(track_id=int(prov_track_id)),
user_country=self.gw_client.user_country,
)
- except deezer.exceptions.DeezerErrorResponse as error:
+ except deezer_exceptions.DeezerErrorResponse as error:
self.logger.warning("Failed getting track: %s", error)
async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
headers["Range"] = f"bytes={skip_bytes}-"
buffer = bytearray()
- streamdetails.data["start_ts"] = datetime.datetime.utcnow().timestamp()
+ streamdetails.data["start_ts"] = utc_timestamp()
streamdetails.data["stream_id"] = uuid.uuid1()
self.mass.create_task(self.gw_client.log_listen(next_track=streamdetails.item_id))
async with self.mass.http_session.get(
remotely_accessible=True,
)
],
- checksum=playlist.checksum,
+ cache_checksum=playlist.checksum,
),
is_editable=creator.id == self.user.id,
owner=creator.name,
from aiohttp import ClientSession
from yarl import URL
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.models.streamdetails import StreamDetails
USER_AGENT_HEADER = (
async def _update_user_data(self) -> None:
user_data = await self._gw_api_call("deezer.getUserData", False)
if not user_data["results"]["USER"]["USER_ID"]:
- await self._get_cookie()
+ await self._set_cookie()
user_data = await self._gw_api_call("deezer.getUserData", False)
if not user_data["results"]["OFFER_ID"]:
- msg = "Free subscriptions cannot be used in MA."
+ msg = "Free subscriptions cannot be used in MA. Make sure you set a valid ARL."
raise DeezerGWError(msg)
self._gw_csrf_token = user_data["results"]["checkForm"]
if last_track:
seconds_streamed = min(
- datetime.datetime.utcnow().timestamp() - last_track.data["start_ts"],
+ utc_timestamp() - last_track.data["start_ts"],
last_track.seconds_streamed,
)
},
"type": 1,
"stat": {
- "seek": 1 if last_track.seconds_skipped else 0,
+ "seek": 1 if seconds_streamed < last_track.duration else 0,
"pause": 0,
"sync": 0,
"next": bool(next_track),