"""All constants for Music Assistant."""
-__version__ = "0.1.1"
+__version__ = "0.1.2"
REQUIRED_PYTHON_VER = "3.7"
# configuration keys/attributes
return final_list
-def unique_item_ids(objects):
- """Filter duplicate item id's from list of items."""
- return set({object_.item_id for object_ in objects})
-
-
def try_load_json_file(jsonfile):
"""Try to load json from file."""
try:
]
async def get_tracks_from_provider_ids(
- self, provider_id: Union[str, List[str]], prov_item_ids: List[str]
- ) -> dict:
+ self,
+ provider_id: Union[str, List[str], Set[str]],
+ prov_item_ids: Union[List[str], Set[str]],
+ ) -> List[Track]:
"""Get track records for the given prov_ids."""
provider_ids = provider_id if isinstance(provider_id, list) else [provider_id]
prov_id_str = ",".join([f'"{x}"' for x in provider_ids])
)
if db_item.checksum != playlist.checksum:
db_item = await self.mass.database.add_playlist(playlist)
+ # precache playlist tracks
+ for playlist_track in await self.mass.music.get_playlist_tracks(
+ playlist.item_id, provider_id
+ ):
+ # try to find substitutes for unavailable tracks with matching technique
+ if not playlist_track.available:
+ await self.mass.music.get_track(
+ playlist_track.item_id,
+ playlist_track.provider,
+ playlist_track,
+ )
cur_db_ids.add(db_item.item_id)
await self.mass.database.add_to_library(
db_item.item_id, MediaType.Playlist, playlist.provider
)
- # precache playlist tracks
- for playlist_track in await self.mass.music.get_playlist_tracks(
- playlist.item_id, provider_id
- ):
- # try to find substitutes for unavailable tracks with matching technique
- if not db_item.available and not playlist_track.available:
- if playlist_track.provider == "database":
- await self.mass.music.match_track(playlist_track)
- else:
- await self.mass.music.add_track(playlist_track)
+
# process playlist deletions
for db_id in prev_db_ids:
if db_id not in cur_db_ids:
import asyncio
import logging
-from typing import List
+from typing import List, Set, Tuple
from music_assistant.constants import (
EVENT_ALBUM_ADDED,
compare_track,
)
from music_assistant.helpers.musicbrainz import MusicBrainz
-from music_assistant.helpers.util import unique_item_ids
from music_assistant.helpers.web import api_route
from music_assistant.models.media_types import (
Album,
"""Async initialize of module."""
@property
- def providers(self) -> List[MusicProvider]:
+ def providers(self) -> Tuple[MusicProvider]:
"""Return all providers of type musicprovider."""
return self.mass.get_providers(ProviderType.MUSIC_PROVIDER)
)
# retrieve list of db items
db_tracks = await self.mass.database.get_tracks_from_provider_ids(
- [x.provider for x in album.provider_ids],
- [x.item_id for x in all_prov_tracks],
+ {x.provider for x in album.provider_ids},
+ {x.item_id for x in all_prov_tracks},
)
# combine provider tracks with db tracks
return [
]
@api_route("albums/:provider_id/:item_id/versions")
- async def get_album_versions(self, item_id: str, provider_id: str) -> List[Album]:
+ async def get_album_versions(self, item_id: str, provider_id: str) -> Set[Album]:
"""Return all versions of an album we can find on all providers."""
album = await self.get_album(item_id, provider_id)
- provider_ids = [
+ provider_ids = {
item.id for item in self.mass.get_providers(ProviderType.MUSIC_PROVIDER)
- ]
+ }
search_query = f"{album.artist.name} {album.name}"
- return [
+ return {
prov_item
for prov_items in await asyncio.gather(
*[
)
for prov_item in prov_items.albums
if compare_strings(prov_item.artist.name, album.artist.name)
- ]
+ }
@api_route("tracks/:provider_id/:item_id/versions")
- async def get_track_versions(self, item_id: str, provider_id: str) -> List[Track]:
+ async def get_track_versions(self, item_id: str, provider_id: str) -> Set[Track]:
"""Return all versions of a track we can find on all providers."""
track = await self.get_track(item_id, provider_id)
- provider_ids = [
+ provider_ids = {
item.id for item in self.mass.get_providers(ProviderType.MUSIC_PROVIDER)
- ]
- search_query = f"{track.artists[0].name} {track.name}"
- return [
+ }
+ first_artist = next(iter(track.artists))
+ search_query = f"{first_artist.name} {track.name}"
+ return {
prov_item
for prov_items in await asyncio.gather(
*[
)
for prov_item in prov_items.tracks
if compare_artists(prov_item.artists, track.artists)
- ]
+ }
@api_route("playlists/:provider_id/:item_id/tracks")
async def get_playlist_tracks(self, item_id: str, provider_id: str) -> List[Track]:
checksum=cache_checksum,
)
db_tracks = await self.mass.database.get_tracks_from_provider_ids(
- provider_id, [x.item_id for x in playlist_tracks]
+ provider_id, {x.item_id for x in playlist_tracks}
)
# combine provider tracks with db tracks
return [
):
"""Return combined result of provider item and db result."""
for db_item in db_items:
- if item.item_id in [x.item_id for x in db_item.provider_ids]:
+ if item.item_id in {x.item_id for x in db_item.provider_ids}:
item = db_item
break
if index is not None and not item.position:
item.disc_number = disc_number
if track_number is not None:
item.track_number = track_number
- # make sure artists are unique
- # if hasattr(item, "artists"):
- # item.artists = unique_item_ids(item.artists)
return item
@api_route("artists/:provider_id/:item_id/tracks")
- async def get_artist_toptracks(self, item_id: str, provider_id: str) -> List[Track]:
+ async def get_artist_toptracks(self, item_id: str, provider_id: str) -> Set[Track]:
"""Return top tracks for an artist."""
artist = await self.get_artist(item_id, provider_id)
# get results from all providers
- all_prov_tracks = [
+ all_prov_tracks = {
track
for prov_tracks in await asyncio.gather(
*[
]
)
for track in prov_tracks
- ]
+ }
# retrieve list of db items
db_tracks = await self.mass.database.get_tracks_from_provider_ids(
- [x.provider for x in artist.provider_ids],
- [x.item_id for x in all_prov_tracks],
+ {x.provider for x in artist.provider_ids},
+ {x.item_id for x in all_prov_tracks},
)
# combine provider tracks with db tracks and filter duplicate itemid's
return {await self.__process_item(item, db_tracks) for item in all_prov_tracks}
- # return unique_item_ids(
- # [await self.__process_item(item, db_tracks) for item in all_prov_tracks]
- # )
async def _get_provider_artist_toptracks(
self, item_id: str, provider_id: str
)
@api_route("artists/:provider_id/:item_id/albums")
- async def get_artist_albums(self, item_id: str, provider_id: str) -> List[Album]:
+ async def get_artist_albums(self, item_id: str, provider_id: str) -> Set[Album]:
"""Return (all) albums for an artist."""
artist = await self.get_artist(item_id, provider_id)
# get results from all providers
- all_prov_albums = [
+ all_prov_albums = {
album
for prov_albums in await asyncio.gather(
*[
]
)
for album in prov_albums
- ]
+ }
# retrieve list of db items
db_tracks = await self.mass.database.get_albums_from_provider_ids(
[x.provider for x in artist.provider_ids],
[x.item_id for x in all_prov_albums],
)
# combine provider tracks with db tracks and filter duplicate itemid's
- return unique_item_ids(
- [await self.__process_item(item, db_tracks) for item in all_prov_albums]
- )
+ return {await self.__process_item(item, db_tracks) for item in all_prov_albums}
async def _get_provider_artist_albums(
self, item_id: str, provider_id: str
if not provider or not provider.available:
LOGGER.error("Provider %s is not available", provider_id)
return []
- cache_key = f"{provider_id}.artist_albums.{item_id}"
+ cache_key = f"{provider_id}.artistalbums.{item_id}"
return await cached(
self.cache,
cache_key,
artist_albums = await self.get_artist_albums(
db_artist.item_id, db_artist.provider
)
- for ref_album in artist_albums[:50]:
+ for ref_album in artist_albums:
if ref_album.album_type == AlbumType.Compilation:
continue
searchstr = "%s %s" % (db_artist.name, ref_album.name)
async def register_player_control(self, control_id: str, control: PlayerControl):
"""Register a playercontrol with the player manager."""
control.mass = self.mass
- control.type = PlayerControlType(control.type)
self._controls[control_id] = control
LOGGER.info(
"New PlayerControl (%s) registered: %s\\%s",
await self.cmd_power_on(player_id)
# load items into the queue
player_queue = self.get_player_queue(player_id)
- if queue_opt == QueueOption.Replace or (
- len(queue_items) > 10 and queue_opt in [QueueOption.Play, QueueOption.Next]
- ):
+ if queue_opt == QueueOption.Replace:
+ return await player_queue.load(queue_items)
+ if queue_opt in [QueueOption.Play, QueueOption.Next] and len(queue_items) > 100:
return await player_queue.load(queue_items)
if queue_opt == QueueOption.Next:
return await player_queue.insert(queue_items, 1)
class MediaItem(DataClassDictMixin):
"""Representation of a media item."""
- item_id: str = ""
- provider: str = ""
+ item_id: str
+ provider: str
name: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
provider_ids: Set[MediaItemProviderId] = field(default_factory=set)
@dataclass
-class Artist(MediaItem, DataClassDictMixin):
+class Artist(MediaItem):
"""Model for an artist."""
media_type: MediaType = MediaType.Artist
class ItemMapping(DataClassDictMixin):
"""Representation of a minimized item object."""
- item_id: str = ""
- provider: str = ""
+ item_id: str
+ provider: str
name: str = ""
media_type: MediaType = MediaType.Artist
@dataclass
-class Album(MediaItem, DataClassDictMixin):
+class Album(MediaItem):
"""Model for an album."""
media_type: MediaType = MediaType.Album
Off = "off"
-@dataclass
+@dataclass(frozen=True)
class DeviceInfo(DataClassDictMixin):
"""Model for a player's deviceinfo."""
structure to override common player commands.
"""
- # pylint: disable=no-member
-
- type: PlayerControlType = PlayerControlType.UNKNOWN
- control_id: str = ""
- provider: str = ""
- name: str = ""
+ type: PlayerControlType
+ control_id: str
+ provider: str
+ name: str
state: Any = None
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.type, self.provider, self.control_id))
+
async def set_state(self, new_state: Any) -> None:
"""Handle command to set the state for a player control."""
# by default we just signal an event on the eventbus
# pickup this event (e.g. from the websocket api)
# or override this method with your own implementation.
-
+ # pylint: disable=no-member
self.mass.signal_event(f"players/controls/{self.control_id}/state", new_state)
features: Set[PlayerFeature] = field(default_factory=set)
active_queue: str = None
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.provider_id, self.player_id))
+
def update(self, new_obj: "PlayerState") -> Set[str]:
"""Update state from other PlayerState instance and return changed keys."""
changed_keys = set()
:param offset: offset from current queue position
"""
- if (
- not self.items
- or self.cur_index is None
- or self.cur_index == 0
- or (self.cur_index + offset > len(self.items))
- ):
+ if not self.items or self.cur_index is None:
return await self.load(queue_items)
insert_at_index = self.cur_index + offset
for index, item in enumerate(queue_items):
LOGGER.debug(
"cmd_queue_insert not supported by player, fallback to cmd_queue_load "
)
- self._items = self._items[self.cur_index :]
+ self._items = self._items[self.cur_index + offset :]
return await self.player.cmd_queue_load(self._items)
self.mass.signal_event(EVENT_QUEUE_ITEMS_UPDATED, self)
self.mass.add_job(self._save_state())
"""Play single uri on player."""
player_queue = self.mass.players.get_player_queue(self.player_id)
if player_queue.use_queue_stream:
- # create CC queue so that skip and previous will work
- queue_item = QueueItem(name="Music Assistant", uri=uri)
+ # create (fake) CC queue so that skip and previous will work
+ queue_item = QueueItem(
+ item_id=uri, provider="mass", name="Music Assistant", uri=uri
+ )
return await self.cmd_queue_load([queue_item, queue_item])
await self.chromecast_command(self._chromecast.play_media, uri, "audio/flac")
return [
await self._parse_album(item)
for item in await self._get_all_items(endpoint, params, key="albums")
- if (item and item["id"])
+ if (item and item["id"] and str(item["artist"]["id"]) == prov_artist_id)
]
async def get_artist_toptracks(self, prov_artist_id) -> List[Track]:
params = {
"playlist_id": prov_playlist_id,
"track_ids": ",".join(prov_track_ids),
+ "playlist_track_ids": ",".join(prov_track_ids),
}
return await self._get_data("playlist/addTracks", params)
playlist_track_ids = set()
params = {"playlist_id": prov_playlist_id, "extra": "tracks"}
for track in await self._get_all_items("playlist/get", params, key="tracks"):
- if track["id"] in prov_track_ids:
- playlist_track_ids.add(track["playlist_track_id"])
+ if str(track["id"]) in prov_track_ids:
+ playlist_track_ids.add(str(track["playlist_track_id"]))
params = {
"playlist_id": prov_playlist_id,
- "track_ids": ",".join(playlist_track_ids),
+ "playlist_track_ids": ",".join(playlist_track_ids),
}
return await self._get_data("playlist/deleteTracks", params)
async def _parse_playlist(self, playlist_obj):
"""Parse qobuz playlist object to generic layout."""
playlist = Playlist(
- item_id=playlist_obj["id"],
+ item_id=str(playlist_obj["id"]),
provider=PROV_ID,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["name"],
await ws_client.prepare(request)
request.app["clients"].append(ws_client)
- try:
- # handle incoming messages
- async for msg in ws_client:
+ # handle incoming messages
+ async for msg in ws_client:
+ try:
if msg.type == WSMsgType.error:
LOGGER.warning(
"ws connection closed with exception %s", ws_client.exception()
await self._handle_event(
ws_client, json_msg["event"], json_msg.get("data")
)
- except AuthenticationError as exc: # pylint:disable=broad-except
- # disconnect client on auth errors
- await self._send_json(ws_client, error=str(exc), **json_msg)
- await ws_client.close(message=str(exc).encode())
- except Exception as exc: # pylint:disable=broad-except
- # log the error only
- await self._send_json(ws_client, error=str(exc), **json_msg)
- LOGGER.error("Error with WS client", exc_info=exc)
+ except AuthenticationError as exc: # pylint:disable=broad-except
+ # disconnect client on auth errors
+ await self._send_json(ws_client, error=str(exc), **json_msg)
+ await ws_client.close(message=str(exc).encode())
+ except Exception as exc: # pylint:disable=broad-except
+ # log the error only
+ await self._send_json(ws_client, error=str(exc), **json_msg)
+ LOGGER.error("Error with WS client", exc_info=exc)
# websocket disconnected
request.app["clients"].remove(ws_client)