lazy: bool = True,
) -> MediaItemType:
"""Get single music item by id and media type."""
- ctrl = self._get_controller(media_type)
+ ctrl = self.get_controller(media_type)
return await ctrl.get(
item_id, provider_id, force_refresh=force_refresh, lazy=lazy
)
return result["item_id"]
return None
- async def add_provider_mappings(
+ async def set_provider_mappings(
self,
item_id: int,
media_type: MediaType,
prov_ids: List[MediaItemProviderId],
):
- """Add provider ids for media item to database."""
- for prov in prov_ids:
- await self.add_provider_mapping(item_id, media_type, prov)
-
- async def add_provider_mapping(
- self,
- item_id: int,
- media_type: MediaType,
- prov_id: MediaItemProviderId,
- ):
- """Add provider id for media item to database."""
- await self.mass.database.insert_or_replace(
- DB_PROV_MAPPINGS,
- {
- "item_id": item_id,
- "media_type": media_type.value,
- "prov_item_id": prov_id.item_id,
- "provider": prov_id.provider,
- "quality": prov_id.quality.value if prov_id.quality else None,
- "details": prov_id.details,
- "url": prov_id.url,
- },
+ """Store provider ids for media item to database."""
+ # make sure that existing items are deleted first
+ await self.mass.database.delete(
+ DB_PROV_MAPPINGS, {"item_id": int(item_id), "media_type": media_type.value}
)
+ for prov_id in prov_ids:
+ await self.mass.database.insert_or_replace(
+ DB_PROV_MAPPINGS,
+ {
+ "item_id": item_id,
+ "media_type": media_type.value,
+ "prov_item_id": prov_id.item_id,
+ "provider": prov_id.provider,
+ "quality": prov_id.quality.value if prov_id.quality else None,
+ "details": prov_id.details,
+ "url": prov_id.url,
+ },
+ )
async def add_to_library(
self, media_type: MediaType, provider_item_id: str, provider_id: str
) -> None:
"""Add an item to the library."""
- ctrl = self._get_controller(media_type)
+ ctrl = self.get_controller(media_type)
await ctrl.add_to_library(provider_item_id, provider_id)
async def remove_from_library(
self, media_type: MediaType, provider_item_id: str, provider_id: str
) -> None:
"""Remove item from the library."""
- ctrl = self._get_controller(media_type)
+ ctrl = self.get_controller(media_type)
await ctrl.remove_from_library(provider_item_id, provider_id)
async def set_track_loudness(self, item_id: str, provider_id: str, loudness: int):
music_provider = self.get_provider(provider_id)
if not music_provider or not music_provider.available:
return
- controller = self._get_controller(media_type)
+ controller = self.get_controller(media_type)
# create a set of all previous and current db id's
prev_ids = set()
for db_item in await controller.library():
# sync playlist tracks
if media_type == MediaType.PLAYLIST:
await self._sync_playlist_tracks(db_item)
- # cool down a bit as we don't want to sync process to consume all IO
- await asyncio.sleep(0.05)
# process deletions
for item_id in prev_ids:
if item_id not in cur_ids:
await controller.set_db_library(item_id, False)
+ # in case of filestem, removal from library means the whole item is
+ # moved/deleted so we remove the prov mapping from db.
+ if provider_id == "filesystem":
+ if db_item := controller.get_db_item(item_id):
+ db_item.provider_ids = {
+ x
+ for x in db_item.provider_ids
+ if not (x.provider == provider_id)
+ }
+ await controller.update_db_item(item_id, db_item, True)
async def _sync_album_tracks(self, db_album: Album) -> None:
"""Store album tracks of in-library album in database."""
album_track.disc_number,
album_track.track_number,
)
- # cool down a bit as we don't want to sync process to consume all IO
- await asyncio.sleep(0.05)
async def _sync_playlist_tracks(self, db_playlist: Playlist) -> None:
"""Store playlist tracks of in-library playlist in database."""
db_track.item_id,
playlist_track.position,
)
- # cool down a bit as we don't want to sync process to consume all IO
- await asyncio.sleep(0.05)
- def _get_controller(
+ def get_controller(
self, media_type: MediaType
) -> ArtistsController | AlbumsController | TracksController | RadioController | PlaylistController:
"""Return controller for MediaType."""
from music_assistant.helpers.cache import cached
from music_assistant.helpers.compare import compare_album, compare_strings
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
+from music_assistant.helpers.util import create_sort_name, merge_dict
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
cur_item = None
if not album.sort_name:
album.sort_name = create_sort_name(album.name)
+ assert album.provider_ids
# always try to grab existing item by external_id
if album.upc:
match = {"upc": album.upc}
break
if cur_item:
# update existing
- return await self.update_db_album(cur_item.item_id, album)
+ return await self.update_db_item(cur_item.item_id, album)
# insert new album
album_artist = ItemMapping.from_item(
)
item_id = new_item["item_id"]
# store provider mappings
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.ALBUM, album.provider_ids
)
self.logger.debug("added %s to database", album.name)
# return created object
return await self.get_db_item(item_id)
- async def update_db_album(self, item_id: int, album: Album) -> Album:
+ async def update_db_item(
+ self, item_id: int, album: Album, overwrite: bool = False
+ ) -> Album:
"""Update Album record in the database."""
cur_item = await self.get_db_item(item_id)
- metadata = merge_dict(cur_item.metadata, album.metadata)
- provider_ids = merge_list(cur_item.provider_ids, album.provider_ids)
- album_artist = ItemMapping.from_item(
- await self.mass.music.artists.get_db_item_by_prov_id(
- cur_item.artist.provider, cur_item.artist.item_id
+ if overwrite:
+ metadata = album.metadata
+ provider_ids = album.provider_ids
+ album_artist = ItemMapping.from_item(
+ await self.mass.music.artists.get_db_item_by_prov_id(
+ album.artist.provider, album.artist.item_id
+ )
+ or album.artist
+ )
+ else:
+ metadata = merge_dict(cur_item.metadata, album.metadata)
+ provider_ids = {*cur_item.provider_ids, *album.provider_ids}
+ album_artist = ItemMapping.from_item(
+ await self.mass.music.artists.get_db_item_by_prov_id(
+ cur_item.artist.provider, cur_item.artist.item_id
+ )
+ or cur_item.artist
)
- or cur_item.artist
- )
if cur_item.album_type == AlbumType.UNKNOWN:
album_type = album.album_type
else:
album_type = cur_item.album_type
- match = {"item_id": item_id}
await self.mass.database.update(
self.db_table,
- match,
+ {"item_id": item_id},
{
+ **album.to_db_row(),
"artist": json_serializer(album_artist),
"album_type": album_type.value,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
)
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.ALBUM, album.provider_ids
)
self.logger.debug("updated %s in database: %s", album.name, item_id)
)
if compare_album(prov_album, db_album):
# 100% match, we can simply update the db with additional provider ids
- await self.update_db_album(db_album.item_id, prov_album)
+ await self.update_db_item(db_album.item_id, prov_album)
match_found = True
# while we're here, also match the artist
if db_album.artist.provider == "database":
compare_track,
)
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
+from music_assistant.helpers.util import create_sort_name, merge_dict
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
"""Add a new artist record to the database."""
assert artist.musicbrainz_id
assert artist.name
+ assert artist.provider_ids
match = {"musicbrainz_id": artist.musicbrainz_id}
if cur_item := await self.mass.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_artist(cur_item["item_id"], artist)
+ return await self.update_db_item(cur_item["item_id"], artist)
# insert artist
if not artist.sort_name:
artist.sort_name = create_sort_name(artist.name)
)
item_id = new_item["item_id"]
# store provider mappings
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.ARTIST, artist.provider_ids
)
self.logger.debug("added %s to database", artist.name)
# return created object
return await self.get_db_item(item_id)
- async def update_db_artist(self, item_id: int, artist: Artist) -> Artist:
+ async def update_db_item(
+ self, item_id: int, artist: Artist, overwrite: bool = False
+ ) -> Artist:
"""Update Artist record in the database."""
cur_item = await self.get_db_item(item_id)
- metadata = merge_dict(cur_item.metadata, artist.metadata)
- provider_ids = merge_list(cur_item.provider_ids, artist.provider_ids)
- match = {"item_id": item_id}
+ if overwrite:
+ metadata = artist.metadata
+ provider_ids = artist.provider_ids
+ else:
+ metadata = merge_dict(cur_item.metadata, artist.metadata)
+ provider_ids = {*cur_item.provider_ids, *artist.provider_ids}
+
await self.mass.database.update(
self.db_table,
- match,
+ {"item_id": item_id},
{
+ **artist.to_db_row(),
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
)
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.ARTIST, artist.provider_ids
)
self.logger.debug("updated %s in database: %s", artist.name, item_id)
prov_artist = await self.get_provider_item(
search_item_artist.item_id, search_item_artist.provider
)
- await self.update_db_artist(db_artist.item_id, prov_artist)
+ await self.update_db_item(db_artist.item_id, prov_artist)
return True
# try to get a match with some reference albums of this artist
artist_albums = await self.albums(db_artist.item_id, db_artist.provider)
search_result_item.artist.item_id,
search_result_item.artist.provider,
)
- await self.update_db_artist(db_artist.item_id, prov_artist)
+ await self.update_db_item(db_artist.item_id, prov_artist)
return True
return False
from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.cache import cached
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
+from music_assistant.helpers.util import create_sort_name, merge_dict
from music_assistant.models.errors import InvalidDataError, MediaNotFoundError
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import MediaType, Playlist, Track
match = {"name": playlist.name, "owner": playlist.owner}
if cur_item := await self.mass.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_playlist(cur_item["item_id"], playlist)
+ return await self.update_db_item(cur_item["item_id"], playlist)
# insert new playlist
new_item = await self.mass.database.insert_or_replace(
)
item_id = new_item["item_id"]
# store provider mappings
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.PLAYLIST, playlist.provider_ids
)
self.logger.debug("added %s to database", playlist.name)
# return created object
return await self.get_db_item(item_id)
- async def update_db_playlist(self, item_id: int, playlist: Playlist) -> Playlist:
+ async def update_db_item(
+ self, item_id: int, playlist: Playlist, overwrite: bool = False
+ ) -> Playlist:
"""Update Playlist record in the database."""
cur_item = await self.get_db_item(item_id)
- metadata = merge_dict(cur_item.metadata, playlist.metadata)
- provider_ids = merge_list(cur_item.provider_ids, playlist.provider_ids)
+ if overwrite:
+ metadata = playlist.metadata
+ provider_ids = playlist.provider_ids
+ else:
+ metadata = merge_dict(cur_item.metadata, playlist.metadata)
+ provider_ids = {*cur_item.provider_ids, *playlist.provider_ids}
if not playlist.sort_name:
playlist.sort_name = create_sort_name(playlist.name)
- match = {"item_id": item_id}
await self.mass.database.update(
self.db_table,
- match,
+ {"item_id": item_id},
{
"name": playlist.name,
"sort_name": playlist.sort_name,
"provider_ids": json_serializer(provider_ids),
},
)
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.PLAYLIST, playlist.provider_ids
)
self.logger.debug("updated %s in database: %s", playlist.name, item_id)
from music_assistant.constants import EventType, MassEvent
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
+from music_assistant.helpers.util import create_sort_name, merge_dict
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import MediaType, Radio
"""Add a new radio record to the database."""
if not radio.sort_name:
radio.sort_name = create_sort_name(radio.name)
+ assert radio.provider_ids
match = {"sort_name": radio.sort_name}
if cur_item := await self.mass.database.get_row(self.db_table, match):
# update existing
- return await self.update_db_radio(cur_item["item_id"], radio)
+ return await self.update_db_item(cur_item["item_id"], radio)
# insert new radio
new_item = await self.mass.database.insert_or_replace(
)
item_id = new_item["item_id"]
# store provider mappings
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.RADIO, radio.provider_ids
)
self.logger.debug("added %s to database", radio.name)
# return created object
return await self.get_db_item(item_id)
- async def update_db_radio(self, item_id: int, radio: Radio) -> Radio:
+ async def update_db_item(
+ self, item_id: int, radio: Radio, overwrite: bool = False
+ ) -> Radio:
"""Update Radio record in the database."""
cur_item = await self.get_db_item(item_id)
- metadata = merge_dict(cur_item.metadata, radio.metadata)
- provider_ids = merge_list(cur_item.provider_ids, radio.provider_ids)
+ if overwrite:
+ metadata = radio.metadata
+ provider_ids = radio.provider_ids
+ else:
+ metadata = merge_dict(cur_item.metadata, radio.metadata)
+ provider_ids = {*cur_item.provider_ids, *radio.provider_ids}
if not radio.sort_name:
radio.sort_name = create_sort_name(radio.name)
self.db_table,
match,
{
+ **radio.to_db_row(),
"name": radio.name,
"sort_name": radio.sort_name,
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
},
)
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.RADIO, radio.provider_ids
)
self.logger.debug("updated %s in database: %s", radio.name, item_id)
compare_track,
)
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict, merge_list
+from music_assistant.helpers.util import create_sort_name, merge_dict
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import ItemMapping, MediaType, Track
if compare_track(search_result_item, db_track):
# 100% match, we can simply update the db with additional provider ids
match_found = True
- await self.update_db_track(db_track.item_id, search_result_item)
+ await self.update_db_item(db_track.item_id, search_result_item)
# while we're here, also match the artist
if db_track_artist.provider == "database":
for artist in search_result_item.artists:
artist.item_id, artist.provider
)
)
- await self.mass.music.artists.update_db_artist(
+ await self.mass.music.artists.update_db_item(
db_track_artist.item_id, prov_artist
)
async def add_db_item(self, track: Track) -> Track:
"""Add a new track record to the database."""
assert track.artists, "Track is missing artist(s)"
+ assert track.provider_ids
if not track.sort_name:
track.sort_name = create_sort_name(track.name)
cur_item = None
break
if cur_item:
# update existing
- return await self.update_db_track(cur_item.item_id, track)
+ return await self.update_db_item(cur_item.item_id, track)
# no existing match found: insert new track
track_artists = await self._get_track_artists(track)
)
item_id = new_item["item_id"]
# store provider mappings
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.TRACK, track.provider_ids
)
# return created object
return await self.get_db_item(item_id)
- async def update_db_track(self, item_id: int, track: Track) -> Track:
+ async def update_db_item(
+ self, item_id: int, track: Track, overwrite: bool = False
+ ) -> Track:
"""Update Track record in the database, merging data."""
cur_item = await self.get_db_item(item_id)
- metadata = merge_dict(cur_item.metadata, track.metadata)
- provider_ids = merge_list(cur_item.provider_ids, track.provider_ids)
+ if overwrite:
+ metadata = track.metadata
+ provider_ids = track.provider_ids
+ else:
+ metadata = merge_dict(cur_item.metadata, track.metadata)
+ provider_ids = {*cur_item.provider_ids, *track.provider_ids}
+
# we store a mapping to artists on the track for easier access/listings
track_artists = await self._get_track_artists(track, cur_item.artists)
- match = {"item_id": item_id}
await self.mass.database.update(
self.db_table,
- match,
+ {"item_id": item_id},
{
+ **track.to_db_row(),
"artists": json_serializer(track_artists),
"metadata": json_serializer(metadata),
"provider_ids": json_serializer(provider_ids),
- "isrc": cur_item.isrc or track.isrc,
- "duration": cur_item.duration or track.duration,
+ "isrc": track.isrc or cur_item.isrc,
+ "duration": track.duration or cur_item.duration,
},
)
- await self.mass.music.add_provider_mappings(
+ await self.mass.music.set_provider_mappings(
item_id, MediaType.TRACK, track.provider_ids
)
# add track to album_tracks
self.logger.debug("updated %s in database: %s", track.name, item_id)
return await self.get_db_item(item_id)
- async def edit_db_track(self, item_id: int, track: Track) -> Track:
- """Update Track record in the database, overwriting data."""
- cur_item = await self.get_db_item(item_id)
- # delete any existing provider mappings
- await self.mass.database.delete(
- "provider_mappings",
- {"item_id": item_id, "media_type": MediaType.TRACK.value},
- )
- # overwrite the entire row with new data
- track_artists = await self._get_track_artists(track, cur_item.artists)
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- **track.to_db_row(),
- "artists": json_serializer(track_artists),
- },
- )
- # use regular update logic for the albumtracks logic etc.
- return await self.update_db_track(item_id, track)
-
async def _get_track_artists(
self, track: Track, cur_artists: List[ItemMapping] | None = None
) -> List[ItemMapping]:
import platform
import socket
import tempfile
-from typing import Any, Callable, Dict, List, Optional, Set, TypeVar
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar
import memory_tempfile
for key, value in new_dict.items():
if final_dict.get(key) and isinstance(value, dict):
final_dict[key] = merge_dict(final_dict[key], value)
+ if final_dict.get(key) and isinstance(value, tuple):
+ final_dict[key] = merge_tuples(final_dict[key], value)
if final_dict.get(key) and isinstance(value, list):
- final_dict[key] = merge_list(final_dict[key], value)
+ final_dict[key] = merge_lists(final_dict[key], value)
elif not final_dict.get(key) or allow_overwite:
final_dict[key] = value
return final_dict
-def merge_list(base_list: list, new_list: list) -> List:
+def merge_tuples(base: tuple, new: tuple) -> Tuple:
+ """Merge 2 tuples."""
+ return tuple(x for x in base if x not in new) + tuple(new)
+
+
+def merge_lists(base: list, new: list) -> list:
"""Merge 2 lists."""
- final_list = set(base_list)
- for item in new_list:
- if hasattr(item, "item_id"):
- for prov_item in final_list:
- if prov_item.item_id == item.item_id:
- prov_item = item
- if item not in final_list:
- final_list.add(item)
- return list(final_list)
+ return list(x for x in base if x not in new) + list(new)
def create_tempfile():
from dataclasses import dataclass, field
from enum import Enum, IntEnum
-from typing import Any, Dict, List, Mapping, Optional, Union
+from typing import Any, Dict, List, Mapping, Optional, Set, Union
from mashumaro import DataClassDictMixin
MetadataTypes = Union[int, bool, str, List[str]]
+JSON_KEYS = ("artists", "artist", "metadata", "provider_ids")
+
class MediaType(Enum):
"""Enum for MediaType."""
item_id: str
provider: str
name: str
+ # optional fields below
+ provider_ids: Set[MediaItemProviderId] = field(default_factory=set)
sort_name: Optional[str] = None
metadata: Dict[str, MetadataTypes] = field(default_factory=dict)
- provider_ids: List[MediaItemProviderId] = field(default_factory=list)
in_library: bool = False
media_type: MediaType = MediaType.UNKNOWN
uri: str = ""
self.uri = create_uri(self.media_type, self.provider, self.item_id)
if not self.sort_name:
self.sort_name = create_sort_name(self.name)
+ if not self.provider_ids:
+ self.add_provider_id(MediaItemProviderId(self.provider, self.item_id))
@classmethod
def from_db_row(cls, db_row: Mapping):
"""Create MediaItem object from database row."""
db_row = dict(db_row)
- for key in ["artists", "artist", "metadata", "provider_ids"]:
+ db_row["provider"] = "database"
+ for key in JSON_KEYS:
if key in db_row:
db_row[key] = json.loads(db_row[key])
- db_row["provider"] = "database"
if "in_library" in db_row:
db_row["in_library"] = bool(db_row["in_library"])
if db_row.get("albums"):
def to_db_row(self) -> dict:
"""Create dict from item suitable for db."""
return {
- key: json.dumps(val) if isinstance(val, (list, dict)) else val
- for key, val in self.to_dict().items()
+ key: json.dumps(value) if key in JSON_KEYS else value
+ for key, value in self.to_dict().items()
if key
not in [
"item_id",
"""Return (calculated) availability."""
return any(x.available for x in self.provider_ids)
+ def add_provider_id(self, prov_id: MediaItemProviderId) -> None:
+ """Add provider ID, overwrite existing entry."""
+ self.provider_ids = {
+ x
+ for x in self.provider_ids
+ if not (x.item_id == prov_id.item_id and x.provider == prov_id.provider)
+ }
+ self.provider_ids.add(prov_id)
+
@dataclass
class ItemMapping(DataClassDictMixin):
"""Filesystem musicprovider support for MusicAssistant."""
from __future__ import annotations
-import base64
import os
from typing import List, Optional, Tuple
import aiofiles
from tinytag import TinyTag
-from music_assistant.helpers.compare import compare_strings, get_compare_string
+from music_assistant.helpers.compare import compare_strings
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
+from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.media_items import (
Album,
AlbumType,
return (org_str,)
+DB_TABLE = "filesystem_mappings"
+
+
class FileSystemProvider(MusicProvider):
"""
Very basic implementation of a musicprovider for local files.
raise FileNotFoundError(
f"Playlist Directory {self._playlists_dir} does not exist"
)
+ # simple db table to keep a mapping of filename to id
+ async with self.mass.database.get_db() as _db:
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {DB_TABLE}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ filename TEXT NOT NULL,
+ media_type TEXT NOT NULL,
+ UNIQUE(filename, media_type)
+ );"""
+ )
async def search(
self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
result.append(track.album.artist)
return result
- async def get_library_artists(self, allow_cache=False) -> List[Artist]:
+ async def get_library_artists(self) -> List[Artist]:
"""Retrieve all library artists."""
- # pylint: disable = arguments-differ
- cache_key = f"{self.id}.library_artists"
- if allow_cache:
- if cache_result := await self.mass.cache.get(cache_key):
- return cache_result
result = []
- prev_ids = set()
- for track in await self.get_library_tracks(allow_cache):
+ cur_ids = set()
+ for track in await self.get_library_tracks(False):
if track.album is not None and track.album.artist is not None:
- if track.album.artist.item_id not in prev_ids:
+ if track.album.artist.item_id not in cur_ids:
result.append(track.album.artist)
- prev_ids.add(track.album.artist.item_id)
- await self.mass.cache.set(cache_key, result)
+ cur_ids.add(track.album.artist.item_id)
return result
- async def get_library_albums(self, allow_cache=False) -> List[Album]:
+ async def get_library_albums(self) -> List[Album]:
"""Get album folders recursively."""
- # pylint: disable = arguments-differ
- cache_key = f"{self.id}.library_albums"
- if allow_cache:
- if cache_result := await self.mass.cache.get(cache_key):
- return cache_result
result = []
- prev_ids = set()
- for track in await self.get_library_tracks(allow_cache):
+ cur_ids = set()
+ for track in await self.get_library_tracks(False):
if track.album is not None:
- if track.album.item_id not in prev_ids:
+ if track.album.item_id not in cur_ids:
result.append(track.album)
- prev_ids.add(track.album.item_id)
- await self.mass.cache.set(cache_key, result)
+ cur_ids.add(track.album.item_id)
return result
async def get_library_tracks(self, allow_cache=False) -> List[Track]:
if cache_result := await self.mass.cache.get(cache_key):
return cache_result
result = []
+ cur_ids = set()
for _root, _dirs, _files in os.walk(self._music_dir):
for file in _files:
filename = os.path.join(_root, file)
- if TinyTag.is_supported(filename):
- if track := await self._parse_track(filename):
- result.append(track)
+ if track := await self._parse_track(filename):
+ result.append(track)
+ cur_ids.add(track.item_id)
await self.mass.cache.set(cache_key, result)
return result
- async def get_library_playlists(self, allow_cache=False) -> List[Playlist]:
+ async def get_library_playlists(self) -> List[Playlist]:
"""Retrieve playlists from disk."""
- # pylint: disable = arguments-differ
if not self._playlists_dir:
return []
- cache_key = f"{self.id}.library_playlists"
- if allow_cache:
- if cache_result := await self.mass.cache.get(cache_key):
- return cache_result
result = []
+ cur_ids = set()
for filename in os.listdir(self._playlists_dir):
filepath = os.path.join(self._playlists_dir, filename)
if (
and not filename.startswith(".")
and filename.lower().endswith(".m3u")
):
- playlist = await self.get_playlist(filepath)
+ playlist = await self._parse_playlist(filepath)
if playlist:
result.append(playlist)
- await self.mass.cache.set(cache_key, result)
+ cur_ids.add(playlist.item_id)
return result
async def get_artist(self, prov_artist_id: str) -> Artist:
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- itempath = self._music_dir + base64.b64decode(prov_track_id).decode("utf-8")
if os.sep in prov_track_id:
+ # this is already a filename
itempath = prov_track_id
+ else:
+ itempath = await self._get_filename(prov_track_id, MediaType.TRACK)
if not os.path.isfile(itempath):
- self.logger.error("track path does not exist: %s", itempath)
- return None
+ raise MediaNotFoundError(f"Track path does not exist: {itempath}")
return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- if os.sep not in prov_playlist_id:
- itempath = base64.b64decode(prov_playlist_id).decode("utf-8")
- else:
+ if os.sep in prov_playlist_id:
+ # this is already a filename
itempath = prov_playlist_id
- prov_playlist_id = base64.b64encode(itempath.encode("utf-8")).decode(
- "utf-8"
- )
+ else:
+ itempath = await self._get_filename(prov_playlist_id, MediaType.PLAYLIST)
if not os.path.isfile(itempath):
- self.logger.error("playlist path does not exist: %s", itempath)
- return None
- name = itempath.split(os.sep)[-1].replace(".m3u", "")
- playlist = Playlist(prov_playlist_id, provider=self.id, name=name)
- playlist.is_editable = True
- playlist.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=prov_playlist_id)
- )
- playlist.owner = self._attr_name
- playlist.checksum = os.path.getmtime(itempath)
- return playlist
+ raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
+ return await self._parse_playlist(itempath)
async def get_album_tracks(self, prov_album_id) -> List[Track]:
"""Get album tracks for given album id."""
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
- if os.sep not in prov_playlist_id:
- itempath = base64.b64decode(prov_playlist_id).decode("utf-8")
- else:
+ if os.sep in prov_playlist_id:
+ # this is already a filename
itempath = prov_playlist_id
+ else:
+ itempath = await self._get_filename(prov_playlist_id, MediaType.PLAYLIST)
if not os.path.isfile(itempath):
- self.logger.error("playlist path does not exist: %s", itempath)
- return result
+ raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
index = 0
async with aiofiles.open(itempath, "r") as _file:
for line in await _file.readlines():
line = line.strip()
if line and not line.startswith("#"):
- track = await self._parse_track_from_uri(line)
- if track:
+ if track := await self._parse_track_from_uri(line):
result.append(track)
index += 1
return result
track
for track in await self.get_library_tracks(True)
if track.artists is not None
- and prov_artist_id in [x.item_id for x in track.provider_ids]
+ and prov_artist_id in (x.item_id for x in track.provider_ids)
]
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
filename = item_id
- if os.sep not in item_id:
- filename = base64.b64decode(item_id).decode("utf-8")
- if not os.path.isfile(filename):
- return None
+ if os.sep in item_id:
+ # this is already a filename
+ itempath = item_id
+ else:
+ itempath = await self._get_filename(item_id, MediaType.TRACK)
+ if not os.path.isfile(itempath):
+ raise MediaNotFoundError(f"Track path does not exist: {itempath}")
def parse_tag():
return TinyTag.get(filename)
# TODO: Fall back to parsing base details from filename if no tags found/supported
tag = await self.mass.loop.run_in_executor(None, parse_tag)
- filename_short = filename.split(self._music_dir)[1]
- prov_item_id = base64.b64encode(filename_short.encode("utf-8")).decode("utf-8")
+ prov_item_id = await self._get_item_id(filename, MediaType.TRACK)
name, version = parse_title_and_version(tag.title)
track = Track(
item_id=prov_item_id, provider=self.id, name=name, version=version
# parse track artists
track.artists = [
Artist(
- item_id=get_compare_string(item),
+ item_id=await self._get_item_id(item, MediaType.ARTIST),
provider=self._attr_id,
name=item,
)
# parse album
if tag.album is not None:
track.album = Album(
- item_id=get_compare_string(tag.album),
+ item_id=await self._get_item_id(tag.album, MediaType.ALBUM),
provider=self._attr_id,
name=tag.album,
year=try_parse_int(tag.year),
)
if tag.albumartist is not None:
track.album.artist = Artist(
- item_id=get_compare_string(tag.albumartist),
+ item_id=await self._get_item_id(tag.albumartist, MediaType.ARTIST),
provider=self._attr_id,
name=tag.albumartist,
)
else:
track.album.album_type = AlbumType.ALBUM
# parse other info
- track.metadata["genres"] = split_items(tag.genre)
+ track.metadata["genres"] = list(split_items(tag.genre))
track.disc_number = try_parse_int(tag.disc)
track.track_number = try_parse_int(tag.track)
track.isrc = tag.extra.get("isrc", "")
else:
quality = MediaQuality.LOSSY_MP3
quality_details = f"{tag.bitrate} kbps"
- track.provider_ids.append(
+ track.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=prov_item_id,
quality=quality,
details=quality_details,
+ url=filename,
)
)
return track
+ async def _parse_playlist(self, filename: str) -> Playlist | None:
+ """Parse playlist from file."""
+ name = filename.split(os.sep)[-1].replace(".m3u", "")
+ prov_item_id = await self._get_item_id(filename, MediaType.PLAYLIST)
+ playlist = Playlist(prov_item_id, provider=self.id, name=name)
+ playlist.is_editable = True
+ playlist.add_provider_id(
+ MediaItemProviderId(provider=self.id, item_id=prov_item_id, url=filename)
+ )
+ playlist.owner = self._attr_name
+ playlist.checksum = os.path.getmtime(filename)
+ return playlist
+
async def _parse_track_from_uri(self, uri):
"""Try to parse a track from an uri found in playlist."""
- # pylint: disable=broad-except
if "://" in uri:
# track is uri from external provider?
try:
return await self.mass.music.get_item_by_uri(uri)
- except Exception as exc:
+ except MusicAssistantError as err:
self.logger.warning(
- "Could not parse uri %s to track: %s", uri, str(exc)
+ "Could not parse uri %s to track: %s", uri, str(err)
)
return None
# try to treat uri as filename
- # TODO: filename could be related to musicdir or full path
- track = await self.get_track(uri)
- if track:
- return track
- track = await self.get_track(os.path.join(self._music_dir, uri))
- if track:
- return track
- return None
+ try:
+ return await self.get_track(uri)
+ except MediaNotFoundError:
+ return None
+
+ async def _get_item_id(self, filename: str, media_type: MediaType) -> str:
+ """Get/create item ID for given filename."""
+ # we store the relative path in db
+ filename_base = filename.replace(self._music_dir, "")
+ if filename_base.startswith(os.sep):
+ filename_base = filename_base[1:]
+ match = {"filename": filename_base, "media_type": media_type.value}
+ if db_row := await self.mass.database.get_row(DB_TABLE, match):
+ return str(db_row["item_id"])
+ # filename not yet known in db, create new record
+ db_row = await self.mass.database.insert_or_replace(DB_TABLE, match)
+ return str(db_row["item_id"])
+
+ async def _get_filename(self, item_id: str, media_type: MediaType) -> str:
+ """Get/create ID for given filename."""
+ match = {"item_id": int(item_id), "media_type": media_type.value}
+ db_row = await self.mass.database.get_row(DB_TABLE, match)
+ if not db_row:
+ raise MediaNotFoundError(f"Item not found: {item_id}")
+ if media_type == MediaType.PLAYLIST:
+ return os.path.join(self._playlists_dir, db_row["filename"])
+ return os.path.join(self._music_dir, db_row["filename"])
artist = Artist(
item_id=str(artist_obj["id"]), provider=self.id, name=artist_obj["name"]
)
- artist.provider_ids.append(
+ artist.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=str(artist_obj["id"]),
quality = MediaQuality.LOSSY_AAC
else:
quality = MediaQuality.FLAC_LOSSLESS
- album.provider_ids.append(
+ album.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=str(album_obj["id"]),
quality = MediaQuality.LOSSY_AAC
else:
quality = MediaQuality.FLAC_LOSSLESS
- track.provider_ids.append(
+ track.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=str(track_obj["id"]),
name=playlist_obj["name"],
owner=playlist_obj["owner"]["name"],
)
- playlist.provider_ids.append(
+ playlist.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=str(playlist_obj["id"]),
artist = Artist(
item_id=artist_obj["id"], provider=self.id, name=artist_obj["name"]
)
- artist.provider_ids.append(
+ artist.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=artist_obj["id"],
album.metadata["copyright"] = album_obj["copyrights"][0]["text"]
if album_obj.get("explicit"):
album.metadata["explicit"] = str(album_obj["explicit"]).lower()
- album.provider_ids.append(
+ album.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=album_obj["id"],
track.metadata["explicit"] = True
if track_obj.get("popularity"):
track.metadata["popularity"] = track_obj["popularity"]
- track.provider_ids.append(
+ track.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=track_obj["id"],
name=playlist_obj["name"],
owner=playlist_obj["owner"]["display_name"],
)
- playlist.provider_ids.append(
+ playlist.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=playlist_obj["id"],
quality = MediaQuality.LOSSY_OGG
else:
quality = MediaQuality.LOSSY_MP3
- radio.provider_ids.append(
+ radio.add_provider_id(
MediaItemProviderId(
provider=self.id,
item_id=f'{details["preset_id"]}--{stream["media_type"]}',