"""Models and helpers for media items."""
from __future__ import annotations
-from collections.abc import Mapping
from dataclasses import dataclass, field, fields
from time import time
from typing import Any
from mashumaro import DataClassDictMixin
-from music_assistant.common.helpers.json import json_dumps, json_loads
from music_assistant.common.helpers.uri import create_uri
from music_assistant.common.helpers.util import create_sort_name, merge_lists
from music_assistant.common.models.enums import (
MetadataTypes = int | bool | str | list[str]
-JSON_KEYS = ("artists", "metadata", "provider_mappings")
-
@dataclass
class AudioFormat(DataClassDictMixin):
def __hash__(self) -> int:
"""Return custom hash."""
- return hash((self.provider_instance, self.item_id))
+ return hash((self.provider_instance, self.item_id.lower()))
def __eq__(self, other: ProviderMapping) -> bool:
"""Check equality of two items."""
- return self.provider_instance == other.provider_instance and self.item_id == other.item_id
+ if not other:
+ return False
+ return (
+ self.provider_instance == other.provider_instance
+ and self.item_id.lower() == other.item_id.lower()
+ )
@dataclass(frozen=True)
return self
-@dataclass
+@dataclass(kw_only=True)
class MediaItem(DataClassDictMixin):
"""Base representation of a media item."""
+ media_type: MediaType
item_id: str
provider: str # provider instance id or provider domain
name: str
- provider_mappings: set[ProviderMapping] = field(default_factory=set)
+ metadata: MediaItemMetadata
+ provider_mappings: set[ProviderMapping]
# optional fields below
+ # provider_mappings: set[ProviderMapping] = field(default_factory=set)
metadata: MediaItemMetadata = field(default_factory=MediaItemMetadata)
favorite: bool = False
media_type: MediaType = MediaType.UNKNOWN
if not self.sort_name:
self.sort_name = create_sort_name(self.name)
- @classmethod
- def from_db_row(cls, db_row: Mapping):
- """Create MediaItem object from database row."""
- db_row = dict(db_row)
- db_row["provider"] = "library"
- for key in JSON_KEYS:
- if key in db_row and db_row[key] is not None:
- db_row[key] = json_loads(db_row[key])
- if "favorite" in db_row:
- db_row["favorite"] = bool(db_row["favorite"])
- db_row["item_id"] = str(db_row["item_id"])
- return cls.from_dict(db_row)
-
- def to_db_row(self) -> dict:
- """Create dict from item suitable for db."""
-
- def get_db_value(key, value) -> Any:
- """Transform value for db storage."""
- if key in JSON_KEYS:
- return json_dumps(value)
- return value
-
- return {
- key: get_db_value(key, value)
- for key, value in self.to_dict().items()
- if key
- not in [
- "item_id",
- "provider",
- "media_type",
- "uri",
- "album",
- "position",
- "track_number",
- "disc_number",
- ]
- }
-
@property
def available(self):
"""Return (calculated) availability."""
return None
return next((x for x in self.metadata.images if x.type == ImageType.THUMB), None)
- def add_provider_mapping(self, prov_mapping: ProviderMapping) -> None:
- """Add provider ID, overwrite existing entry."""
- self.provider_mappings = {
- x
- for x in self.provider_mappings
- if not (
- x.item_id == prov_mapping.item_id
- and x.provider_instance == prov_mapping.provider_instance
- )
- }
- self.provider_mappings.add(prov_mapping)
-
def __hash__(self) -> int:
"""Return custom hash."""
return hash(self.uri)
return self.uri == other.uri
-@dataclass
+@dataclass(kw_only=True)
class Artist(MediaItem):
"""Model for an artist."""
mbid: str | None = None
-@dataclass
+@dataclass(kw_only=True)
class Album(MediaItem):
"""Model for an album."""
mbid: str | None = None # release group id
-@dataclass
+@dataclass(kw_only=True)
class Track(MediaItem):
"""Model for a track."""
"""Return custom hash."""
return hash((self.provider, self.item_id))
- @property
- def image(self) -> MediaItemImage | None:
- """Return (first/random) image/thumb from metadata (if any)."""
- if image := super().image:
- return image
- # fallback to album image (use getattr to guard for ItemMapping)
- if self.album:
- return getattr(self.album, "image", None)
- return None
-
@property
def has_chapters(self) -> bool:
"""
position: int # required
-@dataclass
+@dataclass(kw_only=True)
class Playlist(MediaItem):
"""Model for a playlist."""
owner: str = ""
is_editable: bool = False
- def __hash__(self):
- """Return custom hash."""
- return hash((self.provider, self.item_id))
-
-@dataclass
+@dataclass(kw_only=True)
class Radio(MediaItem):
"""Model for a radio station."""
media_type: MediaType = MediaType.RADIO
duration: int = 172800
- def to_db_row(self) -> dict:
- """Create dict from item suitable for db."""
- val = super().to_db_row()
- val.pop("duration", None)
- return val
- def __hash__(self):
- """Return custom hash."""
- return hash((self.provider, self.item_id))
-
-
-@dataclass
+@dataclass(kw_only=True)
class BrowseFolder(MediaItem):
"""Representation of a Folder used in Browse (which contains media items)."""
label: str = ""
# subitems of this folder when expanding
items: list[MediaItemType | BrowseFolder] | None = None
+ provider_mappings: set[ProviderMapping] = field(default_factory=set)
def __post_init__(self):
"""Call after init."""
super().__post_init__()
if not self.path:
self.path = f"{self.provider}://{self.item_id}"
+ if not self.provider_mappings:
+ self.provider_mappings.add(
+ ProviderMapping(
+ item_id=self.item_id,
+ provider_domain=self.provider,
+ provider_instance=self.provider,
+ )
+ )
MediaItemType = Artist | Album | Track | Radio | Playlist | BrowseFolder
API_SCHEMA_VERSION: Final[int] = 23
MIN_SCHEMA_VERSION: Final[int] = 23
-DB_SCHEMA_VERSION: Final[int] = 24
+DB_SCHEMA_VERSION: Final[int] = 25
ROOT_LOGGER_NAME: Final[str] = "music_assistant"
if item.mbid:
match = {"mbid": item.mbid}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Album.from_db_row(db_row)
+ cur_item = Album.from_dict(self._parse_db_row(db_row))
# existing item found: update it
return await self.update_item_in_library(cur_item.item_id, item)
# fallback to search and match
match = {"sort_name": item.sort_name}
- for row in await self.mass.music.database.get_rows(self.db_table, match):
- row_album = Album.from_db_row(row)
+ for db_row in await self.mass.music.database.get_rows(self.db_table, match):
+ row_album = Album.from_dict(self._parse_db_row(db_row))
if compare_album(row_album, item):
cur_item = row_album
# existing item found: update it
new_item = await self.mass.music.database.insert(
self.db_table,
{
- **item.to_db_row(),
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "version": item.version,
+ "favorite": item.favorite,
+ "album_type": item.album_type,
+ "year": item.year,
+ "mbid": item.mbid,
+ "metadata": serialize_to_json(item.metadata),
+ "provider_mappings": serialize_to_json(item.provider_mappings),
"artists": serialize_to_json(album_artists),
"sort_artist": sort_artist,
"timestamp_added": int(utc_timestamp()),
db_id = int(item_id) # ensure integer
db_album = await self.get_library_item(db_id)
result: list[AlbumTrack] = []
- async for album_track_row in self.mass.music.database.iter_items(
- DB_TABLE_ALBUM_TRACKS, {"album_id": db_id}
- ):
- # TODO: make this a nice join query
- track_id = album_track_row["track_id"]
- track_row = await self.mass.music.database.get_row(
- DB_TABLE_TRACKS, {"item_id": track_id}
- )
- album_track = AlbumTrack.from_db_row(
- {**track_row, **album_track_row, "album": db_album.to_dict()}
+ query = (
+ f"SELECT * FROM {DB_TABLE_TRACKS} INNER JOIN albumtracks "
+ "ON albumtracks.track_id = tracks.item_id WHERE albumtracks.album_id = :album_id"
+ )
+ track_rows = await self.mass.music.database.get_rows_from_query(query, {"album_id": db_id})
+ for album_track_row in track_rows:
+ album_track = AlbumTrack.from_dict(
+ self._parse_db_row({**album_track_row, "album": db_album.to_dict()})
)
if db_album.metadata.images:
album_track.metadata.images = db_album.metadata.images
import asyncio
import contextlib
from random import choice, random
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
limit: int = 500,
offset: int = 0,
order_by: str = "sort_name",
+ extra_query: str | None = None,
+ extra_query_params: dict[str, Any] | None = None,
album_artists_only: bool = False,
) -> PagedItems:
- """Get in-database album artists."""
+ """Get in-database (album) artists."""
+ if album_artists_only:
+ artist_query = "artists.sort_name in (select albums.sort_artist from albums)"
+ extra_query = f"{extra_query} AND {artist_query}" if extra_query else artist_query
return await super().library_items(
favorite=favorite,
search=search,
limit=limit,
offset=offset,
order_by=order_by,
- query_parts=["artists.sort_name in (select albums.sort_artist from albums)"]
- if album_artists_only
- else None,
+ extra_query=extra_query,
+ extra_query_params=extra_query_params,
)
async def tracks(
provider_instance_id_or_domain,
):
# TODO: adjust to json query instead of text search?
- query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
- query += f" AND provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
- items = await self.mass.music.tracks.get_library_items_by_query(query)
+ query = f"WHERE tracks.artists LIKE '%\"{db_artist.item_id}\"%'"
+ query += (
+ f" AND tracks.provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
+ )
+ if paged_list := await self.mass.music.tracks.library_items(extra_query=query):
+ items = paged_list.items
# store (serializable items) in cache
self.mass.create_task(
self.mass.cache.set(cache_key, [x.to_dict() for x in items], checksum=cache_checksum)
) -> list[Track]:
"""Return all tracks for an artist in the library."""
# TODO: adjust to json query instead of text search?
- query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{item_id}\"%'"
- return await self.mass.music.tracks.get_library_items_by_query(query)
+ query = f"WHERE tracks.artists LIKE '%\"{item_id}\"%'"
+ paged_list = await self.mass.music.tracks.library_items(extra_query=query)
+ return paged_list.items
async def get_provider_artist_albums(
self,
provider_instance_id_or_domain,
):
# TODO: adjust to json query instead of text search?
- query = f"SELECT * FROM albums WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
- query += f" AND provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
- items = await self.mass.music.albums.get_library_items_by_query(query)
+ query = f"WHERE albums.artists LIKE '%\"{db_artist.item_id}\"%'"
+ query += (
+ f" AND albums.provider_mappings LIKE '%\"{provider_instance_id_or_domain}\"%'"
+ )
+ paged_list = await self.mass.music.albums.library_items(extra_query=query)
+ items = paged_list.items
else:
# edge case
items = []
) -> list[Album]:
"""Return all in-library albums for an artist."""
# TODO: adjust to json query instead of text search?
- query = f"SELECT * FROM albums WHERE artists LIKE '%\"{item_id}\"%'"
- return await self.mass.music.albums.get_library_items_by_query(query)
+ query = f"WHERE albums.artists LIKE '%\"{item_id}\"%'"
+ paged_list = await self.mass.music.albums.library_items(extra_query=query)
+ return paged_list.items
async def _add_library_item(self, item: Artist | ItemMapping) -> Artist:
"""Add a new item record to the database."""
match = {"mbid": mbid}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
# existing item found: update it
- cur_item = Artist.from_db_row(db_row)
+ cur_item = Artist.from_dict(self._parse_db_row(db_row))
return await self.update_item_in_library(cur_item.item_id, item)
# fallback to exact name match
# NOTE: we match an artist by name which could theoretically lead to collisions
# but the chance is so small it is not worth the additional overhead of grabbing
# the musicbrainz id upfront
match = {"sort_name": item.sort_name}
- for row in await self.mass.music.database.get_rows(self.db_table, match):
- row_artist = Artist.from_db_row(row)
+ for db_row in await self.mass.music.database.get_rows(self.db_table, match):
+ row_artist = Artist.from_dict(self._parse_db_row(db_row))
if row_artist.sort_name == item.sort_name:
cur_item = row_artist
# existing item found: update it
# try to construct (a half baken) Artist object from it
if isinstance(item, ItemMapping):
item = Artist.from_dict(item.to_dict())
- new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
+ new_item = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "favorite": item.favorite,
+ "mbid": item.mbid,
+ "metadata": serialize_to_json(item.metadata),
+ "provider_mappings": serialize_to_json(item.provider_mappings),
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
+ },
+ )
db_id = new_item["item_id"]
# update/set provider_mappings table
await self._set_provider_mappings(db_id, item.provider_mappings)
import logging
from abc import ABCMeta, abstractmethod
-from collections.abc import AsyncGenerator, Iterable
+from collections.abc import AsyncGenerator, Iterable, Mapping
from contextlib import suppress
from time import time
-from typing import TYPE_CHECKING, Generic, TypeVar
+from typing import TYPE_CHECKING, Any, Generic, TypeVar
-from music_assistant.common.helpers.json import serialize_to_json
+from music_assistant.common.helpers.json import json_loads, serialize_to_json
from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
from music_assistant.common.models.errors import InvalidDataError, MediaNotFoundError
from music_assistant.common.models.media_items import (
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
REFRESH_INTERVAL = 60 * 60 * 24 * 30
+JSON_KEYS = ("artists", "metadata", "provider_mappings")
class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
def __init__(self, mass: MusicAssistant):
"""Initialize class."""
self.mass = mass
+ self.base_query = f"SELECT * FROM {self.db_table}"
self.logger = logging.getLogger(f"{ROOT_LOGGER_NAME}.music.{self.media_type.value}")
@abstractmethod
limit: int = 500,
offset: int = 0,
order_by: str = "sort_name",
- query_parts: list[str] | None = None,
+ extra_query: str | None = None,
+ extra_query_params: dict[str, Any] | None = None,
) -> PagedItems:
"""Get in-database items."""
- sql_query = f"SELECT * FROM {self.db_table}"
- params = {}
- query_parts = query_parts or []
+ sql_query = self.base_query
+ params = extra_query_params or {}
+ query_parts: list[str] = []
+ if extra_query:
+ # prevent duplicate where statement
+ if extra_query.lower().startswith("where "):
+ extra_query = extra_query[5:]
+ query_parts.append(extra_query)
if search:
params["search"] = f"%{search}%"
if self.media_type in (MediaType.ALBUM, MediaType.TRACK):
- query_parts.append("(name LIKE :search or artists LIKE :search)")
+ query_parts.append(
+ f"({self.db_table}.name LIKE :search "
+ f" OR {self.db_table}.artists LIKE :search)"
+ )
else:
- query_parts.append("name LIKE :search")
+ query_parts.append(f"{self.db_table}.name LIKE :search")
if favorite is not None:
- query_parts.append("favorite = :favorite")
+ query_parts.append(f"{self.db_table}.favorite = :favorite")
params["favorite"] = favorite
if query_parts:
+ # concetenate all where queries
sql_query += " WHERE " + " AND ".join(query_parts)
sql_query += f" ORDER BY {order_by}"
- items = await self.get_library_items_by_query(sql_query, params, limit=limit, offset=offset)
+ items = await self._get_library_items_by_query(
+ sql_query, params, limit=limit, offset=offset
+ )
count = len(items)
if 0 < count < limit:
total = offset + count
search_query = search_query.replace("/", " ").replace("'", "")
if provider_instance_id_or_domain == "library":
return [
- self.item_cls.from_db_row(db_row)
+ self.item_cls.from_dict(self._parse_db_row(db_row))
for db_row in await self.mass.music.database.search(self.db_table, search_query)
]
prov = self.mass.get_provider(provider_instance_id_or_domain)
return (prov_mapping.provider_instance, prov_mapping.item_id)
return (None, None)
- async def get_library_items_by_query(
- self,
- custom_query: str | None = None,
- query_params: dict | None = None,
- limit: int = 500,
- offset: int = 0,
- ) -> list[ItemCls]:
- """Fetch MediaItem records from database given a custom query."""
- if custom_query is None:
- custom_query = f"SELECT * FROM {self.db_table}"
- return [
- self.item_cls.from_db_row(db_row)
- for db_row in await self.mass.music.database.get_rows_from_query(
- custom_query, query_params, limit=limit, offset=offset
- )
- ]
-
async def get_library_item(self, item_id: int | str) -> ItemCls:
"""Get record by id."""
db_id = int(item_id) # ensure integer
match = {"item_id": db_id}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
- return self.item_cls.from_db_row(db_row)
+ return self.item_cls.from_dict(self._parse_db_row(db_row))
raise MediaNotFoundError(f"{self.media_type.value} not found in library: {db_id}")
async def get_library_item_by_prov_id(
) -> list[ItemCls]:
"""Fetch all records from library for given provider."""
if provider_instance_id_or_domain == "library":
- return await self.get_library_items_by_query(limit=limit, offset=offset)
+ if provider_item_ids is not None:
+ prov_ids_string = str(tuple(int(x) for x in provider_item_ids))
+ if prov_ids_string.endswith(",)"):
+ prov_ids_string = prov_ids_string.replace(",)", ")")
+ extra_query = f"item_id in {prov_ids_string}"
+ else:
+ extra_query = None
+ paged_list = await self.library_items(
+ limit=limit, offset=offset, extra_query=extra_query
+ )
+ return paged_list.items
# we use the separate provider_mappings table to perform quick lookups
# from provider id's to database id's because this is faster
# (and more compatible) than querying the provider_mappings json column
subquery = (
- f"SELECT item_id FROM {DB_TABLE_PROVIDER_MAPPINGS} "
- "WHERE ( "
+ f"SELECT item_id FROM {DB_TABLE_PROVIDER_MAPPINGS} WHERE "
+ f" media_type = '{self.media_type.value}' AND "
f"(provider_instance = '{provider_instance_id_or_domain}' "
- f"OR provider_domain = '{provider_instance_id_or_domain}') "
- ")"
+ f"OR provider_domain = '{provider_instance_id_or_domain}')"
)
if provider_item_ids is not None:
prov_ids = str(tuple(provider_item_ids))
if prov_ids.endswith(",)"):
prov_ids = prov_ids.replace(",)", ")")
subquery += f" AND provider_item_id in {prov_ids}"
- subquery += f" AND media_type = '{self.media_type.value}'"
- query = f"SELECT * FROM {self.db_table} WHERE item_id in ({subquery})"
- return await self.get_library_items_by_query(query, limit=limit, offset=offset)
+ # final query is a where query from the subquery
+ # that queries the provider_mappings table
+ query = f"WHERE {self.db_table}.item_id in ({subquery})"
+ paged_list = await self.library_items(limit=limit, offset=offset, extra_query=query)
+ return paged_list.items
async def iter_library_items_by_prov_id(
self,
async def _get_dynamic_tracks(self, media_item: ItemCls, limit: int = 25) -> list[Track]:
"""Get dynamic list of tracks for given item, fallback/default implementation."""
+ async def _get_library_items_by_query(
+ self,
+ query: str,
+ query_params: dict | None = None,
+ limit: int = 500,
+ offset: int = 0,
+ ) -> list[ItemCls]:
+ """Fetch MediaItem records from database given a custom (WHERE) clause."""
+ if query_params is None:
+ query_params = {}
+ return [
+ self.item_cls.from_dict(self._parse_db_row(db_row))
+ for db_row in await self.mass.music.database.get_rows_from_query(
+ query, query_params, limit=limit, offset=offset
+ )
+ ]
+
async def _set_provider_mappings(
self, item_id: str | int, provider_mappings: Iterable[ProviderMapping]
) -> None:
# this can happen for unavailable items
return artist
return ItemMapping.from_item(artist)
+
+ @staticmethod
+ def _parse_db_row(db_row: Mapping) -> dict[str, Any]:
+ """Parse raw db Mapping into a dict."""
+ db_row_dict = dict(db_row)
+ db_row_dict["provider"] = "library"
+ for key in JSON_KEYS:
+ if key in db_row_dict and db_row_dict[key] not in (None, ""):
+ db_row_dict[key] = json_loads(db_row_dict[key])
+ if "favorite" in db_row_dict:
+ db_row_dict["favorite"] = bool(db_row_dict["favorite"])
+ if "item_id" in db_row_dict:
+ db_row_dict["item_id"] = str(db_row_dict["item_id"])
+ if "album" not in db_row_dict and (album_id := db_row_dict.get("album_id")):
+ # handle joined result with (limited) album data as ItemMapping
+ db_row_dict["album"] = {
+ "media_type": "album",
+ "item_id": str(album_id),
+ "provider": "library",
+ "name": db_row_dict["album_name"],
+ "version": db_row_dict["album_version"],
+ }
+ db_row_dict["album"] = ItemMapping.from_dict(db_row_dict["album"])
+ if not db_row_dict["metadata"]["images"]:
+ # copy album image in case the track has no image
+ album_metadata = json_loads(db_row_dict["album_metadata"])
+ db_row_dict["metadata"]["images"] = album_metadata["images"]
+ return db_row_dict
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import Playlist, Track
+from music_assistant.common.models.media_items import Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
from .base import MediaControllerBase
async def tracks(
self, item_id: str, provider_instance_id_or_domain: str, force_refresh: bool = False
- ) -> AsyncGenerator[Track, None]:
+ ) -> AsyncGenerator[PlaylistTrack, None]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
item_id, provider_instance_id_or_domain, force_refresh=force_refresh
# try name matching
match = {"name": item.name, "owner": item.owner}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Playlist.from_db_row(db_row)
+ cur_item = Playlist.from_dict(self._parse_db_row(db_row))
# existing item found: update it
return await self.update_item_in_library(cur_item.item_id, item)
# insert new item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
- new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
+ new_item = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "owner": item.owner,
+ "is_editable": item.is_editable,
+ "favorite": item.favorite,
+ "metadata": serialize_to_json(item.metadata),
+ "provider_mappings": serialize_to_json(item.provider_mappings),
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
+ },
+ )
db_id = new_item["item_id"]
# update/set provider_mappings table
await self._set_provider_mappings(db_id, item.provider_mappings)
item_id: str,
provider_instance_id_or_domain: str,
cache_checksum: Any = None,
- ) -> AsyncGenerator[Track, None]:
+ ) -> AsyncGenerator[PlaylistTrack, None]:
"""Return album tracks for the given provider album id."""
assert provider_instance_id_or_domain != "library"
provider = self.mass.get_provider(provider_instance_id_or_domain)
cache_key = f"{provider.instance_id}.playlist.{item_id}.tracks"
if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
for track_dict in cache:
- yield Track.from_dict(track_dict)
+ yield PlaylistTrack.from_dict(track_dict)
return
# no items in cache - get listing from provider
all_items = []
# try name matching
match = {"name": item.name}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Radio.from_db_row(db_row)
+ cur_item = Radio.from_dict(self._parse_db_row(db_row))
# existing item found: update it
return await self.update_item_in_library(cur_item.item_id, item)
# insert new item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
- new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
+ new_item = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "favorite": item.favorite,
+ "metadata": serialize_to_json(item.metadata),
+ "provider_mappings": serialize_to_json(item.provider_mappings),
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
+ },
+ )
db_id = new_item["item_id"]
# update/set provider_mappings table
await self._set_provider_mappings(db_id, item.provider_mappings)
from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
+from music_assistant.common.models.enums import AlbumType, EventType, MediaType, ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
def __init__(self, *args, **kwargs):
"""Initialize class."""
super().__init__(*args, **kwargs)
+ self.base_query = (
+ "SELECT tracks.*, albums.item_id as album_id, "
+ "albums.name AS album_name, albums.version as album_version, "
+ "albums.metadata as album_metadata FROM tracks "
+ "LEFT JOIN albumtracks on albumtracks.track_id = tracks.item_id "
+ "LEFT JOIN albums on albums.item_id = albumtracks.album_id"
+ )
self._db_add_lock = asyncio.Lock()
# register api handlers
self.mass.register_api_command("music/tracks/library_items", self.library_items)
except MediaNotFoundError:
# edge case where playlist track has invalid albumdetails
self.logger.warning("Unable to fetch album details %s", track.album.uri)
- # prefer album image (otherwise it may look weird)
- if track.album and track.album.image and track.metadata.images:
- track.metadata.images = [track.album.image] + track.metadata.images
+ # prefer album image if album explicitly given or track has no image on its own
+ if (
+ (album_uri or not track.metadata.images)
+ and isinstance(track.album, Album)
+ and track.album.image
+ ):
+ track.metadata.images = [track.album.image]
# append full artist details to full track item
full_artists = []
for artist in track.artists:
# grab additional metadata
if not skip_metadata_lookup:
await self.mass.metadata.get_track_metadata(item)
- # fallback track image from album
- if not item.image and isinstance(item.album, Album) and item.album.image:
+ # fallback track image from album (only if albumtype = single)
+ if (
+ not item.image
+ and isinstance(item.album, Album)
+ and item.album.image
+ and item.album.album_type == AlbumType.SINGLE
+ ):
item.metadata.images.append(item.album.image)
# actually add (or update) the item in the library db
# use the lock to prevent a race condition of the same item being added twice
if item.mbid:
match = {"mbid": item.mbid}
if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Track.from_db_row(db_row)
+ cur_item = Track.from_dict(self._parse_db_row(db_row))
return await self.update_item_in_library(cur_item.item_id, item)
match = {"sort_name": item.sort_name}
- for row in await self.mass.music.database.get_rows(self.db_table, match):
- row_track = Track.from_db_row(row)
+ for db_row in await self.mass.music.database.get_rows(self.db_table, match):
+ row_track = Track.from_dict(self._parse_db_row(db_row))
track_albums = await self.albums(row_track.item_id, row_track.provider)
if compare_track(row_track, item, strict=True, track_albums=track_albums):
cur_item = row_track
new_item = await self.mass.music.database.insert(
self.db_table,
{
- **item.to_db_row(),
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "version": item.version,
+ "duration": item.duration,
+ "favorite": item.favorite,
+ "mbid": item.mbid,
+ "metadata": serialize_to_json(item.metadata),
+ "provider_mappings": serialize_to_json(item.provider_mappings),
"artists": serialize_to_json(track_artists),
"sort_artist": sort_artist,
"timestamp_added": int(utc_timestamp()),
ProviderFeature,
ProviderType,
)
-from music_assistant.common.models.errors import MusicAssistantError
+from music_assistant.common.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.common.models.media_items import BrowseFolder, MediaItemType, SearchResults
from music_assistant.common.models.provider import SyncTask
from music_assistant.constants import (
prov = self.mass.get_provider(provider_instance)
return await prov.browse(path)
+ @api_command("music/recently_played_items")
+ async def recently_played(
+ self, limit: int = 10, media_types: list[MediaType] | None = None
+ ) -> list[MediaItemType]:
+ """Return a list of the last played items."""
+ if media_types is None:
+ media_types = [MediaType.TRACK, MediaType.RADIO]
+ media_types_str = "(" + ",".join(f'"{x}"' for x in media_types) + ")"
+ query = (
+ f"SELECT * FROM {DB_TABLE_PLAYLOG} WHERE media_type "
+ f"in {media_types_str} ORDER BY timestamp DESC"
+ )
+ db_rows = await self.mass.music.database.get_rows_from_query(query, limit=limit)
+ result: list[MediaItemType] = []
+ for db_row in db_rows:
+ with suppress(MediaNotFoundError):
+ media_type = MediaType(db_row["media_type"])
+ item = await self.get_item(media_type, db_row["item_id"], db_row["provider"])
+ result.append(item)
+ return result
+
@api_command("music/item_by_uri")
async def get_item_by_uri(self, uri: str) -> MediaItemType:
"""Fetch MediaItem by uri."""
return statistics.fmean(all_items)
return None
- async def mark_item_played(self, item_id: str, provider_instance_id_or_domain: str):
+ async def mark_item_played(
+ self, media_type: MediaType, item_id: str, provider_instance_id_or_domain: str
+ ):
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
await self.database.insert(
{
"item_id": item_id,
"provider": provider_instance_id_or_domain,
+ "media_type": media_type.value,
"timestamp": timestamp,
},
allow_replace=True,
for item_id in item_ids_to_delete:
await self.database.delete(table, {"item_id": item_id})
+ if prev_version > 22 and prev_version < 25:
+ # extend playlog table with media_type column
+ await self.database.execute(
+ f"ALTER TABLE {DB_TABLE_PLAYLOG} "
+ "ADD COLUMN media_type TEXT NOT NULL DEFAULT 'track'"
+ )
+
self.logger.info(
"Database migration to version %s completed",
DB_SCHEMA_VERSION,
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_PLAYLOG}(
item_id INTEGER NOT NULL,
provider TEXT NOT NULL,
+ media_type TEXT NOT NULL DEFAULT 'track',
timestamp INTEGER DEFAULT 0,
- UNIQUE(item_id, provider));"""
+ UNIQUE(item_id, provider, media_type));"""
)
await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_ALBUMS}(
elif media_item.media_type == MediaType.PLAYLIST:
async for playlist_track in ctrl.tracks(media_item.item_id, media_item.provider):
tracks.append(playlist_track)
+ await self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
elif media_item.media_type in (
MediaType.ARTIST,
MediaType.ALBUM,
):
tracks += await ctrl.tracks(media_item.item_id, media_item.provider)
+ await self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
else:
# single track or radio item
tracks += [media_item]
raise err
else:
LOGGER.debug("finished media stream for: %s", streamdetails.uri)
- await mass.music.mark_item_played(streamdetails.item_id, streamdetails.provider)
finally:
# report playback
+ await mass.music.mark_item_played(
+ streamdetails.media_type, streamdetails.item_id, streamdetails.provider
+ )
if streamdetails.callback:
mass.create_task(streamdetails.callback, streamdetails)
# send analyze job to background worker
async def search(self, table: str, search: str, column: str = "name") -> list[Mapping]:
"""Search table by column."""
- sql_query = f"SELECT * FROM {table} WHERE {column} LIKE :search"
+ sql_query = f"SELECT * FROM {table} WHERE {table}.{column} LIKE :search"
params = {"search": f"%{search}%"}
return await self._db.execute_fetchall(sql_query, params)
async def get_row(self, table: str, match: dict[str, Any]) -> Mapping | None:
"""Get single row for given table where column matches keys/values."""
sql_query = f"SELECT * FROM {table} WHERE "
- sql_query += " AND ".join(f"{x} = :{x}" for x in match)
+ sql_query += " AND ".join(f"{table}.{x} = :{x}" for x in match)
async with self._db.execute(sql_query, match) as cursor:
return await cursor.fetchone()
"Player does not support next transport uri feature, "
"gapless playback is not possible."
)
- else:
- # log once if we detected that the player supports the next transport uri
- if dlna_player.supports_next_uri is None:
- dlna_player.supports_next_uri = True
- self.logger.debug("Player supports the next transport uri feature.")
self.logger.debug(
"Enqued next track (%s) to player %s",
self.mass.create_task(self._enqueue_next_track(dlna_player))
# if player does not support next uri, manual play it
if (
- not dlna_player.supports_next_uri
+ (
+ dlna_player.supports_next_uri is False
+ or (dlna_player.supports_next_uri is None and dlna_player.end_of_track_reached)
+ )
and prev_state == PlayerState.PLAYING
and current_state == PlayerState.IDLE
and dlna_player.next_url
await self.cmd_play_url(dlna_player.udn, dlna_player.next_url, dlna_player.next_item)
dlna_player.end_of_track_reached = False
dlna_player.next_url = None
+ dlna_player.supports_next_uri = False
StreamDetails,
Track,
)
-from music_assistant.constants import (
- DB_TABLE_ALBUM_TRACKS,
- DB_TABLE_TRACKS,
- VARIOUS_ARTISTS_ID_MBID,
- VARIOUS_ARTISTS_NAME,
-)
+from music_assistant.constants import VARIOUS_ARTISTS_ID_MBID, VARIOUS_ARTISTS_NAME
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.controllers.music import DB_SCHEMA_VERSION
from music_assistant.server.helpers.compare import compare_strings
}
# ruff: noqa: E501
if media_types is None or MediaType.TRACK in media_types:
- query = "SELECT * FROM tracks WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- result.tracks = await self.mass.music.tracks.get_library_items_by_query(query, params)
+ query = (
+ "WHERE tracks.name LIKE :name AND tracks.provider_mappings LIKE :provider_instance"
+ )
+ result.tracks = (
+ await self.mass.music.tracks.library_items(
+ extra_query=query, extra_query_params=params
+ )
+ ).items
if media_types is None or MediaType.ALBUM in media_types:
- query = "SELECT * FROM albums WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- result.albums = await self.mass.music.albums.get_library_items_by_query(query, params)
+ query = "WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
+ result.albums = (
+ await self.mass.music.albums.library_items(
+ extra_query=query, extra_query_params=params
+ )
+ ).items
if media_types is None or MediaType.ARTIST in media_types:
- query = "SELECT * FROM artists WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- result.artists = await self.mass.music.artists.get_library_items_by_query(query, params)
+ query = "WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
+ result.artists = (
+ await self.mass.music.artists.library_items(
+ extra_query=query, extra_query_params=params
+ )
+ ).items
if media_types is None or MediaType.PLAYLIST in media_types:
- query = "SELECT * FROM playlists WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- result.playlists = await self.mass.music.playlists.get_library_items_by_query(
- query, params
- )
+ query = "WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
+ result.playlists = (
+ await self.mass.music.playlists.library_items(
+ extra_query=query, extra_query_params=params
+ )
+ ).items
return result
async def browse(self, path: str) -> BrowseFolder:
if library_item.media_type == MediaType.TRACK:
if library_item.album:
album_ids.add(library_item.album.item_id)
- for artist in library_item.album.artists:
+ # need to fetch the library album to resolve the itemmapping
+ db_album = await self.mass.music.albums.get_library_item(
+ library_item.album.item_id
+ )
+ for artist in db_album.artists:
artist_ids.add(artist.item_id)
for artist in library_item.artists:
artist_ids.add(artist.item_id)
item_id=file_item.path,
provider=self.instance_id,
name=file_item.name.replace(f".{file_item.ext}", ""),
+ provider_mappings={
+ ProviderMapping(
+ item_id=file_item.path,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
)
playlist.is_editable = file_item.ext != "pls" # can only edit m3u playlists
-
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=file_item.path,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
- )
playlist.owner = self.name
checksum = f"{DB_SCHEMA_VERSION}.{file_item.checksum}"
playlist.metadata.checksum = checksum
)
if db_album is None:
raise MediaNotFoundError(f"Album not found: {prov_album_id}")
- result: list[AlbumTrack] = []
- async for album_track_row in self.mass.music.database.iter_items(
- DB_TABLE_ALBUM_TRACKS, {"album_id": db_album.item_id}
- ):
- track_row = await self.mass.music.database.get_row(
- DB_TABLE_TRACKS, {"item_id": album_track_row["track_id"]}
- )
- if f'"{self.instance_id}"' not in track_row["provider_mappings"]:
- continue
- album_track = AlbumTrack.from_db_row(
- {**track_row, **album_track_row, "album": db_album.to_dict()}
- )
- if db_album.metadata.images:
- album_track.metadata.images = db_album.metadata.images
- result.append(album_track)
- return sorted(result, key=lambda x: (x.disc_number, x.track_number))
+ album_tracks = await self.mass.music.albums.tracks(db_album.item_id, db_album.provider)
+ return [
+ track
+ for track in album_tracks
+ if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
+ ]
async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
"""Get playlist tracks for given playlist id."""
"provider": self.instance_id,
"name": name,
"version": version,
+ "provider_mappings": {
+ ProviderMapping(
+ item_id=file_item.path,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(tags.format),
+ sample_rate=tags.sample_rate,
+ bit_depth=tags.bits_per_sample,
+ bit_rate=tags.bit_rate,
+ ),
+ isrc=tags.isrc,
+ )
+ },
}
if playlist_position is not None:
track = PlaylistTrack(
artist.mbid = tags.musicbrainz_artistids[index]
track.artists.append(artist)
- # cover image - prefer embedded image, fallback to album cover
+ # handle embedded cover image
if tags.has_cover_image:
# we do not actually embed the image in the metadata because that would consume too
# much space and bandwidth. Instead we set the filename as value so the image can
track.metadata.images = [
MediaItemImage(ImageType.THUMB, file_item.path, self.instance_id)
]
- elif track.album and track.album.image:
- track.metadata.images = [track.album.image]
if track.album and not track.album.metadata.images:
# set embedded cover on album if it does not have one yet
for artist in track.album.artists:
artist.metadata.checksum = track.metadata.checksum
- track.add_provider_mapping(
- ProviderMapping(
- item_id=file_item.path,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(tags.format),
- sample_rate=tags.sample_rate,
- bit_depth=tags.bits_per_sample,
- bit_rate=tags.bit_rate,
- ),
- isrc=tags.isrc,
- )
- )
return track
async def _parse_artist(
)
async def _get_or_create_artist_by_name(self, artist_name) -> Artist:
- query = (
- "SELECT * FROM artists WHERE name = :name AND provider_mappings = :provider_instance"
- )
+ query = "WHERE name = :name AND provider_mappings = :provider_instance"
params = {
"name": f"%{artist_name}%",
"provider_instance": f"%{self.instance_id}%",
}
- db_artists = await self.mass.music.artists.get_library_items_by_query(query, params)
- if db_artists:
- return ItemMapping.from_item(db_artists[0])
+ paged_list = await self.mass.music.artists.library_items(
+ extra_query=query, extra_query_params=params
+ )
+ if paged_list and paged_list.items:
+ return ItemMapping.from_item(paged_list.items[0])
artist_id = FAKE_ARTIST_PREFIX + artist_name
- artist = Artist(item_id=artist_id, name=artist_name, provider=self.domain)
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=str(artist_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
+ artist = Artist(
+ item_id=artist_id,
+ name=artist_name,
+ provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
)
return artist
item_id=album_id,
provider=self.domain,
name=plex_album.title,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(album_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=plex_album.getWebURL(),
+ )
+ },
)
if plex_album.year:
album.year = plex_album.year
album.artists.append(
self._get_item_mapping(MediaType.ARTIST, plex_album.parentKey, plex_album.parentTitle)
)
-
- album.add_provider_mapping(
- ProviderMapping(
- item_id=str(album_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=plex_album.getWebURL(),
- )
- )
return album
async def _parse_artist(self, plex_artist: PlexArtist) -> Artist:
artist_id = plex_artist.key
if not artist_id:
raise InvalidDataError("Artist does not have a valid ID")
- artist = Artist(item_id=artist_id, name=plex_artist.title, provider=self.domain)
+ artist = Artist(
+ item_id=artist_id,
+ name=plex_artist.title,
+ provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=plex_artist.getWebURL(),
+ )
+ },
+ )
if plex_artist.summary:
artist.metadata.description = plex_artist.summary
if thumb := plex_artist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
artist.metadata.images = [MediaItemImage(ImageType.THUMB, thumb, self.instance_id)]
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=str(artist_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=plex_artist.getWebURL(),
- )
- )
return artist
async def _parse_playlist(self, plex_playlist: PlexPlaylist) -> Playlist:
"""Parse a Plex Playlist response to a Playlist object."""
playlist = Playlist(
- item_id=plex_playlist.key, provider=self.domain, name=plex_playlist.title
+ item_id=plex_playlist.key,
+ provider=self.domain,
+ name=plex_playlist.title,
+ provider_mappings={
+ ProviderMapping(
+ item_id=plex_playlist.key,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=plex_playlist.getWebURL(),
+ )
+ },
)
if plex_playlist.summary:
playlist.metadata.description = plex_playlist.summary
if thumb := plex_playlist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
playlist.metadata.images = [MediaItemImage(ImageType.THUMB, thumb, self.instance_id)]
playlist.is_editable = True
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=plex_playlist.key,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=plex_playlist.getWebURL(),
- )
- )
return playlist
async def _parse_track(
track_class = AlbumTrack
else:
track_class = Track
+ if plex_track.media:
+ available = True
+ content = plex_track.media[0].container
+ else:
+ available = False
+ content = None
track = track_class(
item_id=plex_track.key,
provider=self.instance_id,
name=plex_track.title,
**extra_init_kwargs or {},
+ provider_mappings={
+ ProviderMapping(
+ item_id=plex_track.key,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=available,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(content)
+ if content
+ else ContentType.UNKNOWN,
+ ),
+ url=plex_track.getWebURL(),
+ )
+ },
)
if plex_track.originalTitle and plex_track.originalTitle != plex_track.grandparentTitle:
available = False
content = None
- if plex_track.media:
- available = True
- content = plex_track.media[0].container
-
- track.add_provider_mapping(
- ProviderMapping(
- item_id=plex_track.key,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- available=available,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(content) if content else ContentType.UNKNOWN,
- ),
- url=plex_track.getWebURL(),
- )
- )
return track
async def search(
async def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
artist = Artist(
- item_id=str(artist_obj["id"]), provider=self.domain, name=artist_obj["name"]
+ item_id=str(artist_obj["id"]),
+ provider=self.domain,
+ name=artist_obj["name"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_obj["id"]),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=artist_obj.get("url", f'https://open.qobuz.com/artist/{artist_obj["id"]}'),
+ )
+ },
)
if artist.item_id == VARIOUS_ARTISTS_ID:
artist.mbid = VARIOUS_ARTISTS_ID_MBID
artist.name = VARIOUS_ARTISTS_NAME
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=str(artist_obj["id"]),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=artist_obj.get("url", f'https://open.qobuz.com/artist/{artist_obj["id"]}'),
- )
- )
if img := self.__get_image(artist_obj):
artist.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
if artist_obj.get("biography"):
provider=self.domain,
name=name,
version=version,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(album_obj["id"]),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=album_obj["streamable"] and album_obj["displayable"],
+ barcode=album_obj["upc"],
+ audio_format=AudioFormat(
+ content_type=ContentType.FLAC,
+ sample_rate=album_obj["maximum_sampling_rate"] * 1000,
+ bit_depth=album_obj["maximum_bit_depth"],
+ ),
+ url=album_obj.get("url", f'https://open.qobuz.com/album/{album_obj["id"]}'),
+ )
+ },
)
- album.add_provider_mapping(
- ProviderMapping(
- item_id=str(album_obj["id"]),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- available=album_obj["streamable"] and album_obj["displayable"],
- barcode=album_obj["upc"],
- audio_format=AudioFormat(
- content_type=ContentType.FLAC,
- sample_rate=album_obj["maximum_sampling_rate"] * 1000,
- bit_depth=album_obj["maximum_bit_depth"],
- ),
- url=album_obj.get("url", f'https://open.qobuz.com/album/{album_obj["id"]}'),
- )
- )
-
album.artists.append(await self._parse_artist(artist_obj or album_obj["artist"]))
if (
album_obj.get("product_type", "") == "single"
name=name,
version=version,
duration=track_obj["duration"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(track_obj["id"]),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=track_obj["streamable"] and track_obj["displayable"],
+ audio_format=AudioFormat(
+ content_type=ContentType.FLAC,
+ sample_rate=track_obj["maximum_sampling_rate"] * 1000,
+ bit_depth=track_obj["maximum_bit_depth"],
+ ),
+ url=track_obj.get("url", f'https://open.qobuz.com/track/{track_obj["id"]}'),
+ isrc=track_obj.get("isrc"),
+ )
+ },
**extra_init_kwargs,
)
if track_obj.get("performer") and "Various " not in track_obj["performer"]:
if img := self.__get_image(track_obj):
track.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
- track.add_provider_mapping(
- ProviderMapping(
- item_id=str(track_obj["id"]),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- available=track_obj["streamable"] and track_obj["displayable"],
- audio_format=AudioFormat(
- content_type=ContentType.FLAC,
- sample_rate=track_obj["maximum_sampling_rate"] * 1000,
- bit_depth=track_obj["maximum_bit_depth"],
- ),
- url=track_obj.get("url", f'https://open.qobuz.com/track/{track_obj["id"]}'),
- isrc=track_obj.get("isrc"),
- )
- )
return track
async def _parse_playlist(self, playlist_obj):
provider=self.domain,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["name"],
- )
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=str(playlist_obj["id"]),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=playlist_obj.get(
- "url", f'https://open.qobuz.com/playlist/{playlist_obj["id"]}'
- ),
- )
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(playlist_obj["id"]),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=playlist_obj.get(
+ "url", f'https://open.qobuz.com/playlist/{playlist_obj["id"]}'
+ ),
+ )
+ },
)
playlist.is_editable = (
playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
async def _parse_radio(self, radio_obj: dict) -> Radio:
"""Parse Radio object from json obj returned from api."""
- radio = Radio(item_id=radio_obj.uuid, provider=self.domain, name=radio_obj.name)
- radio.add_provider_mapping(
- ProviderMapping(
- item_id=radio_obj.uuid,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
+ radio = Radio(
+ item_id=radio_obj.uuid,
+ provider=self.domain,
+ name=radio_obj.name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=radio_obj.uuid,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
)
radio.metadata.label = radio_obj.tags
radio.metadata.popularity = radio_obj.votes
"""Process SlimClient Output Underrun Event."""
player = self.mass.players.get(client.player_id)
self.logger.error("Player %s ran out of buffer", player.display_name)
- if player.synced_to:
- # if player is synced, resync it
- await self.cmd_sync(player.player_id, player.synced_to)
- else:
- await self.cmd_stop(client.player_id)
+ player.state = PlayerState.IDLE
+ self.mass.players.update(client.player_id)
def _handle_client_sync(self, client: SlimClient) -> None:
"""Synchronize audio of a sync client."""
return
if sonos_player.need_elapsed_time_workaround:
# no pause allowed when radio/flow mode is active
- await self.cmd_stop()
+ await self.cmd_stop(player_id)
return
await asyncio.to_thread(sonos_player.soco.pause)
artist_id = artist_obj["id"]
if not artist_id:
raise InvalidDataError("Artist does not have a valid ID")
- artist = Artist(item_id=artist_id, name=artist_obj["username"], provider=self.domain)
+ artist = Artist(
+ item_id=artist_id,
+ name=artist_obj["username"],
+ provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"https://soundcloud.com/{permalink}",
+ )
+ },
+ )
if artist_obj.get("description"):
artist.metadata.description = artist_obj["description"]
if artist_obj.get("avatar_url"):
img_url = artist_obj["avatar_url"]
artist.metadata.images = [MediaItemImage(ImageType.THUMB, img_url)]
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=str(artist_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=f"https://soundcloud.com/{permalink}",
- )
- )
return artist
async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
item_id=playlist_obj["id"],
provider=self.domain,
name=playlist_obj["title"],
- )
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=playlist_obj["id"],
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlist_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
)
playlist.is_editable = False
if playlist_obj.get("description"):
name=name,
version=version,
duration=track_obj["duration"] / 1000,
+ provider_mappings={
+ ProviderMapping(
+ item_id=track_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.MP3,
+ ),
+ url=track_obj["permalink_url"],
+ )
+ },
**{"position": playlist_position} if playlist_position else {},
)
user_id = track_obj["user"]["id"]
track.metadata.genres = track_obj["genre"]
if track_obj.get("tag_list"):
track.metadata.style = track_obj["tag_list"]
- track.add_provider_mapping(
- ProviderMapping(
- item_id=track_obj["id"],
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.MP3,
- ),
- url=track_obj["permalink_url"],
- )
- )
return track
async def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
- artist = Artist(item_id=artist_obj["id"], provider=self.domain, name=artist_obj["name"])
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=artist_obj["id"],
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=artist_obj["external_urls"]["spotify"],
- )
+ artist = Artist(
+ item_id=artist_obj["id"],
+ provider=self.domain,
+ name=artist_obj["name"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=artist_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=artist_obj["external_urls"]["spotify"],
+ )
+ },
)
if "genres" in artist_obj:
artist.metadata.genres = set(artist_obj["genres"])
async def _parse_album(self, album_obj: dict):
"""Parse spotify album object to generic layout."""
name, version = parse_title_and_version(album_obj["name"])
- album = Album(item_id=album_obj["id"], provider=self.domain, name=name, version=version)
+ barcode = None
+ if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
+ barcode = album_obj["external_ids"]["upc"]
+ if "external_ids" in album_obj and album_obj["external_ids"].get("ean"):
+ barcode = album_obj["external_ids"]["ean"]
+ album = Album(
+ item_id=album_obj["id"],
+ provider=self.domain,
+ name=name,
+ version=version,
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
+ url=album_obj["external_urls"]["spotify"],
+ barcode=barcode,
+ )
+ },
+ )
for artist_obj in album_obj["artists"]:
album.artists.append(await self._parse_artist(artist_obj))
album.metadata.copyright = album_obj["copyrights"][0]["text"]
if album_obj.get("explicit"):
album.metadata.explicit = album_obj["explicit"]
- barcode = None
- if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
- barcode = album_obj["external_ids"]["upc"]
- if "external_ids" in album_obj and album_obj["external_ids"].get("ean"):
- barcode = album_obj["external_ids"]["ean"]
- album.add_provider_mapping(
- ProviderMapping(
- item_id=album_obj["id"],
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
- url=album_obj["external_urls"]["spotify"],
- barcode=barcode,
- )
- )
return album
async def _parse_track(
name=name,
version=version,
duration=track_obj["duration_ms"] / 1000,
+ provider_mappings={
+ ProviderMapping(
+ item_id=track_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.OGG,
+ bit_rate=320,
+ ),
+ isrc=track_obj.get("external_ids", {}).get("isrc"),
+ url=track_obj["external_urls"]["spotify"],
+ available=not track_obj["is_local"] and track_obj["is_playable"],
+ )
+ },
**extra_init_kwargs,
)
track.metadata.explicit = True
if track_obj.get("popularity"):
track.metadata.popularity = track_obj["popularity"]
- track.add_provider_mapping(
- ProviderMapping(
- item_id=track_obj["id"],
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.OGG,
- bit_rate=320,
- ),
- isrc=track_obj.get("external_ids", {}).get("isrc"),
- url=track_obj["external_urls"]["spotify"],
- available=not track_obj["is_local"] and track_obj["is_playable"],
- )
- )
return track
async def _parse_playlist(self, playlist_obj):
provider=self.domain,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["display_name"],
- )
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=playlist_obj["id"],
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=playlist_obj["external_urls"]["spotify"],
- )
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlist_obj["id"],
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=playlist_obj["external_urls"]["spotify"],
+ )
+ },
)
playlist.is_editable = (
playlist_obj["owner"]["id"] == self._sp_user["id"] or playlist_obj["collaborative"]
async def _parse_artist(self, artist_obj: TidalArtist, full_details: bool = False) -> Artist:
"""Parse tidal artist object to generic layout."""
artist_id = artist_obj.id
- artist = Artist(item_id=artist_id, provider=self.instance_id, name=artist_obj.name)
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=str(artist_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=f"http://www.tidal.com/artist/{artist_id}",
- )
+ artist = Artist(
+ item_id=artist_id,
+ provider=self.instance_id,
+ name=artist_obj.name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"http://www.tidal.com/artist/{artist_id}",
+ )
+ },
)
# metadata
if full_details and artist_obj.name != "Various Artists":
name = album_obj.name
version = album_obj.version if album_obj.version is not None else None
album_id = album_obj.id
- album = Album(item_id=album_id, provider=self.instance_id, name=name, version=version)
+ album = Album(
+ item_id=album_id,
+ provider=self.instance_id,
+ name=name,
+ version=version,
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.FLAC,
+ ),
+ url=f"http://www.tidal.com/album/{album_id}",
+ available=album_obj.available,
+ )
+ },
+ )
for artist_obj in album_obj.artists:
album.artists.append(await self._parse_artist(artist_obj=artist_obj))
if album_obj.type == "ALBUM":
album.upc = album_obj.universal_product_number
album.year = int(album_obj.year)
- available = album_obj.available
- album.add_provider_mapping(
- ProviderMapping(
- item_id=album_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.FLAC,
- ),
- url=f"http://www.tidal.com/album/{album_id}",
- available=available,
- )
- )
# metadata
album.metadata.copyright = album_obj.copyright
album.metadata.explicit = album_obj.explicit
name=track_obj.name,
version=version,
duration=track_obj.duration,
- **extra_init_kwargs,
+ provider_mappings={
+ ProviderMapping(
+ item_id=track_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.FLAC,
+ sample_rate=44100,
+ bit_depth=16,
+ ),
+ isrc=track_obj.isrc,
+ url=f"http://www.tidal.com/tracks/{track_id}",
+ available=track_obj.available,
+ )
+ }
+ ** extra_init_kwargs,
)
track.album = self.get_item_mapping(
media_type=MediaType.ALBUM,
for track_artist in track_obj.artists:
artist = await self._parse_artist(artist_obj=track_artist)
track.artists.append(artist)
- available = track_obj.available
- track.add_provider_mapping(
- ProviderMapping(
- item_id=track_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.FLAC,
- sample_rate=44100,
- bit_depth=16,
- ),
- isrc=track_obj.isrc,
- url=f"http://www.tidal.com/tracks/{track_id}",
- available=available,
- )
- )
# metadata
track.metadata.explicit = track_obj.explicit
track.metadata.popularity = track_obj.popularity
provider=self.instance_id,
name=playlist_obj.name,
owner=creator_name,
- )
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=playlist_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=f"http://www.tidal.com/playlists/{playlist_id}",
- )
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"http://www.tidal.com/playlists/{playlist_id}",
+ )
+ },
)
is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
playlist.is_editable = is_editable
content_type = ContentType.try_parse(stream["media_type"])
bit_rate = stream.get("bitrate", 128) # TODO !
- radio = Radio(item_id=item_id, provider=self.domain, name=name)
- radio.add_provider_mapping(
- ProviderMapping(
- item_id=item_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=content_type,
- bit_rate=bit_rate,
- ),
- details=url,
- )
+ radio = Radio(
+ item_id=item_id,
+ provider=self.domain,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=content_type,
+ bit_rate=bit_rate,
+ ),
+ details=url,
+ )
+ },
)
# preset number is used for sorting (not present at stream time)
preset_number = details.get("preset_number")
media_type=MediaType.RADIO,
data=item_id,
)
- item_id, media_type = item_id.split("--", 1)
- stream_info = await self.__get_data("Tune.ashx", id=item_id)
+ stream_item_id, media_type = item_id.split("--", 1)
+ stream_info = await self.__get_data("Tune.ashx", id=stream_item_id)
for stream in stream_info["body"]:
if stream["media_type"] != media_type:
continue
all_members = self._get_active_members(
player_id, only_powered=False, skip_sync_childs=False
)
- group_player.max_sample_rate = max(x.max_sample_rate for x in all_members)
+ if all_members:
+ group_player.max_sample_rate = max(x.max_sample_rate for x in all_members)
group_player.group_childs = list(x.player_id for x in all_members)
# read the state from the first active group member
for member in all_members:
"""Parse plain URL to MediaItem of type Radio or Track."""
item_id, url, media_info = await self._get_media_info(item_id_or_url, force_refresh)
is_radio = media_info.get("icy-name") or not media_info.duration
+ provider_mappings = {
+ ProviderMapping(
+ item_id=item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(media_info.format),
+ sample_rate=media_info.sample_rate,
+ bit_depth=media_info.bits_per_sample,
+ bit_rate=media_info.bit_rate,
+ ),
+ )
+ }
if is_radio or force_radio:
# treat as radio
media_item = Radio(
item_id=item_id,
provider=self.domain,
name=media_info.get("icy-name") or media_info.title,
+ provider_mappings=provider_mappings,
)
else:
media_item = Track(
name=media_info.title,
duration=int(media_info.duration or 0),
artists=[await self.get_artist(artist) for artist in media_info.artists],
+ provider_mappings=provider_mappings,
)
- media_item.provider_mappings = {
- ProviderMapping(
- item_id=item_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(media_info.format),
- sample_rate=media_info.sample_rate,
- bit_depth=media_info.bits_per_sample,
- bit_rate=media_info.bit_rate,
- ),
- )
- }
if media_info.has_cover_image:
media_item.metadata.images = [MediaItemImage(ImageType.THUMB, url, True)]
return media_item
item_id=album_id,
name=name,
provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(album_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
)
if album_obj.get("year") and album_obj["year"].isdigit():
album.year = album_obj["year"]
else:
album_type = AlbumType.UNKNOWN
album.album_type = album_type
- album.add_provider_mapping(
- ProviderMapping(
- item_id=str(album_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
- )
return album
async def _parse_artist(self, artist_obj: dict) -> Artist:
artist_id = VARIOUS_ARTISTS_YTM_ID
if not artist_id:
raise InvalidDataError("Artist does not have a valid ID")
- artist = Artist(item_id=artist_id, name=artist_obj["name"], provider=self.domain)
+ artist = Artist(
+ item_id=artist_id,
+ name=artist_obj["name"],
+ provider=self.domain,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"https://music.youtube.com/channel/{artist_id}",
+ )
+ },
+ )
if "description" in artist_obj:
artist.metadata.description = artist_obj["description"]
if "thumbnails" in artist_obj and artist_obj["thumbnails"]:
artist.metadata.images = await self._parse_thumbnails(artist_obj["thumbnails"])
- artist.add_provider_mapping(
- ProviderMapping(
- item_id=str(artist_id),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- url=f"https://music.youtube.com/channel/{artist_id}",
- )
- )
return artist
async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
if playlist_id in YT_PERSONAL_PLAYLISTS:
playlist_id = f"{playlist_id}{YT_PLAYLIST_ID_DELIMITER}{self.instance_id}"
playlist_name = f"{playlist_name} ({self.name})"
- playlist = Playlist(item_id=playlist_id, provider=self.domain, name=playlist_name)
+ playlist = Playlist(
+ item_id=playlist_id,
+ provider=self.domain,
+ name=playlist_name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
if "description" in playlist_obj:
playlist.metadata.description = playlist_obj["description"]
if "thumbnails" in playlist_obj and playlist_obj["thumbnails"]:
if playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE":
is_editable = True
playlist.is_editable = is_editable
- playlist.add_provider_mapping(
- ProviderMapping(
- item_id=playlist_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- )
- )
if authors := playlist_obj.get("author"):
if isinstance(authors, str):
playlist.owner = authors
item_id=track_obj["videoId"],
provider=self.domain,
name=track_obj["title"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(track_obj["videoId"]),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=track_obj.get("isAvailable", True),
+ audio_format=AudioFormat(
+ content_type=ContentType.M4A,
+ ),
+ )
+ },
**extra_init_kwargs,
)
track.duration = int(track_obj["duration"])
elif "duration_seconds" in track_obj and str(track_obj["duration_seconds"]).isdigit():
track.duration = int(track_obj["duration_seconds"])
- available = True
- if "isAvailable" in track_obj:
- available = track_obj["isAvailable"]
- track.add_provider_mapping(
- ProviderMapping(
- item_id=str(track_obj["videoId"]),
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- available=available,
- audio_format=AudioFormat(
- content_type=ContentType.M4A,
- ),
- )
- )
return track
async def _get_signature_timestamp(self):