result = await self._get_data("album-mb.php", i=album.musicbrainz_id)
if result and result.get("album"):
adb_album = result["album"][0]
- else:
+ elif album.artist:
# lookup by name
result = await self._get_data(
"searchalbum.php", s=album.artist.name, a=album.name
album.year = int(adb_album.get("intYearReleased", "0"))
if not album.musicbrainz_id:
album.musicbrainz_id = adb_album["strMusicBrainzID"]
- if not album.artist.musicbrainz_id:
+ if album.artist and not album.artist.musicbrainz_id:
album.artist.musicbrainz_id = adb_album["strMusicBrainzArtistID"]
if album.album_type == AlbumType.UNKNOWN:
album.album_type = ALBUMTYPE_MAPPING.get(
if adb_track:
if not track.musicbrainz_id:
track.musicbrainz_id = adb_track["strMusicBrainzID"]
- if not track.album.musicbrainz_id:
+ if track.album and not track.album.musicbrainz_id:
track.album.musicbrainz_id = adb_track["strMusicBrainzAlbumID"]
if not track_artist.musicbrainz_id:
track_artist.musicbrainz_id = adb_track["strMusicBrainzArtistID"]
media_type: MediaType,
provider_id: str,
provider_item_id: str,
- db: Optional[Db] = None, # pylint: disable=invalid-name
+ db: Optional[Db] = None,
) -> int | None:
"""Lookup database id for media item from provider id."""
if result := await self.mass.database.get_row(
return result["item_id"]
return None
+ async def get_provider_mappings(
+ self,
+ media_type: MediaType,
+ provider_id: str,
+ db: Optional[Db] = None,
+ ) -> List[int]:
+ """Lookup all database id's for media type for given provider id."""
+ if result := await self.mass.database.get_rows(
+ TABLE_PROV_MAPPINGS,
+ {
+ "media_type": media_type.value,
+ "provider": provider_id,
+ },
+ db=db,
+ ):
+ return [x["item_id"] for x in result]
+ return None
+
async def set_provider_mappings(
self,
item_id: int,
media_type: MediaType,
prov_ids: List[MediaItemProviderId],
- db: Optional[Db] = None, # pylint: disable=invalid-name
+ db: Optional[Db] = None,
):
"""Store provider ids for media item to database."""
async with self.mass.database.get_db(db) as _db:
cur_ids = set()
for prov_item in await music_provider.get_library_items(media_type):
prov_item: MediaItemType = prov_item
+
db_item: MediaItemType = await controller.get_db_item_by_prov_id(
prov_item.provider, prov_item.item_id
)
elif not db_item:
# for other mediatypes its enough to simply dump the item in the db
db_item = await controller.add_db_item(prov_item)
- elif (
- media_type == MediaType.PLAYLIST
- and db_item.checksum != prov_item.checksum
- ):
- # playlist checksum changed
+ elif db_item.metadata.checksum != prov_item.metadata.checksum:
+ # item checksum changed
+ # used by filesystem tracks and playlist items
db_item = await controller.add_db_item(prov_item)
cur_ids.add(db_item.item_id)
from __future__ import annotations
import asyncio
-from typing import List
+from typing import List, Optional
from music_assistant.helpers.compare import compare_album, compare_strings
from music_assistant.helpers.database import TABLE_ALBUMS
return await self.update_db_item(cur_item.item_id, album)
# insert new album
- assert album.artist
- if album.artist.musicbrainz_id and album.artist.provider != "database":
- album_artist = await self.mass.music.artists.add_db_item(album.artist)
- else:
- album_artist = (
- await self.mass.music.artists.get_db_item_by_prov_id(
- album.artist.provider, album.artist.item_id, db=_db
- )
- or album.artist
- )
+ album_artist = await self._get_album_artist(album, cur_item)
new_item = await self.mass.database.insert_or_replace(
self.db_table,
{
**album.to_db_row(),
- "artist": json_serializer(ItemMapping.from_item(album_artist)),
+ "artist": json_serializer(album_artist) or None,
},
db=_db,
)
"""Update Album record in the database."""
async with self.mass.database.get_db() as _db:
cur_item = await self.get_db_item(item_id)
- if (
- not isinstance(album.artist, ItemMapping)
- and album.artist.musicbrainz_id
- and album.artist.provider != "database"
- ):
- album_artist = await self.mass.music.artists.add_db_item(album.artist)
- else:
- album_artist = (
- await self.mass.music.artists.get_db_item_by_prov_id(
- album.artist.provider, album.artist.item_id, db=_db
- )
- or album.artist
- )
+ album_artist = await self._get_album_artist(album, cur_item)
if overwrite:
metadata = album.metadata
provider_ids = album.provider_ids
"year": album.year or cur_item.year,
"upc": album.upc or cur_item.upc,
"album_type": album_type.value,
- "artist": json_serializer(ItemMapping.from_item(album_artist)),
+ "artist": json_serializer(album_artist) or None,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
continue
if MediaType.ALBUM in provider.supported_mediatypes:
await find_prov_match(provider)
+
+ async def _get_album_artist(
+ self, db_album: Album, updated_album: Optional[Album] = None
+ ) -> ItemMapping | None:
+ """Extract album artist as ItemMapping, prefer database ID."""
+ for album in (updated_album, db_album):
+ if not album or not album.artist:
+ continue
+
+ if isinstance(album.artist, ItemMapping):
+ return album.artist
+
+ if album.artist.provider == "database":
+ return ItemMapping.from_item(album.artist)
+
+ if album.artist.musicbrainz_id:
+ album_artist = await self.mass.music.artists.add_db_item(album.artist)
+ return ItemMapping.from_item(album_artist)
+
+ if album_artist := await self.mass.music.artists.get_db_item_by_prov_id(
+ album.artist.provider, album.artist.item_id
+ ):
+ return ItemMapping.from_item(album_artist)
+
+ return ItemMapping.from_item(album.artist)
+ return None
"sort_name": playlist.sort_name,
"owner": playlist.owner,
"is_editable": playlist.is_editable,
- "checksum": playlist.checksum,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
from __future__ import annotations
import asyncio
-from typing import List
+from typing import List, Optional
from music_assistant.helpers.compare import (
compare_artists,
track.sort_name = create_sort_name(track.name)
cur_item = None
async with self.mass.database.get_db() as _db:
- track_album = track.album
- if track_album and not isinstance(track.album, ItemMapping):
- track_album = ItemMapping.from_item(
- await self.get_db_item_by_prov_id(
- track.album.provider, track.album.item_id, db=_db
- )
- or await self.mass.music.albums.add_db_item(track.album)
- )
+ track_album = await self._get_track_album(track)
+
# always try to grab existing item by external_id
if track.musicbrainz_id:
match = {"musicbrainz_id": track.musicbrainz_id}
{
**track.to_db_row(),
"artists": json_serializer(track_artists),
- "album": json_serializer(track_album),
+ "album": json_serializer(track_album) or None,
},
db=_db,
)
"""Update Track record in the database, merging data."""
async with self.mass.database.get_db() as _db:
cur_item = await self.get_db_item(item_id, db=_db)
+ track_album = await self._get_track_album(track)
if overwrite:
provider_ids = track.provider_ids
track_artists = track.artists
):
track_artists.append(ItemMapping.from_item(track_artist))
return track_artists
+
+ async def _get_track_album(
+ self, db_track: Track, updated_track: Optional[Track] = None
+ ) -> ItemMapping | None:
+ """Extract track album as ItemMapping, prefer database ID."""
+ for track in (updated_track, db_track):
+ if not track or not track.album:
+ continue
+
+ if isinstance(track.album, ItemMapping):
+ return track.album
+
+ if track.album.provider == "database":
+ return ItemMapping.from_item(track.album)
+
+ if track.album.musicbrainz_id:
+ track_album = await self.mass.music.albums.add_db_item(track.album)
+ return ItemMapping.from_item(track_album)
+
+ if track_album := await self.mass.music.albums.get_db_item_by_prov_id(
+ track.album.provider, track.album.item_id
+ ):
+ return ItemMapping.from_item(track_album)
+
+ return ItemMapping.from_item(track.album)
+ return None
from __future__ import annotations
import asyncio
+import urllib.parse
from asyncio import Task
from time import time
from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional, Set
track = await self.mass.music.tracks.get_provider_item(track_id, provider)
if preview := track.metadata.preview:
return preview
- return f"http://{self._ip}:{self._port}/preview/{provider}/{track_id}.mp3"
+ enc_track_id = urllib.parse.quote(track_id)
+ return f"http://{self._ip}:{self._port}/preview?provider={provider}&item_id={enc_track_id}"
async def setup(self) -> None:
"""Async initialize of module."""
app = web.Application()
- app.router.add_get("/preview/{provider}/{item_id}.mp3", self.serve_preview)
+ app.router.add_get("/preview", self.serve_preview)
app.router.add_get(
"/{queue_id}/{player_id}.{format}",
self.serve_multi_client_queue_stream,
async def serve_preview(self, request: web.Request):
"""Serve short preview sample."""
- provider = request.match_info["provider"]
- item_id = request.match_info["item_id"]
+ provider = request.query["provider"]
+ item_id = urllib.parse.unquote(request.query["item_id"])
resp = web.StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/mp3"}
)
"-i",
streamdetails.path,
]
- output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-q:a", "6", "-"]
+ output_args = ["-ss", "30", "-to", "60", "-f", "mp3", "-"]
async with AsyncProcess(input_args + output_args) as proc:
# yield chunks from stdout
from __future__ import annotations
import re
-from typing import List
+from typing import List, Union
import unidecode
return False
-def compare_album(left_album: Album, right_album: Album):
+def compare_album(
+ left_album: Union[Album, ItemMapping], right_album: Union[Album, ItemMapping]
+):
"""Compare two album items and return True if they match."""
if left_album is None or right_album is None:
return False
and left_album.item_id == right_album.item_id
):
return True
- # make sure we have a full album and not a simplified ItemMapping
- assert not isinstance(left_album, ItemMapping), "Full Album object required"
- assert not isinstance(right_album, ItemMapping), "Full Album object required"
- # prefer match on UPC
- if left_album.upc and right_album.upc:
- if (left_album.upc in right_album.upc) or (right_album.upc in left_album.upc):
- return True
- # prefer match on musicbrainz_id
- if left_album.musicbrainz_id and right_album.musicbrainz_id:
- if left_album.musicbrainz_id == right_album.musicbrainz_id:
- return True
+ if isinstance(left_album, Album) and isinstance(right_album, Album):
+ # prefer match on UPC
+ if left_album.upc and right_album.upc:
+ if (left_album.upc in right_album.upc) or (
+ right_album.upc in left_album.upc
+ ):
+ return True
+ # prefer match on musicbrainz_id
+ if left_album.musicbrainz_id and right_album.musicbrainz_id:
+ if left_album.musicbrainz_id == right_album.musicbrainz_id:
+
+ return True
# fallback to comparing
if not compare_strings(left_album.name, right_album.name):
return False
if not compare_version(left_album.version, right_album.version):
return False
+ if not left_album.artist or not right_album.artist:
+ return False
if not compare_strings(left_album.artist.name, right_album.artist.name):
return False
# 100% match, all criteria passed
# track if both tracks are (not) explicit
if not compare_explicit(left_track.metadata, right_track.metadata):
return False
- # album match OR (near) exact duration match
+ # exact album match OR (near) exact duration match
if isinstance(left_track.album, Album) and isinstance(right_track.album, Album):
if compare_album(left_track.album, right_track.album):
return True
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
-# pylint: disable=invalid-name
SCHEMA_VERSION = 7
sort_name TEXT NOT NULL,
owner TEXT NOT NULL,
is_editable BOOLEAN NOT NULL,
- checksum TEXT NOT NULL,
in_library BOOLEAN DEFAULT 0,
metadata json,
provider_ids json,
return (prov.provider, prov.item_id)
return None, None
- async def get_db_items(self, custom_query: Optional[str] = None) -> List[ItemCls]:
+ async def get_db_items(
+ self, custom_query: Optional[str] = None, db: Optional[Db] = None
+ ) -> List[ItemCls]:
"""Fetch all records from database."""
if custom_query is not None:
- func = self.mass.database.get_rows_from_query(custom_query)
+ func = self.mass.database.get_rows_from_query(custom_query, db=db)
else:
- func = self.mass.database.get_rows(self.db_table)
+ func = self.mass.database.get_rows(self.db_table, db=db)
return [self.item_cls.from_db_row(db_row) for db_row in await func]
async def get_db_item(self, item_id: int, db: Optional[Db] = None) -> ItemCls:
- # pylint: disable = invalid-name
"""Get record by id."""
match = {"item_id": int(item_id)}
if db_row := await self.mass.database.get_row(self.db_table, match, db=db):
self,
provider_id: str,
provider_item_id: str,
- db: Optional[Db] = None, # pylint: disable = invalid-name
+ db: Optional[Db] = None,
) -> ItemCls | None:
"""Get the database album for the given prov_id."""
if provider_id == "database":
return await self.get_db_item(item_id, db=db)
return None
+ async def get_db_items_by_prov_id(
+ self, provider_id: str, db: Optional[Db] = None
+ ) -> List[ItemCls]:
+ """Fetch all records from database for given provider."""
+ db_ids = await self.mass.music.get_provider_mappings(
+ self.media_type, provider_id, db=db
+ )
+ query = f"SELECT * FROM tracks WHERE item_id in {str(tuple(db_ids))}"
+ return await self.get_db_items(query, db=db)
+
async def set_db_library(self, item_id: int, in_library: bool) -> None:
"""Set the in-library bool on a database item."""
match = {"item_id": item_id}
popularity: Optional[int] = None
# last_refresh: timestamp the (full) metadata was last collected
last_refresh: Optional[int] = None
+ # checksum: optional value to detect changes (e.g. playlists)
+ checksum: Optional[str] = None
def update(
self,
item_id: str
provider: str
name: str = ""
+ version: str = ""
media_type: MediaType = MediaType.ARTIST
uri: str = ""
media_type: MediaType = MediaType.ALBUM
version: str = ""
year: Optional[int] = None
- artist: Union[ItemMapping, Artist, None] = None
+ artist: Union[Artist, ItemMapping, None] = None
album_type: AlbumType = AlbumType.UNKNOWN
upc: Optional[str] = None
musicbrainz_id: Optional[str] = None # release group id
version: str = ""
isrc: Optional[str] = None
musicbrainz_id: Optional[str] = None # Recording ID
- artists: List[Union[ItemMapping, Artist]] = field(default_factory=list)
+ artists: List[Union[Artist, ItemMapping]] = field(default_factory=list)
# album track only
- album: Union[ItemMapping, Album, None] = None
+ album: Union[Album, ItemMapping, None] = None
disc_number: Optional[int] = None
track_number: Optional[int] = None
# playlist track only
media_type: MediaType = MediaType.PLAYLIST
owner: str = ""
- checksum: str = "" # some value to detect playlist track changes
is_editable: bool = False
def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
"""Exclude internal fields from dict."""
- # pylint: disable=invalid-name,no-self-use
+ # pylint: disable=no-self-use
d.pop("path")
d.pop("details")
return d
@classmethod
def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]:
"""Run actions before serialization."""
- # pylint: disable=invalid-name
d.pop("streamdetails", None)
return d
async def queue_stream_prepare(self) -> StreamDetails:
"""Call when queue_streamer is about to start playing."""
- start_from_index = self._next_start_index
+ start_from_index = self._next_start_index or 0
try:
next_item = self._items[start_from_index]
except (IndexError, TypeError) as err:
async def queue_stream_start(self) -> int:
"""Call when queue_streamer starts playing the queue stream."""
- start_from_index = self._next_start_index
+ start_from_index = self._next_start_index or 0
self._current_item_elapsed_time = 0
self._current_index = start_from_index
self._start_index = start_from_index
import base64
import os
-from typing import List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple
import aiofiles
-from tinytag.tinytag import TinyTag
+from tinytag.tinytag import TinyTag, TinyTagException
from music_assistant.helpers.compare import compare_strings
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
cur_ids.add(track.album.item_id)
return result
- async def get_library_tracks(self, allow_cache=False) -> List[Track]:
+ async def get_library_tracks(self, use_cache=False) -> List[Track]:
"""Get all tracks recursively."""
- # pylint: disable = arguments-differ
- # we cache this listing in memory for performance and convenience reasons
+ # pylint: disable=arguments-differ
+ # we cache the entire tracks listing for performance and convenience reasons
# so we can easy retrieve the library artists and albums from the tracks listing
- # if this may ever lead to memory issues, we can do the caching in db instead.
- if allow_cache and self._cached_tracks:
- return self._cached_tracks
- result = []
- cur_ids = set()
+ cache_key = f"{self.id}.tracks"
+ cache_result: Dict[str, dict] = await self.mass.cache.get(
+ cache_key, checksum=self._music_dir
+ )
+ if cache_result is not None and use_cache:
+ return [Track.from_dict(x) for x in cache_result.values()]
+ if cache_result is None:
+ cache_result = {}
+
# find all music files in the music directory and all subfolders
+ result = []
for _root, _dirs, _files in os.walk(self._music_dir):
for file in _files:
filename = os.path.join(_root, file)
- if track := await self._parse_track(filename):
+ checksum = self._get_checksum(filename)
+ prov_item_id = self._get_item_id(filename)
+ cache_track = cache_result.get(prov_item_id)
+ # we do not want to parse tags if there are no changes to the file
+ # so we speedup the sync by comparing a checksum
+ if cache_track and cache_track["metadata"].get("checksum") == checksum:
+ # checksum did not change, use cached track
+ result.append(Track.from_dict(cache_track))
+ elif track := await self._parse_track(filename):
+ cache_result[prov_item_id] = track.to_dict()
result.append(track)
- cur_ids.add(track.item_id)
- self._cached_tracks = result
+ # store cache listing in cache
+ await self.mass.cache.set(cache_key, cache_result, self._music_dir)
return result
async def get_library_playlists(self) -> List[Playlist]:
track
for track in await self.get_library_tracks(True)
if track.artists is not None
- and prov_artist_id in (x.item_id for x in track.artists)
+ and (
+ (prov_artist_id in (x.item_id for x in track.artists))
+ or (
+ track.album is not None
+ and track.album.artist is not None
+ and track.album.artist.item_id == prov_artist_id
+ )
+ )
]
async def get_stream_details(self, item_id: str) -> StreamDetails:
return TinyTag.get(filename, image=True, ignore_errors=True)
# parse ID3 tags with TinyTag
- tags = await self.mass.loop.run_in_executor(None, parse_tags)
+ try:
+ tags = await self.mass.loop.run_in_executor(None, parse_tags)
+ except TinyTagException as err:
+ self.logger.error("Error processing %s: %s", filename, str(err))
- # use the relative filename as item_id
- filename_base = filename.replace(self._music_dir, "")
- if filename_base.startswith(os.sep):
- filename_base = filename_base[1:]
- prov_item_id = filename_base
+ prov_item_id = self._get_item_id(filename)
# work out if we have an artist/album/track.ext structure
filename_base = filename.replace(self._music_dir, "")
if filename_base.startswith(os.sep):
filename_base = filename_base[1:]
track_parts = filename_base.rsplit(os.sep)
- if track_parts == 3:
+ if len(track_parts) == 3:
album_artist_name = track_parts[0]
album_name = track_parts[1]
- album_artist_name = tags.albumartist
- album_name = tags.album
+ else:
+ album_artist_name = tags.albumartist
+ album_name = tags.album
# prefer title from tag, fallback to filename
if tags.title:
track.metadata.copyright = tags.extra["copyright"]
if "lyrics" in tags.extra:
track.metadata.lyrics = tags.extra["lyrics"]
+ # store last modified time as checksum
+ track.metadata.checksum = self._get_checksum(filename)
quality_details = ""
if filename.endswith(".flac"):
if playlist:
return os.path.join(self._playlists_dir, item_id)
return os.path.join(self._music_dir, item_id)
+
+ def _get_item_id(self, filename: str, playlist: bool = False) -> str:
+ """Return item_id for given filename."""
+ # we simply use the base filename as item_id
+ base_path = self._playlists_dir if playlist else self._music_dir
+ filename_base = filename.replace(base_path, "")
+ if filename_base.startswith(os.sep):
+ filename_base = filename_base[1:]
+ return filename_base
+
+ @staticmethod
+ def _get_checksum(filename: str) -> str:
+ """Get checksum for file."""
+ # use last modified time as checksum
+ return str(os.path.getmtime(filename))
)
if img := self.__get_image(playlist_obj):
playlist.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
- playlist.checksum = str(playlist_obj["updated_at"])
+ playlist.metadata.checksum = str(playlist_obj["updated_at"])
return playlist
async def _auth_token(self):
playlist.metadata.images = {
MediaItemImage(ImageType.THUMB, playlist_obj["images"][0]["url"])
}
- playlist.checksum = str(playlist_obj["snapshot_id"])
+ playlist.metadata.checksum = str(playlist_obj["snapshot_id"])
return playlist
async def get_token(self):
extension-pkg-whitelist=taglib
[BASIC]
-good-names=id,i,j,k,ex,Run,_,fp,T,ev
+good-names=id,i,j,k,ex,Run,_,fp,T,ev,db,d
[MESSAGES CONTROL]
# Reasons disabled:
extension-pkg-whitelist=taglib,orjson
[pylint.basic]
-good-names=id,i,j,k,ex,Run,_,fp,T,ev
+good-names=id,i,j,k,ex,Run,_,fp,T,ev,db,d
[pylint.messages_control]
# Reasons disabled: