SINGLE = "single"
COMPILATION = "compilation"
EP = "ep"
+ PODCAST = "podcast"
+ AUDIOBOOK = "audiobook"
UNKNOWN = "unknown"
AIFF = "aiff"
WMA = "wma"
M4A = "m4a"
+ M4B = "m4b"
DSF = "dsf"
WAVPACK = "wv"
PCM_S16LE = "s16le" # PCM signed 16-bit little-endian
MetadataTypes = int | bool | str | list[str]
JSON_KEYS = ("artists", "artist", "albums", "metadata", "provider_mappings")
+JOINED_KEYS = ("barcode", "isrc")
@dataclass(frozen=True)
type: ImageType
url: str
- is_file: bool = False # indicator that image is local filepath instead of url
+ source: str = "http" # set to instance_id of file provider if path is local
def __hash__(self):
"""Return custom hash."""
return hash(self.url)
+@dataclass(frozen=True)
+class MediaItemChapter(DataClassDictMixin):
+ """Model for a chapter."""
+
+ chapter_id: int
+ position_start: float
+ position_end: float | None = None
+ title: str | None = None
+
+ def __hash__(self):
+ """Return custom hash."""
+ return hash(self.number)
+
+
@dataclass
class MediaItemMetadata(DataClassDictMixin):
"""Model for a MediaItem's metadata."""
ean: str | None = None
label: str | None = None
links: set[MediaItemLink] | None = None
+ chapters: list[MediaItemChapter] | None = None
performers: set[str] | None = None
preview: str | None = None
replaygain: float | None = None
if new_val is None:
continue
cur_val = getattr(self, fld.name)
- if isinstance(cur_val, list):
+ if cur_val is None or allow_overwrite: # noqa: SIM114
+ setattr(self, fld.name, new_val)
+ elif isinstance(cur_val, list):
new_val = merge_lists(cur_val, new_val)
setattr(self, fld.name, new_val)
elif isinstance(cur_val, set):
new_val = cur_val.update(new_val)
setattr(self, fld.name, new_val)
- elif cur_val is None or allow_overwrite: # noqa: SIM114
- setattr(self, fld.name, new_val)
elif new_val and fld.name in ("checksum", "popularity", "last_refresh"):
# some fields are always allowed to be overwritten
# (such as checksum and last_refresh)
# sort_name and uri are auto generated, do not override unless really needed
sort_name: str | None = None
uri: str | None = None
- # timestamp is used to determine when the item was added to the library
- timestamp: int = 0
+ # timestamps to determine when the item was added/modified to the db
+ timestamp_added: int = 0
+ timestamp_modified: int = 0
def __post_init__(self):
"""Call after init."""
for key in JSON_KEYS:
if key in db_row and db_row[key] is not None:
db_row[key] = json_loads(db_row[key])
+ for key in JOINED_KEYS:
+ if key not in db_row:
+ continue
+ db_row[key] = db_row[key].strip()
+ db_row[key] = db_row[key].split(";") if db_row[key] else []
if "in_library" in db_row:
db_row["in_library"] = bool(db_row["in_library"])
if db_row.get("albums"):
def to_db_row(self) -> dict:
"""Create dict from item suitable for db."""
+
+ def get_db_value(key, value) -> Any:
+ """Transform value for db storage."""
+ if key in JSON_KEYS:
+ return json_dumps(value)
+ if key in JOINED_KEYS:
+ return ";".join(value)
+ return value
+
return {
- key: json_dumps(value) if key in JSON_KEYS else value
+ key: get_db_value(key, value)
for key, value in self.to_dict().items()
if key
not in [
}
self.provider_mappings.add(prov_mapping)
- @property
- def last_refresh(self) -> int:
- """Return timestamp the metadata was last refreshed (0 if full data never retrieved)."""
- return self.metadata.last_refresh or 0
-
def __hash__(self):
"""Return custom hash."""
return hash((self.media_type, self.provider, self.item_id))
year: int | None = None
artists: list[Artist | ItemMapping] = field(default_factory=list)
album_type: AlbumType = AlbumType.UNKNOWN
- upc: str | None = None
+ barcode: set[str] = field(default_factory=set)
musicbrainz_id: str | None = None # release group id
@property
media_type: MediaType = MediaType.TRACK
duration: int = 0
version: str = ""
- isrc: str | None = None
+ isrc: set[str] = field(default_factory=set)
musicbrainz_id: str | None = None # Recording ID
artists: list[Artist | ItemMapping] = field(default_factory=list)
# album track only
return getattr(self.album, "image", None)
return None
- @property
- def isrcs(self) -> tuple[str, ...]:
- """Split multiple values in isrc field."""
- # sometimes the isrc contains multiple values, split by semicolon
- if not self.isrc:
- return tuple()
- return tuple(self.isrc.split(";"))
-
@property
def artist(self) -> Artist | ItemMapping | None:
"""Return (first) artist of track."""
"""Set (first/only) artist of track."""
self.artists = [artist]
+ @property
+ def has_chapters(self) -> bool:
+ """
+ Return boolean if this Track has chapters.
+
+ This is often an indicator that this track is an episode from a
+ Podcast or AudioBook.
+ """
+ return self.metadata and self.metadata.chapters and len(self.metadata.chapters) > 1
+
@dataclass
class Playlist(MediaItem):
__version__: Final[str] = "2.0.0b12"
-SCHEMA_VERSION: Final[int] = 19
+SCHEMA_VERSION: Final[int] = 20
ROOT_LOGGER_NAME: Final[str] = "music_assistant"
DB_TABLE_SETTINGS,
DEFAULT_DB_CACHE,
ROOT_LOGGER_NAME,
+ SCHEMA_VERSION,
)
from music_assistant.server.helpers.database import DatabaseConnection
from music_assistant.server import MusicAssistant
LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.cache")
-SCHEMA_VERSION = 1
class CacheController:
from random import choice, random
from typing import TYPE_CHECKING
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, ProviderFeature
from music_assistant.common.models.errors import MediaNotFoundError, UnsupportedFeaturedException
)
return db_item
- async def add_db_item(self, item: Album, overwrite_existing: bool = False) -> Album:
+ async def add_db_item(self, item: Album) -> Album:
"""Add a new record to the database."""
- assert item.provider_mappings, f"Album {item.name} is missing provider id(s)"
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
assert item.artist, f"Album {item.name} is missing artist"
async with self._db_add_lock:
cur_item = None
- # always try to grab existing item by musicbrainz_id/upc
+ # always try to grab existing item by musicbrainz_id
if item.musicbrainz_id:
match = {"musicbrainz_id": item.musicbrainz_id}
cur_item = await self.mass.music.database.get_row(self.db_table, match)
- if not cur_item and item.upc:
- match = {"upc": item.upc}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
+ # try barcode/upc
+ if not cur_item and item.barcode:
+ for barcode in item.barcode:
+ if search_result := await self.mass.music.database.search(
+ self.db_table, barcode, "barcode"
+ ):
+ cur_item = Album.from_db_row(search_result[0])
+ break
if not cur_item:
# fallback to search and match
for row in await self.mass.music.database.search(self.db_table, item.name):
break
if cur_item:
# update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ return await self.update_db_item(cur_item.item_id, item)
# insert new item
album_artists = await self._get_album_artists(item, cur_item)
**item.to_db_row(),
"artists": serialize_to_json(album_artists) or None,
"sort_artist": sort_artist,
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
},
)
item_id = new_item["item_id"]
self,
item_id: int,
item: Album,
- overwrite: bool = False,
) -> Album:
"""Update Album record in the database."""
- assert item.provider_mappings, f"Album {item.name} is missing provider id(s)"
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
assert item.artist, f"Album {item.name} is missing artist"
cur_item = await self.get_db_item(item_id)
-
- if overwrite:
- metadata = item.metadata
- metadata.last_refresh = None
- provider_mappings = item.provider_mappings
- album_artists = await self._get_album_artists(item, overwrite=True)
+ is_file_provider = item.provider.startswith("filesystem")
+ metadata = cur_item.metadata.update(item.metadata, is_file_provider)
+ provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
+ if is_file_provider:
+ album_artists = await self._get_album_artists(cur_item)
else:
- is_file_provider = item.provider.startswith("filesystem")
- metadata = cur_item.metadata.update(item.metadata, is_file_provider)
- provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
- album_artists = await self._get_album_artists(item, cur_item)
-
+ album_artists = await self._get_album_artists(cur_item, item)
+ cur_item.barcode.update(item.barcode)
if item.album_type != AlbumType.UNKNOWN:
album_type = item.album_type
else:
self.db_table,
{"item_id": item_id},
{
- "name": item.name if overwrite else cur_item.name,
- "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "name": item.name if is_file_provider else cur_item.name,
+ "sort_name": item.sort_name if is_file_provider else cur_item.sort_name,
"sort_artist": sort_artist,
- "version": item.version if overwrite else cur_item.version,
+ "version": item.version if is_file_provider else cur_item.version,
"year": item.year or cur_item.year,
- "upc": item.upc or cur_item.upc,
- "album_type": album_type,
+ "barcode": ";".join(cur_item.barcode),
+ "album_type": album_type.value,
"artists": serialize_to_json(album_artists) or None,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
"musicbrainz_id": item.musicbrainz_id or cur_item.musicbrainz_id,
+ "timestamp_modified": int(utc_timestamp()),
},
)
# update/set provider_mappings table
self,
db_album: Album,
updated_album: Album | None = None,
- overwrite: bool = False,
) -> list[ItemMapping]:
"""Extract (database) album artist(s) as ItemMapping."""
album_artists = set()
if not album:
continue
for artist in album.artists:
- album_artists.add(await self._get_artist_mapping(artist, overwrite))
+ album_artists.add(await self._get_artist_mapping(artist))
# use intermediate set to prevent duplicates
# filter various artists if multiple artists
if len(album_artists) > 1:
album_artists = {x for x in album_artists if (x.name != VARIOUS_ARTISTS)}
return list(album_artists)
- async def _get_artist_mapping(
- self, artist: Artist | ItemMapping, overwrite: bool = False
- ) -> ItemMapping:
+ async def _get_artist_mapping(self, artist: Artist | ItemMapping) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
- if overwrite:
- artist = await self.mass.music.artists.add_db_item(artist, overwrite_existing=True)
if artist.provider == "database":
if isinstance(artist, ItemMapping):
return artist
import contextlib
import itertools
from random import choice, random
-from time import time
from typing import TYPE_CHECKING, Any
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, ProviderFeature
from music_assistant.common.models.errors import MediaNotFoundError, UnsupportedFeaturedException
)
return items
- async def add_db_item(self, item: Artist, overwrite_existing: bool = False) -> Artist:
+ async def add_db_item(self, item: Artist) -> Artist:
"""Add a new item record to the database."""
assert isinstance(item, Artist), "Not a full Artist object"
- assert item.provider_mappings, "Artist is missing provider id(s)"
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
# enforce various artists name + id
if compare_strings(item.name, VARIOUS_ARTISTS):
item.musicbrainz_id = VARIOUS_ARTISTS_ID
break
if cur_item:
# update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ return await self.update_db_item(cur_item.item_id, item)
# insert item
- if item.in_library and not item.timestamp:
- item.timestamp = int(time())
+ item.timestamp_added = int(utc_timestamp())
+ item.timestamp_modified = int(utc_timestamp())
new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
item_id = new_item["item_id"]
# update/set provider_mappings table
self,
item_id: int,
item: Artist,
- overwrite: bool = False,
) -> Artist:
"""Update Artist record in the database."""
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = item.metadata
- provider_mappings = item.provider_mappings
- else:
- is_file_provider = item.provider.startswith("filesystem")
- metadata = cur_item.metadata.update(item.metadata, is_file_provider)
- provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
+ is_file_provider = item.provider.startswith("filesystem")
+ metadata = cur_item.metadata.update(item.metadata, is_file_provider)
+ provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
# enforce various artists name + id
if compare_strings(item.name, VARIOUS_ARTISTS):
self.db_table,
{"item_id": item_id},
{
- "name": item.name if overwrite else cur_item.name,
- "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "name": item.name if is_file_provider else cur_item.name,
+ "sort_name": item.sort_name if is_file_provider else cur_item.sort_name,
"musicbrainz_id": item.musicbrainz_id or cur_item.musicbrainz_id,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
+ "timestamp_modified": int(utc_timestamp()),
},
)
# update/set provider_mappings table
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncGenerator
+from contextlib import suppress
from time import time
from typing import TYPE_CHECKING, Generic, TypeVar
provider_domain=provider_domain,
provider_instance=provider_instance,
)
- if db_item and (time() - db_item.last_refresh) > REFRESH_INTERVAL:
+ if db_item and (time() - (db_item.metadata.last_refresh or 0)) > REFRESH_INTERVAL:
# it's been too long since the full metadata was last retrieved (or never at all)
force_refresh = True
if db_item and force_refresh:
async def set_db_library(self, item_id: int, in_library: bool) -> None:
"""Set the in-library bool on a database item."""
match = {"item_id": item_id}
- timestamp = int(time()) if in_library else 0
- await self.mass.music.database.update(
- self.db_table, match, {"in_library": in_library, "timestamp": timestamp}
- )
+ await self.mass.music.database.update(self.db_table, match, {"in_library": in_library})
db_item = await self.get_db_item(item_id)
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
x for x in db_item.provider_mappings if x.provider_instance != provider_instance
}
match = {"item_id": item_id}
- await self.mass.music.database.update(
- self.db_table,
- match,
- {"provider_mappings": serialize_to_json(db_item.provider_mappings)},
- )
- self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
-
- # NOTE: If the item has no providers left we leave an orphan item in the db
- # to easily reinstate when a new provider attaches to it.
-
- self.logger.debug("removed provider %s from item id %s", provider_instance, item_id)
+ if db_item.provider_mappings:
+ await self.mass.music.database.update(
+ self.db_table,
+ match,
+ {"provider_mappings": serialize_to_json(db_item.provider_mappings)},
+ )
+ self.logger.debug("removed provider %s from item id %s", provider_instance, item_id)
+ self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
+ else:
+ # delete item if it has no more providers
+ with suppress(AssertionError):
+ await self.delete_db_item(item_id)
async def delete_db_item(self, item_id: int, recursive: bool = False) -> None: # noqa: ARG002
"""Delete record from the database."""
from time import time
from typing import Any
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.helpers.uri import create_uri
from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
# invalidate cache by updating the checksum
await self.get(db_playlist_id, "database", force_refresh=True)
- async def add_db_item(self, item: Playlist, overwrite_existing: bool = False) -> Playlist:
+ async def add_db_item(self, item: Playlist) -> Playlist:
"""Add a new record to the database."""
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
async with self._db_add_lock:
match = {"name": item.name, "owner": item.owner}
if cur_item := await self.mass.music.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_item(
- cur_item["item_id"], item, overwrite=overwrite_existing
- )
+ return await self.update_db_item(cur_item["item_id"], item)
# insert new item
+ item.timestamp_added = int(utc_timestamp())
+ item.timestamp_modified = int(utc_timestamp())
new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
item_id = new_item["item_id"]
# update/set provider_mappings table
self,
item_id: int,
item: Playlist,
- overwrite: bool = False,
) -> Playlist:
"""Update Playlist record in the database."""
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = item.metadata
- provider_mappings = item.provider_mappings
- else:
- metadata = cur_item.metadata.update(item.metadata)
- provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
-
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
await self.mass.music.database.update(
self.db_table,
{"item_id": item_id},
"is_editable": item.is_editable,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
+ "timestamp_modified": int(utc_timestamp()),
},
)
# update/set provider_mappings table
import asyncio
from time import time
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, MediaType
from music_assistant.common.models.media_items import Radio, Track
)
return db_item
- async def add_db_item(self, item: Radio, overwrite_existing: bool = False) -> Radio:
+ async def add_db_item(self, item: Radio) -> Radio:
"""Add a new item record to the database."""
- assert item.provider_mappings
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
async with self._db_add_lock:
match = {"name": item.name}
if cur_item := await self.mass.music.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_item(
- cur_item["item_id"], item, overwrite=overwrite_existing
- )
+ return await self.update_db_item(cur_item["item_id"], item)
# insert new item
+ item.timestamp_added = int(utc_timestamp())
+ item.timestamp_modified = int(utc_timestamp())
new_item = await self.mass.music.database.insert(self.db_table, item.to_db_row())
item_id = new_item["item_id"]
# update/set provider_mappings table
self,
item_id: int,
item: Radio,
- overwrite: bool = False,
) -> Radio:
"""Update Radio record in the database."""
+ assert item.provider_mappings, "Item is missing provider mapping(s)"
cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = item.metadata
- provider_mappings = item.provider_mappings
- else:
- metadata = cur_item.metadata.update(item.metadata)
- provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
-
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
match = {"item_id": item_id}
await self.mass.music.database.update(
self.db_table,
"sort_name": item.sort_name,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
+ "timestamp_modified": int(utc_timestamp()),
},
)
# update/set provider_mappings table
import asyncio
+from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
from music_assistant.common.models.errors import MediaNotFoundError, UnsupportedFeaturedException
"No Music Provider found that supports requesting similar tracks."
)
- async def add_db_item(self, item: Track, overwrite_existing: bool = False) -> Track:
+ async def add_db_item(self, item: Track) -> Track:
"""Add a new item record to the database."""
assert isinstance(item, Track), "Not a full Track object"
assert item.artists, "Track is missing artist(s)"
if item.musicbrainz_id:
match = {"musicbrainz_id": item.musicbrainz_id}
cur_item = await self.mass.music.database.get_row(self.db_table, match)
- for isrc in item.isrcs:
- match = {"isrc": isrc}
- cur_item = await self.mass.music.database.get_row(self.db_table, match)
+ for isrc in item.isrc:
+ if search_result := await self.mass.music.database.search(
+ self.db_table, isrc, "isrc"
+ ):
+ cur_item = Track.from_db_row(search_result[0])
+ break
if not cur_item:
# fallback to matching
match = {"sort_name": item.sort_name}
break
if cur_item:
# update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
+ return await self.update_db_item(cur_item.item_id, item)
# no existing match found: insert new item
track_artists = await self._get_track_artists(item)
- track_albums = await self._get_track_albums(item, overwrite=overwrite_existing)
+ track_albums = await self._get_track_albums(item)
sort_artist = track_artists[0].sort_name if track_artists else ""
sort_album = track_albums[0].sort_name if track_albums else ""
new_item = await self.mass.music.database.insert(
"albums": serialize_to_json(track_albums),
"sort_artist": sort_artist,
"sort_album": sort_album,
+ "timestamp_added": int(utc_timestamp()),
+ "timestamp_modified": int(utc_timestamp()),
},
)
item_id = new_item["item_id"]
self,
item_id: int,
item: Track,
- overwrite: bool = False,
) -> Track:
"""Update Track record in the database, merging data."""
cur_item = await self.get_db_item(item_id)
-
- if overwrite:
- metadata = item.metadata
- provider_mappings = item.provider_mappings
- metadata.last_refresh = None
- # we store a mapping to artists/albums on the item for easier access/listings
- track_artists = await self._get_track_artists(item, overwrite=True)
- track_albums = await self._get_track_albums(item, overwrite=True)
+ is_file_provider = item.provider.startswith("filesystem")
+ metadata = cur_item.metadata.update(item.metadata, is_file_provider)
+ provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
+ cur_item.isrc.update(item.isrc)
+ # ID3 tags from file providers are leading for core metadata
+ if is_file_provider:
+ track_artists = await self._get_track_artists(item)
+ track_albums = await self._get_track_albums(item)
else:
- metadata = cur_item.metadata.update(item.metadata, "file" in item.provider)
- provider_mappings = {*cur_item.provider_mappings, *item.provider_mappings}
track_artists = await self._get_track_artists(cur_item, item)
track_albums = await self._get_track_albums(cur_item, item)
self.db_table,
{"item_id": item_id},
{
- "name": item.name if overwrite else cur_item.name,
- "sort_name": item.sort_name if overwrite else cur_item.sort_name,
- "version": item.version if overwrite else cur_item.version,
- "duration": item.duration if overwrite else cur_item.duration,
+ "name": item.name if is_file_provider else cur_item.name,
+ "sort_name": item.sort_name if is_file_provider else cur_item.sort_name,
+ "version": item.version if is_file_provider else cur_item.version,
+ "duration": item.duration or cur_item.duration,
"artists": serialize_to_json(track_artists),
"albums": serialize_to_json(track_albums),
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
- "isrc": item.isrc or cur_item.isrc,
+ "isrc": ";".join(cur_item.isrc),
+ "timestamp_modified": int(utc_timestamp()),
},
)
# update/set provider_mappings table
self,
base_track: Track,
upd_track: Track | None = None,
- overwrite: bool = False,
) -> list[ItemMapping]:
"""Extract all (unique) artists of track as ItemMapping."""
track_artists = upd_track.artists if upd_track and upd_track.artists else base_track.artists
# use intermediate set to clear out duplicates
- return list({await self._get_artist_mapping(x, overwrite) for x in track_artists})
+ return list({await self._get_artist_mapping(x) for x in track_artists})
async def _get_track_albums(
self,
base_track: Track,
upd_track: Track | None = None,
- overwrite: bool = False,
) -> list[TrackAlbumMapping]:
"""Extract all (unique) albums of track as TrackAlbumMapping."""
track_albums: list[TrackAlbumMapping] = []
track_albums = upd_track.albums
# append update item album if needed
if upd_track and upd_track.album:
- mapping = await self._get_album_mapping(upd_track.album, overwrite=overwrite)
+ mapping = await self._get_album_mapping(upd_track.album)
mapping = TrackAlbumMapping.from_dict(
{
**mapping.to_dict(),
track_albums.append(mapping)
# append base item album if needed
elif base_track and base_track.album:
- mapping = await self._get_album_mapping(base_track.album, overwrite=overwrite)
+ mapping = await self._get_album_mapping(base_track.album)
mapping = TrackAlbumMapping.from_dict(
{
**mapping.to_dict(),
async def _get_album_mapping(
self,
album: Album | ItemMapping,
- overwrite: bool = False,
) -> ItemMapping:
"""Extract (database) album as ItemMapping."""
if album.provider == "database":
return album
return ItemMapping.from_item(album)
- if overwrite:
- db_album = await self.mass.music.albums.add_db_item(album, overwrite_existing=True)
-
if db_album := await self.mass.music.albums.get_db_item_by_prov_id(
album.item_id, provider_domain=album.provider
):
return ItemMapping.from_item(db_album)
- db_album = await self.mass.music.albums.add_db_item(album, overwrite_existing=overwrite)
+ db_album = await self.mass.music.albums.add_db_item(album)
return ItemMapping.from_item(db_album)
- async def _get_artist_mapping(
- self, artist: Artist | ItemMapping, overwrite: bool = False
- ) -> ItemMapping:
+ async def _get_artist_mapping(self, artist: Artist | ItemMapping) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
if artist.provider == "database":
if isinstance(artist, ItemMapping):
return artist
return ItemMapping.from_item(artist)
- if overwrite:
- artist = await self.mass.music.artists.add_db_item(artist, overwrite_existing=True)
-
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
artist.item_id, provider_domain=artist.provider
):
for img in media_item.metadata.images:
if img.type != img_type:
continue
- if img.is_file and not allow_local:
+ if img.source != "http" and not allow_local:
continue
- if img.is_file and resolve_local:
+ if img.source != "http" and resolve_local:
# return imageproxy url for local filesystem items
# the original path is double encoded
encoded_url = urllib.parse.quote(urllib.parse.quote(img.url))
return None
async def get_thumbnail(
- self, path: str, size: int | None = None, base64: bool = False
+ self, path_or_url: str, size: int | None = None, source: str = "http", base64: bool = False
) -> bytes | str:
"""Get/create thumbnail image for path (image url or local path)."""
- thumbnail = await get_image_thumb(self.mass, path, size)
+ thumbnail = await get_image_thumb(self.mass, path_or_url, size=size, source=source)
if base64:
enc_image = b64encode(thumbnail).decode()
thumbnail = f"data:image/png;base64,{enc_image}"
async def _handle_imageproxy(self, request: web.Request) -> web.Response:
"""Handle request for image proxy."""
path = request.query["path"]
+ source = request.query.get("source", "http")
size = int(request.query.get("size", "0"))
if "%" in path:
# assume (double) encoded url, decode it
path = urllib.parse.unquote(path)
- try:
- image_data = await self.get_thumbnail(path, size)
- except Exception as err:
- LOGGER.exception(str(err), exc_info=err)
- image_data = None
-
- if not image_data:
- return web.Response(status=404)
-
- # we set the cache header to 1 year (forever)
- # the client can use the checksum value to refresh when content changes
- return web.Response(
- body=image_data,
- headers={"Cache-Control": "max-age=31536000"},
- content_type="image/png",
- )
+ with suppress(FileNotFoundError):
+ image_data = await self.get_thumbnail(path, size=size, source=source)
+ # we set the cache header to 1 year (forever)
+ # the client can use the checksum value to refresh when content changes
+ return web.Response(
+ body=image_data,
+ headers={"Cache-Control": "max-age=31536000"},
+ content_type="image/png",
+ )
+ return web.Response(status=404)
DB_TABLE_SETTINGS,
{"key": "version", "value": str(SCHEMA_VERSION), "type": "str"},
)
+ # create indexes if needed
+ await self.__create_database_indexes()
# compact db
await self.database.execute("VACUUM")
year INTEGER,
version TEXT,
in_library BOOLEAN DEFAULT 0,
- upc TEXT,
+ barcode TEXT,
musicbrainz_id TEXT,
artists json,
metadata json,
provider_mappings json,
- timestamp INTEGER DEFAULT 0
+ timestamp_added INTEGER NOT NULL,
+ timestamp_modified INTEGER NOT NULL
);"""
)
await self.database.execute(
in_library BOOLEAN DEFAULT 0,
metadata json,
provider_mappings json,
- timestamp INTEGER DEFAULT 0
+ timestamp_added INTEGER NOT NULL,
+ timestamp_modified INTEGER NOT NULL
);"""
)
await self.database.execute(
albums json,
metadata json,
provider_mappings json,
- timestamp INTEGER DEFAULT 0
+ timestamp_added INTEGER NOT NULL,
+ timestamp_modified INTEGER NOT NULL
);"""
)
await self.database.execute(
in_library BOOLEAN DEFAULT 0,
metadata json,
provider_mappings json,
- timestamp INTEGER DEFAULT 0,
- UNIQUE(name, owner)
+ timestamp_added INTEGER NOT NULL,
+ timestamp_modified INTEGER NOT NULL
);"""
)
await self.database.execute(
in_library BOOLEAN DEFAULT 0,
metadata json,
provider_mappings json,
- timestamp INTEGER DEFAULT 0
+ timestamp_added INTEGER NOT NULL,
+ timestamp_modified INTEGER NOT NULL
);"""
)
await self.database.execute(
);"""
)
- # create indexes
+ async def __create_database_indexes(self) -> None:
+ """Create database indexes."""
await self.database.execute(
"CREATE INDEX IF NOT EXISTS artists_in_library_idx on artists(in_library);"
)
"CREATE INDEX IF NOT EXISTS tracks_musicbrainz_id_idx on tracks(musicbrainz_id);"
)
await self.database.execute("CREATE INDEX IF NOT EXISTS tracks_isrc_idx on tracks(isrc);")
- await self.database.execute("CREATE INDEX IF NOT EXISTS albums_upc_idx on albums(upc);")
+ await self.database.execute(
+ "CREATE INDEX IF NOT EXISTS albums_barcode_idx on albums(barcode);"
+ )
# generic args
generic_args = [
"ffmpeg",
- "-hide_banner",
- "-loglevel",
- "quiet",
- "-ignore_unknown",
+ # "-hide_banner",
+ # "-loglevel",
+ # "quiet",
+ # "-ignore_unknown",
+ "-movflags",
+ "faststart",
]
# collect input args
input_args = []
return False
+def compare_barcode(
+ left_barcodes: set[str],
+ right_barcodes: set[str],
+):
+ """Compare two sets of barcodes and return True if a match was found."""
+ for left_barcode in left_barcodes:
+ for right_barcode in right_barcodes:
+ # convert EAN-13 to UPC-A by stripping off the leading zero
+ left_upc = left_barcode[1:] if left_barcode.startswith("0") else left_barcode
+ right_upc = right_barcode[1:] if right_barcode.startswith("0") else right_barcode
+ if compare_strings(left_upc, right_upc):
+ return True
+ return False
+
+
+def compare_isrc(
+ left_isrcs: set[str],
+ right_isrcs: set[str],
+):
+ """Compare two sets of isrc codes and return True if a match was found."""
+ for left_isrc in left_isrcs:
+ for right_isrc in right_isrcs:
+ if compare_strings(left_isrc, right_isrc):
+ return True
+ return False
+
+
def compare_album(
left_album: Album | ItemMapping,
right_album: Album | ItemMapping,
# return early on exact item_id match
if compare_item_ids(left_album, right_album):
return True
-
- # prefer match on UPC
+ # prefer match on barcode/upc
+ # not present on ItemMapping
if (
- isinstance(left_album, Album)
- and isinstance(right_album, Album)
- and left_album.upc
- and right_album.upc
- and ((left_album.upc in right_album.upc) or (right_album.upc in left_album.upc))
+ getattr(left_album, "barcode", None)
+ and getattr(right_album, "barcode", None)
+ and compare_barcode(left_album.barcode, right_album.barcode)
):
return True
# prefer match on musicbrainz_id
return False
if not compare_version(left_album.version, right_album.version):
return False
+ if (
+ hasattr(left_album, "metadata")
+ and hasattr(right_album, "metadata")
+ and not compare_explicit(left_album.metadata, right_album.metadata)
+ ):
+ return False
# compare album artist
# Note: Not present on ItemMapping
if (
"""Compare two track items and return True if they match."""
if left_track is None or right_track is None:
return False
+ assert isinstance(left_track, Track) and isinstance(right_track, Track)
# return early on exact item_id match
if compare_item_ids(left_track, right_track):
return True
- for left_isrc in left_track.isrcs:
- for right_isrc in right_track.isrcs:
- # ISRC is always 100% accurate match
- if left_isrc == right_isrc:
- return True
- if (
- left_track.musicbrainz_id
- and right_track.musicbrainz_id
- and left_track.musicbrainz_id == right_track.musicbrainz_id
- ):
- # musicbrainz_id is always 100% accurate match
+ if compare_isrc(left_track.isrc, right_track.isrc):
+ return True
+ if compare_strings(left_track.musicbrainz_id, right_track.musicbrainz_id):
return True
# album is required for track linking
if left_track.album is None or right_track.album is None:
compare_album(left_track.album, right_track.album)
and left_track.track_number
and right_track.track_number
- and left_track.disc_number == right_track.disc_number
+ and ((left_track.disc_number or 1) == (right_track.disc_number or 1))
and left_track.track_number == right_track.track_number
):
return True
import asyncio
import random
-from base64 import b64encode
+from base64 import b64decode, b64encode
from io import BytesIO
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from music_assistant.server import MusicAssistant
+ from music_assistant.server.providers.filesystem_local.base import FileSystemProviderBase
-async def get_image_data(mass: MusicAssistant, path: str) -> bytes:
+async def get_image_data(mass: MusicAssistant, path_or_url: str, source: str = "http") -> bytes:
"""Create thumbnail from image url."""
- # always try ffmpeg first to get the image because it supports
+ if source != "http" and (prov := mass.get_provider(source)):
+ prov: FileSystemProviderBase
+ file_item = await prov.resolve(path_or_url)
+ # store images in cache db if file larger than 5mb and we have no direct access to the file
+ use_cache = not file_item.local_path and (
+ not file_item.file_size or file_item.file_size > 5000000
+ )
+ cache_key = f"embedded_image.{path_or_url}.{source}"
+ if use_cache and (
+ cache_data := await mass.cache.get(cache_key, checksum=file_item.checksum)
+ ):
+ return b64decode(cache_data)
+ # read from file
+ input_file = file_item.local_path or prov.read_file_content(file_item.absolute_path)
+ if img_data := await get_embedded_image(input_file):
+ if use_cache:
+ await mass.cache.set(
+ cache_key, b64encode(img_data).decode(), checksum=file_item.checksum
+ )
+ return img_data
+ # always use ffmpeg to get the image because it supports
# both online and offline image files as well as embedded images in media files
- img_data = await get_embedded_image(path)
- if img_data:
+ if img_data := await get_embedded_image(path_or_url):
return img_data
- # assume file from file provider, we need to fetch it here...
- for prov in mass.music.providers:
- if not prov.domain.startswith("filesystem"):
- continue
- if not await prov.exists(path):
- continue
- img_data = await get_embedded_image(prov.read_file_content(path))
- if img_data:
- return img_data
- raise FileNotFoundError(f"Image not found: {path}")
+ raise FileNotFoundError(f"Image not found: {path_or_url}")
-async def get_image_thumb(mass: MusicAssistant, path: str, size: int | None) -> bytes:
+async def get_image_thumb(
+ mass: MusicAssistant, path_or_url: str, size: int | None, source: str = "http"
+) -> bytes:
"""Get (optimized) PNG thumbnail from image url."""
- img_data = await get_image_data(mass, path)
+ img_data = await get_image_data(mass, path_or_url, source)
def _create_image():
data = BytesIO()
stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
close_fds=True,
+ limit=32000000,
)
else:
self._proc = await asyncio.create_subprocess_exec(
stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
close_fds=True,
+ limit=32000000,
)
# Fix BrokenPipeError due to a race condition
from typing import Any
from music_assistant.common.helpers.util import try_parse_int
+from music_assistant.common.models.enums import AlbumType
from music_assistant.common.models.errors import InvalidDataError
+from music_assistant.common.models.media_items import MediaItemChapter
from music_assistant.constants import UNKNOWN_ARTIST
from music_assistant.server.helpers.process import AsyncProcess
TAG_SPLITTER = ";"
-def split_items(org_str: str) -> tuple[str, ...]:
+def split_items(org_str: str, split_slash: bool = False) -> tuple[str, ...]:
"""Split up a tags string by common splitter."""
- if not org_str:
+ if org_str is None:
return tuple()
if isinstance(org_str, list):
- return org_str
- return tuple(x.strip() for x in org_str.split(TAG_SPLITTER))
+ return (x.strip() for x in org_str)
+ org_str = org_str.strip()
+ if TAG_SPLITTER in org_str:
+ return tuple(x.strip() for x in org_str.split(TAG_SPLITTER))
+ if split_slash and "/" in org_str:
+ return tuple(x.strip() for x in org_str.split("/"))
+ return (org_str.strip(),)
def split_artists(org_artists: str | tuple[str, ...]) -> tuple[str, ...]:
return title_parts[1].strip()
return title
+ @property
+ def version(self) -> str:
+ """Return version tag (as-is)."""
+ if tag := self.tags.get("version"):
+ return tag
+ if (tag := self.tags.get("album_type")) and "live" in tag.lower():
+ # yes, this can happen
+ return "Live"
+ return ""
+
@property
def album(self) -> str:
"""Return album tag (as-is) if present."""
return split_items(tag)
# fallback to regular artist string
if tag := self.tags.get("artist"):
- if ";" in tag:
+ if TAG_SPLITTER in tag:
return split_items(tag)
return split_artists(tag)
# fallback to parsing from filename
return split_items(tag)
# fallback to regular artist string
if tag := self.tags.get("albumartist"):
- if ";" in tag:
+ if TAG_SPLITTER in tag:
return split_items(tag)
return split_artists(tag)
return tuple()
@property
def musicbrainz_artistids(self) -> tuple[str, ...]:
"""Return musicbrainz_artistid tag(s) if present."""
- return split_items(self.tags.get("musicbrainzartistid"))
+ return split_items(self.tags.get("musicbrainzartistid"), True)
@property
def musicbrainz_albumartistids(self) -> tuple[str, ...]:
"""Return musicbrainz_albumartistid tag if present."""
- return split_items(self.tags.get("musicbrainzalbumartistid"))
+ return split_items(self.tags.get("musicbrainzalbumartistid"), True)
@property
def musicbrainz_releasegroupid(self) -> str | None:
return self.tags.get("musicbrainzreleasetrackid")
@property
- def album_type(self) -> str | None:
+ def album_type(self) -> AlbumType:
"""Return albumtype tag if present."""
- if tag := self.tags.get("musicbrainzalbumtype"):
- return tag
- return self.tags.get("releasetype")
+ # handle audiobook/podcast
+ if self.filename.endswith("m4b") and len(self.chapters) > 1:
+ return AlbumType.AUDIOBOOK
+ if "podcast" in self.tags.get("genre", "").lower() and len(self.chapters) > 1:
+ return AlbumType.PODCAST
+ tag = self.tags.get("musicbrainzalbumtype", self.tags.get("musicbrainzalbumtype"))
+ if tag is None:
+ return AlbumType.UNKNOWN
+ # the album type tag is messy within id3 and may even contain multiple types
+ # try to parse one in order of preference
+ for album_type in (
+ AlbumType.PODCAST,
+ AlbumType.AUDIOBOOK,
+ AlbumType.COMPILATION,
+ AlbumType.EP,
+ AlbumType.SINGLE,
+ AlbumType.ALBUM,
+ ):
+ if album_type.value in tag.lower():
+ return album_type
+
+ return AlbumType.UNKNOWN
+
+ @property
+ def isrc(self) -> tuple[str, ...]:
+ """Return isrc tag(s)."""
+ if tag := self.tags.get("isrc"):
+ return split_items(tag, True)
+ if tag := self.tags.get("tsrc"):
+ return split_items(tag, True)
+ return tuple()
+
+ @property
+ def barcode(self) -> tuple[str, ...]:
+ """Return barcode (upc/ean) tag(s)."""
+ # prefer multi-artist tag
+ if tag := self.tags.get("barcode"):
+ return split_items(tag, True)
+ if tag := self.tags.get("upc"):
+ return split_items(tag, True)
+ if tag := self.tags.get("ean"):
+ return split_items(tag, True)
+ return tuple()
+
+ @property
+ def chapters(self) -> list[MediaItemChapter]:
+ """Return chapters in MediaItem (if any)."""
+ chapters: list[MediaItemChapter] = []
+ if raw_chapters := self.raw.get("chapters"):
+ for chapter_data in raw_chapters:
+ chapters.append(
+ MediaItemChapter(
+ chapter_id=chapter_data["id"],
+ position_start=chapter_data["start"],
+ position_end=chapter_data["end"],
+ title=chapter_data.get("tags", {}).get("title"),
+ )
+ )
+ return chapters
@classmethod
def parse(cls, raw: dict) -> AudioTags:
"-hide_banner",
"-loglevel",
"fatal",
+ "-threads",
+ "0",
"-show_error",
"-show_format",
"-show_streams",
+ "-show_chapters",
"-print_format",
"json",
"-i",
) as proc:
if file_path == "-":
# feed the file contents to the process
+
async def chunk_feeder():
- bytes_written = 0
async for chunk in input_file:
- try:
- await proc.write(chunk)
- except BrokenPipeError:
- break # race-condition: read enough data for tags
-
- # grabbing the first 5MB is enough to get the embedded tags
- bytes_written += len(chunk)
- if bytes_written > (5 * 1024000):
- break
+ await proc.write(chunk)
+
proc.write_eof()
proc.attach_task(chunk_feeder())
if file_path == "-":
# feed the file contents to the process
async def chunk_feeder():
- bytes_written = 0
async for chunk in input_file:
- try:
- await proc.write(chunk)
- except BrokenPipeError:
- break # race-condition: read enough data for tags
-
- # grabbing the first 5MB is enough to get the embedded image
- bytes_written += len(chunk)
- if bytes_written > (5 * 1024000):
- break
+ await proc.write(chunk)
+
proc.write_eof()
proc.attach_task(chunk_feeder())
CONF_ALT_APP = "alt_app"
+BASE_PLAYER_CONFIG_ENTRIES = (
+ ConfigEntry(
+ key=CONF_ALT_APP,
+ type=ConfigEntryType.BOOLEAN,
+ label="Use alternate Media app",
+ default_value=False,
+ description="Using the BubbleUPNP Media controller for playback improves "
+ "the playback experience but may not work on non-Google hardware.",
+ advanced=True,
+ ),
+)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
cast_player = self.castplayers.get(player_id)
- entries = (
- ConfigEntry(
- key=CONF_ALT_APP,
- type=ConfigEntryType.BOOLEAN,
- label="Use alternate Media app",
- default_value=cast_player
- and not cast_player.cast_info.is_audio_group
- and cast_player.cast_info.manufacturer == "Google Inc.",
- description="Using the BubbleUPNP Media controller for playback improves "
- "the playback experience but may not work on non-Google hardware.",
- advanced=True,
- ),
- )
+ entries = BASE_PLAYER_CONFIG_ENTRIES
if (
cast_player
and cast_player.cast_info.is_audio_group
from abc import abstractmethod
from collections.abc import AsyncGenerator
from dataclasses import dataclass
-from time import time
import xmltodict
)
from music_assistant.common.models.media_items import (
Album,
- AlbumType,
Artist,
BrowseFolder,
ContentType,
),
)
-TRACK_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
+TRACK_EXTENSIONS = ("mp3", "m4a", "m4b", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
PLAYLIST_EXTENSIONS = ("m3u", "pls")
SUPPORTED_EXTENSIONS = TRACK_EXTENSIONS + PLAYLIST_EXTENSIONS
IMAGE_EXTENSIONS = ("jpg", "jpeg", "JPG", "JPEG", "png", "PNG", "gif", "GIF")
AsyncGenerator yielding FileSystemItem objects.
"""
+ yield
@abstractmethod
async def resolve(self, file_path: str) -> FileSystemItem:
if item.ext in TRACK_EXTENSIONS:
# add/update track to db
- track = await self.get_track(item.path)
- # if the track was edited on disk, always overwrite existing db details
- overwrite_existing = item.path in prev_checksums
- await self.mass.music.tracks.add_db_item(
- track, overwrite_existing=overwrite_existing
- )
+ track = await self._parse_track(item)
+ await self.mass.music.tracks.add_db_item(track)
elif item.ext in PLAYLIST_EXTENSIONS:
playlist = await self.get_playlist(item.path)
# add/update] playlist to db
# playlist is always in-library
playlist.in_library = True
await self.mass.music.playlists.add_db_item(playlist)
- except MusicAssistantError as err:
- self.logger.error("Error processing %s - %s", item.path, str(err))
except Exception as err: # pylint: disable=broad-except
# we don't want the whole sync to crash on one file so we catch all exceptions here
self.logger.exception("Error processing %s - %s", item.path, str(err))
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- db_album = await self.mass.music.albums.get_db_item_by_prov_id(
- item_id=prov_album_id, provider_instance=self.instance_id
- )
- if db_album is None:
- raise MediaNotFoundError(f"Album not found: {prov_album_id}")
- if await self.exists(prov_album_id):
- # if path exists on disk allow parsing full details to allow refresh of metadata
- return await self._parse_album(db_album.name, prov_album_id, db_album.artists)
- return db_album
+ # all data is originated from the actual files (tracks) so grab the data from there
+ for track in await self.get_album_tracks(prov_album_id):
+ for prov_mapping in track.provider_mappings:
+ if prov_mapping.provider_instance == self.instance_id:
+ full_track = await self.get_track(prov_mapping.item_id)
+ return full_track.album
+ raise MediaNotFoundError(f"Album not found: {prov_album_id}")
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
raise MediaNotFoundError(f"Track path does not exist: {prov_track_id}")
file_item = await self.resolve(prov_track_id)
-
- # parse tags
- input_file = file_item.local_path or self.read_file_content(file_item.absolute_path)
- tags = await parse_tags(input_file, file_item.file_size)
-
- name, version = parse_title_and_version(tags.title)
- track = Track(
- item_id=file_item.path,
- provider=self.domain,
- name=name,
- version=version,
- )
-
- # album
- if tags.album:
- # work out if we have an album folder
- album_dir = get_parentdir(file_item.path, tags.album)
-
- # album artist(s)
- if tags.album_artists:
- album_artists = []
- for index, album_artist_str in enumerate(tags.album_artists):
- # work out if we have an artist folder
- artist_dir = get_parentdir(file_item.path, album_artist_str)
- artist = await self._parse_artist(album_artist_str, artist_path=artist_dir)
- if not artist.musicbrainz_id:
- with contextlib.suppress(IndexError):
- artist.musicbrainz_id = tags.musicbrainz_albumartistids[index]
- album_artists.append(artist)
- else:
- # album artist tag is missing, determine fallback
- fallback_action = self.config.get_value(CONF_MISSING_ALBUM_ARTIST_ACTION)
- if fallback_action == "various_artists":
- self.logger.warning(
- "%s is missing ID3 tag [albumartist], using %s as fallback",
- file_item.path,
- VARIOUS_ARTISTS,
- )
- album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS)]
- elif fallback_action == "track_artist":
- self.logger.warning(
- "%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
- file_item.path,
- )
- album_artists = [
- await self._parse_artist(name=track_artist_str)
- for track_artist_str in tags.artists
- ]
- else:
- # default action is to skip the track
- raise InvalidDataError("missing ID3 tag [albumartist]")
-
- track.album = await self._parse_album(
- tags.album,
- album_dir,
- artists=album_artists,
- )
- else:
- self.logger.warning("%s is missing ID3 tag [album]", file_item.path)
-
- # track artist(s)
- for index, track_artist_str in enumerate(tags.artists):
- # re-use album artist details if possible
- if track.album and (
- artist := next((x for x in track.album.artists if x.name == track_artist_str), None)
- ):
- track.artists.append(artist)
- else:
- artist = await self._parse_artist(track_artist_str)
- if not artist.musicbrainz_id:
- with contextlib.suppress(IndexError):
- artist.musicbrainz_id = tags.musicbrainz_artistids[index]
- track.artists.append(artist)
-
- # cover image - prefer album image, fallback to embedded
- if track.album and track.album.image:
- track.metadata.images = [track.album.image]
- elif tags.has_cover_image:
- # we do not actually embed the image in the metadata because that would consume too
- # much space and bandwidth. Instead we set the filename as value so the image can
- # be retrieved later in realtime.
- track.metadata.images = [MediaItemImage(ImageType.THUMB, file_item.path, True)]
- if track.album:
- # set embedded cover on album
- track.album.metadata.images = track.metadata.images
-
- # parse other info
- track.duration = tags.duration or 0
- track.metadata.genres = tags.genres
- track.disc_number = tags.disc
- track.track_number = tags.track
- track.isrc = tags.get("isrc")
- track.metadata.copyright = tags.get("copyright")
- track.metadata.lyrics = tags.get("lyrics")
- track.musicbrainz_id = tags.musicbrainz_trackid
- if track.album:
- if not track.album.musicbrainz_id:
- track.album.musicbrainz_id = tags.musicbrainz_releasegroupid
- if not track.album.year:
- track.album.year = tags.year
- if not track.album.upc:
- track.album.upc = tags.get("barcode")
- # try to parse albumtype
- if track.album and track.album.album_type == AlbumType.UNKNOWN:
- album_type = tags.album_type
- try:
- track.album.album_type = AlbumType(album_type)
- except (ValueError, KeyError):
- if track.album.sort_name in track.sort_name:
- track.album.album_type = AlbumType.SINGLE
-
- # set checksum to invalidate any cached listings
- checksum_timestamp = str(int(time()))
- track.metadata.checksum = checksum_timestamp
- if track.album:
- track.album.metadata.checksum = checksum_timestamp
- for artist in track.album.artists:
- artist.metadata.checksum = checksum_timestamp
-
- track.add_provider_mapping(
- ProviderMapping(
- item_id=file_item.path,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- content_type=ContentType.try_parse(tags.format),
- sample_rate=tags.sample_rate,
- bit_depth=tags.bits_per_sample,
- bit_rate=tags.bit_rate,
- )
- )
- return track
+ return await self._parse_track(file_item)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
async for chunk in self.read_file_content(streamdetails.item_id, seek_bytes):
yield chunk
+ async def _parse_track(self, file_item: FileSystemItem) -> Track:
+ """Get full track details by id."""
+ # ruff: noqa: PLR0915, PLR0912
+
+ # m4a files are nasty because in 99% of the cases the metadata is
+ # at the end of the file (moov atom) so in order to read tags
+ # we need to read the entire file, which is not practically do-able with
+ # remote connections, so we ignore those files
+ large_m4a_file = (
+ file_item.ext in ("m4a", "m4b")
+ and not file_item.local_path
+ and file_item.file_size > 100000000
+ )
+ if large_m4a_file:
+ self.logger.warning(
+ "Large m4a file detected which is unsuitable for remote storage: %s"
+ " - consider converting this file to another file format or make sure "
+ "that `moov atom` metadata is at the beginning of the file. - "
+ "loading info for this file is going to take a long time!",
+ file_item.path,
+ )
+
+ # parse tags
+ input_file = file_item.local_path or self.read_file_content(file_item.absolute_path)
+ tags = await parse_tags(input_file, file_item.file_size)
+ if large_m4a_file:
+ tags.has_cover_image = False
+
+ name, version = parse_title_and_version(tags.title, tags.version)
+ track = Track(
+ item_id=file_item.path,
+ provider=self.domain,
+ name=name,
+ version=version,
+ )
+
+ # album
+ if tags.album:
+ # work out if we have an album folder
+ album_dir = get_parentdir(file_item.path, tags.album)
+
+ # album artist(s)
+ if tags.album_artists:
+ album_artists = []
+ for index, album_artist_str in enumerate(tags.album_artists):
+ # work out if we have an artist folder
+ artist_dir = get_parentdir(album_dir, album_artist_str, 1)
+ artist = await self._parse_artist(album_artist_str, artist_path=artist_dir)
+ if not artist.musicbrainz_id:
+ with contextlib.suppress(IndexError):
+ artist.musicbrainz_id = tags.musicbrainz_albumartistids[index]
+ album_artists.append(artist)
+ else:
+ # album artist tag is missing, determine fallback
+ fallback_action = self.config.get_value(CONF_MISSING_ALBUM_ARTIST_ACTION)
+ if fallback_action == "various_artists":
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using %s as fallback",
+ file_item.path,
+ VARIOUS_ARTISTS,
+ )
+ album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS)]
+ elif fallback_action == "track_artist":
+ self.logger.warning(
+ "%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
+ file_item.path,
+ )
+ album_artists = [
+ await self._parse_artist(name=track_artist_str)
+ for track_artist_str in tags.artists
+ ]
+ else:
+ # default action is to skip the track
+ raise InvalidDataError("missing ID3 tag [albumartist]")
+
+ track.album = await self._parse_album(
+ tags.album,
+ album_dir,
+ artists=album_artists,
+ )
+ else:
+ self.logger.warning("%s is missing ID3 tag [album]", file_item.path)
+
+ # track artist(s)
+ for index, track_artist_str in enumerate(tags.artists):
+ # re-use album artist details if possible
+ if track.album and (
+ artist := next((x for x in track.album.artists if x.name == track_artist_str), None)
+ ):
+ track.artists.append(artist)
+ else:
+ artist = await self._parse_artist(track_artist_str)
+ if not artist.musicbrainz_id:
+ with contextlib.suppress(IndexError):
+ artist.musicbrainz_id = tags.musicbrainz_artistids[index]
+ track.artists.append(artist)
+
+ # cover image - prefer album image, fallback to embedded
+ if track.album and track.album.image:
+ track.metadata.images = [track.album.image]
+ elif tags.has_cover_image:
+ # we do not actually embed the image in the metadata because that would consume too
+ # much space and bandwidth. Instead we set the filename as value so the image can
+ # be retrieved later in realtime.
+ track.metadata.images = [
+ MediaItemImage(ImageType.THUMB, file_item.path, self.instance_id)
+ ]
+ if track.album:
+ # set embedded cover on album
+ track.album.metadata.images = track.metadata.images
+
+ # parse other info
+ track.duration = tags.duration or 0
+ track.metadata.genres = set(tags.genres)
+ track.disc_number = tags.disc
+ track.track_number = tags.track
+ track.isrc.update(tags.isrc)
+ track.metadata.copyright = tags.get("copyright")
+ track.metadata.lyrics = tags.get("lyrics")
+ explicit_tag = tags.get("itunesadvisory")
+ if explicit_tag is not None:
+ track.metadata.explicit = explicit_tag == "1"
+ track.musicbrainz_id = tags.musicbrainz_trackid
+ track.metadata.chapters = tags.chapters
+ if track.album:
+ if not track.album.musicbrainz_id:
+ track.album.musicbrainz_id = tags.musicbrainz_releasegroupid
+ if not track.album.year:
+ track.album.year = tags.year
+ track.album.barcode.update(tags.barcode)
+ track.album.album_type = tags.album_type
+ track.album.metadata.explicit = track.metadata.explicit
+ # set checksum to invalidate any cached listings
+ track.metadata.checksum = file_item.checksum
+ if track.album:
+ # use track checksum for album(artists) too
+ track.album.metadata.checksum = track.metadata.checksum
+ for artist in track.album.artists:
+ artist.metadata.checksum = track.metadata.checksum
+
+ track.add_provider_mapping(
+ ProviderMapping(
+ item_id=file_item.path,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ content_type=ContentType.try_parse(tags.format),
+ sample_rate=tags.sample_rate,
+ bit_depth=tags.bits_per_sample,
+ bit_rate=tags.bit_rate,
+ )
+ )
+ return track
+
async def _parse_artist(
self,
name: str | None = None,
if item.ext != ext:
continue
try:
- images.append(MediaItemImage(ImageType(item.name), item.path, True))
+ images.append(MediaItemImage(ImageType(item.name), item.path, self.instance_id))
except ValueError:
- if "folder" in item.name or "AlbumArt" in item.name or "Artist" in item.name:
- images.append(MediaItemImage(ImageType.THUMB, item.path, True))
+ for filename in ("folder", "cover", "albumart", "artist"):
+ if item.name.lower().startswith(filename):
+ images.append(
+ MediaItemImage(ImageType.THUMB, item.path, self.instance_id)
+ )
+ break
return images
from music_assistant.server.helpers.compare import compare_strings
-def get_parentdir(base_path: str, name: str) -> str | None:
+def get_parentdir(base_path: str, name: str, skip: int = 0) -> str | None:
"""Look for folder name in path (to find dedicated artist or album folder)."""
+ if not base_path:
+ return None
parentdir = os.path.dirname(base_path)
- for _ in range(3):
+ for _ in range(skip, 3):
dirname = parentdir.rsplit(os.sep)[-1]
dirname = dirname.split("(")[0].split("[")[0].strip()
if compare_strings(name, dirname, False):
artistname=artist.name, album_mbid=ref_album.musicbrainz_id
):
return musicbrainz_id
- # try matching on album upc
- if ref_album.upc and (
- musicbrainz_id := await self._search_artist_by_album(
+ # try matching on album barcode
+ for barcode in ref_album.barcode:
+ if musicbrainz_id := await self._search_artist_by_album(
artistname=artist.name,
- album_upc=ref_album.upc,
- )
- ):
- return musicbrainz_id
+ album_barcode=barcode,
+ ):
+ return musicbrainz_id
# try again with matching on track isrc
for ref_track in ref_tracks:
- for isrc in ref_track.isrcs:
+ for isrc in ref_track.isrc:
if musicbrainz_id := await self._search_artist_by_track(
artistname=artist.name,
track_isrc=isrc,
self,
artistname: str,
albumname: str | None = None,
- album_upc: str | None = None,
+ album_barcode: str | None = None,
) -> str | None:
- """Retrieve musicbrainz artist id by providing the artist name and albumname or upc."""
- assert albumname or album_upc
+ """Retrieve musicbrainz artist id by providing the artist name and albumname or barcode."""
+ assert albumname or album_barcode
for searchartist in (
artistname,
re.sub(LUCENE_SPECIAL, r"\\\1", create_sort_name(artistname)),
):
- if album_upc:
- # search by album UPC (barcode)
- query = f"barcode:{album_upc}"
+ if album_barcode:
+ # search by album barcode (EAN or UPC)
+ query = f"barcode:{album_barcode}"
elif albumname:
# search by name
searchalbum = re.sub(LUCENE_SPECIAL, r"\\\1", albumname)
for strict in (True, False):
for item in result["releases"]:
if not (
- album_upc
+ album_barcode
or (albumname and compare_strings(item["title"], albumname, strict))
):
continue
return None
async def _search_artist_by_album_mbid(self, artistname: str, album_mbid: str) -> str | None:
- """Retrieve musicbrainz artist id by providing the artist name and albumname or upc."""
+ """Retrieve musicbrainz artist id by providing the artist name or album id."""
result = await self.get_data(f"release-group/{album_mbid}?inc=artist-credits")
if result and "artist-credit" in result:
for item in result["artist-credit"]:
album.metadata.genres = {album_obj["genre"]["name"]}
if img := self.__get_image(album_obj):
album.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
- if len(album_obj["upc"]) == 13:
- # qobuz writes ean as upc ?!
- album.upc = album_obj["upc"][1:]
- else:
- album.upc = album_obj["upc"]
+ album.barcode.add(album_obj["upc"])
if "label" in album_obj:
album.metadata.label = album_obj["label"]["name"]
if album_obj.get("released_at"):
if album:
track.album = album
if track_obj.get("isrc"):
- track.isrc = track_obj["isrc"]
+ track.isrc.add(track_obj["isrc"])
if track_obj.get("performers"):
track.metadata.performers = {x.strip() for x in track_obj["performers"].split("-")}
if track_obj.get("copyright"):
if TYPE_CHECKING:
from aiohttp.client import ClientSession
+# TODO: Fix docstring
+# TODO: Add annotations
+
class SoundcloudAsyncAPI:
"""Soundcloud."""
async def get(self, url, headers=None, params=None):
"""Async get."""
- async with self.http_session as session:
- async with session.get(url=url, params=params, headers=headers) as response:
- return await response.json()
+ async with self.http_session.get(url=url, params=params, headers=headers) as response:
+ return await response.json()
async def login(self):
"""Login to soundcloud."""
if album_obj.get("images"):
album.metadata.images = [MediaItemImage(ImageType.THUMB, album_obj["images"][0]["url"])]
if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
- album.upc = album_obj["external_ids"]["upc"]
+ album.barcode.add(album_obj["external_ids"]["upc"])
+ if "external_ids" in album_obj and album_obj["external_ids"].get("ean"):
+ album.barcode.add(album_obj["external_ids"]["ean"])
if "label" in album_obj:
album.metadata.label = album_obj["label"]
if album_obj.get("release_date"):
if "preview_url" in track_obj:
track.metadata.preview = track_obj["preview_url"]
if "external_ids" in track_obj and "isrc" in track_obj["external_ids"]:
- track.isrc = track_obj["external_ids"]["isrc"]
+ track.isrc.add(track_obj["external_ids"]["isrc"])
if "album" in track_obj:
track.album = await self._parse_album(track_obj["album"])
if track_obj["album"].get("images"):
album.metadata.images = await self._parse_thumbnails(album_obj["thumbnails"])
if "description" in album_obj:
album.metadata.description = unquote(album_obj["description"])
+ if "isExplicit" in album_obj:
+ album.metadata.explicit = album_obj["isExplicit"]
if "artists" in album_obj:
album.artists = [
await self._parse_artist(artist)
"python-slugify==8.0.1",
"mashumaro==3.5.0",
"memory-tempfile==2.2.3",
- "music-assistant-frontend==20230323.1",
+ "music-assistant-frontend==20230324.0",
"pillow==9.4.0",
"unidecode==1.3.6",
"xmltodict==0.13.0",
git+https://github.com/pytube/pytube.git@refs/pull/1501/head
mashumaro==3.5.0
memory-tempfile==2.2.3
-music-assistant-frontend==20230323.1
+music-assistant-frontend==20230324.0
orjson==3.8.7
pillow==9.4.0
PyChromecast==13.0.5