import asyncio
import logging
import os
+import webbrowser
from os.path import abspath, dirname
from sys import path
# pylint: disable=wrong-import-position
from music_assistant.mass import MusicAssistant
from music_assistant.models.config import MassConfig, MusicProviderConfig
-from music_assistant.models.enums import ProviderType
-from music_assistant.models.player import Player, PlayerState
-from music_assistant.models.player_queue import RepeatMode
+from music_assistant.models.enums import ProviderType, RepeatMode, PlayerState
+from music_assistant.models.player import Player
parser = argparse.ArgumentParser(description="MusicAssistant")
logging.getLogger("aiorun").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("aiosqlite").setLevel(logging.WARNING)
-logging.getLogger("databases").setLevel(logging.WARNING)
+logging.getLogger("databases").setLevel(logging.INFO)
# default database based on sqlite
print(f"stream url: {url}")
self._attr_current_url = url
self.update_state()
+ # launch stream url in browser so we can hear it playing ;-)
+ # normally this url is sent to the actual player implementation
+ webbrowser.open(url)
async def stop(self) -> None:
"""Send STOP command to player."""
async with MusicAssistant(mass_conf) as mass:
- # start sync
- await mass.music.start_sync(schedule=3)
+ # run sync
+ await mass.music.start_sync()
# get some data
- artists = await mass.music.artists.count()
- print(f"Got {artists} artists in library")
- albums = await mass.music.albums.count()
- print(f"Got {albums} albums in library")
- tracks = await mass.music.tracks.count()
- print(f"Got {tracks} tracks in library")
- radios = await mass.music.radio.count()
- print(f"Got {radios} radio stations in library")
- playlists = await mass.music.playlists.library()
- print(f"Got {len(playlists)} playlists in library")
+ artist_count = await mass.music.artists.count()
+ artist_count_lib = await mass.music.artists.count(True)
+ print(f"Got {artist_count} artists ({artist_count_lib} in library)")
+ album_count = await mass.music.albums.count()
+ album_count_lib = await mass.music.albums.count(True)
+ print(f"Got {album_count} albums ({album_count_lib} in library)")
+ track_count = await mass.music.tracks.count()
+ track_count_lib = await mass.music.tracks.count(True)
+ print(f"Got {track_count} tracks ({track_count_lib} in library)")
+ radio_count = await mass.music.radio.count(True)
+ print(f"Got {radio_count} radio stations in library")
+ playlist_count = await mass.music.playlists.library(True)
+ print(f"Got {len(playlist_count)} playlists in library")
# register a player
test_player1 = TestPlayer("test1")
test_player2 = TestPlayer("test2")
await mass.players.register_player(test_player1)
await mass.players.register_player(test_player2)
- # get full artist details
- await mass.music.artists.get("6", ProviderType.DATABASE)
- await mass.music.artists.albums("6", ProviderType.DATABASE)
- await mass.music.artists.toptracks("6", ProviderType.DATABASE)
- # try to play some playlist
+ # try to play some music
test_player1.active_queue.settings.shuffle_enabled = True
test_player1.active_queue.settings.repeat_mode = RepeatMode.ALL
- if len(playlists) > 0:
- await test_player1.active_queue.play_media(playlists[0].uri)
+
+ # we can send a MediaItem object (such as Artist, Album, Track, Playlist)
+ # we can also send an uri, such as spotify://track/abcdfefgh
+ # or database://playlist/1
+ # or a list of items
+ artist = await mass.music.artists.get("2", ProviderType.DATABASE)
+ await test_player1.active_queue.play_media(artist)
await asyncio.sleep(3600)
async def get_radio_metadata(self, radio: Radio) -> None:
"""Get/update rich metadata for a radio station."""
- # NOTE: we do not have any metadata for radiso so consider this future proofing ;-)
+ # NOTE: we do not have any metadata for radio so consider this future proofing ;-)
radio.metadata.last_refresh = int(time())
async def get_artist_musicbrainz_id(self, artist: Artist) -> str | None:
"""Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
- ref_albums = await self.mass.music.artists.get_provider_artist_albums(
- artist.item_id, artist.provider
- )
+ ref_albums = await self.mass.music.artists.albums(artist=artist)
# first try audiodb
if musicbrainz_id := await self.audiodb.get_musicbrainz_id(artist, ref_albums):
return musicbrainz_id
return musicbrainz_id
# try again with matching on track isrc
- ref_tracks = await self.mass.music.artists.toptracks(
- artist.item_id, artist.provider
- )
+ ref_tracks = await self.mass.music.artists.toptracks(artist=artist)
for ref_track in ref_tracks:
- if not ref_track.isrc:
- continue
- if musicbrainz_id := await self.musicbrainz.get_mb_artist_id(
- artist.name,
- track_isrc=ref_track.isrc,
- ):
- return musicbrainz_id
+ for isrc in ref_track.isrcs:
+ if musicbrainz_id := await self.musicbrainz.get_mb_artist_id(
+ artist.name,
+ track_isrc=isrc,
+ ):
+ return musicbrainz_id
# last restort: track matching by name
for ref_track in ref_tracks:
musicbrainz_id = None
if data := await self._get_data("searchalbum.php", s=artist.name):
# NOTE: object is 'null' when no records found instead of empty array
- for item in data.get("album", []) or []:
+ albums = data.get("album") or []
+ for item in albums:
if not compare_strings(item["strArtistStripped"], artist.name):
continue
for ref_album in ref_albums:
from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.compare import compare_strings
-from music_assistant.helpers.util import create_clean_string
+from music_assistant.helpers.util import create_sort_name
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
mb_id,
)
return mb_id
- for strictness in (True, False):
- if albumname:
- if mb_id := await self.search_artist_by_album(
- artistname, albumname, strict=strictness
- ):
- self.logger.debug(
- "Got MusicbrainzArtistId for %s after search on albumname %s --> %s",
- artistname,
- albumname,
- mb_id,
- )
- return mb_id
- if trackname:
- if mb_id := await self.search_artist_by_track(
- artistname, trackname, strict=strictness
- ):
- self.logger.debug(
- "Got MusicbrainzArtistId for %s after search on trackname %s --> %s",
- artistname,
- trackname,
- mb_id,
- )
- return mb_id
+ if albumname:
+ if mb_id := await self.search_artist_by_album(artistname, albumname):
+ self.logger.debug(
+ "Got MusicbrainzArtistId for %s after search on albumname %s --> %s",
+ artistname,
+ albumname,
+ mb_id,
+ )
+ return mb_id
+ if trackname:
+ if mb_id := await self.search_artist_by_track(artistname, trackname):
+ self.logger.debug(
+ "Got MusicbrainzArtistId for %s after search on trackname %s --> %s",
+ artistname,
+ trackname,
+ mb_id,
+ )
+ return mb_id
return None
- async def search_artist_by_album(
- self, artistname, albumname=None, album_upc=None, strict=True
- ):
+ async def search_artist_by_album(self, artistname, albumname=None, album_upc=None):
"""Retrieve musicbrainz artist id by providing the artist name and albumname or upc."""
- for searchartist in [
- re.sub(LUCENE_SPECIAL, r"\\\1", artistname),
- create_clean_string(artistname),
+ for searchartist in (
artistname,
- ]:
+ re.sub(LUCENE_SPECIAL, r"\\\1", create_sort_name(artistname)),
+ ):
if album_upc:
# search by album UPC (barcode)
query = f"barcode:{album_upc}"
else:
# search by name
searchalbum = re.sub(LUCENE_SPECIAL, r"\\\1", albumname)
- if strict:
- query = f'artist:"{searchartist}" AND release:"{searchalbum}"'
- else:
- query = f'release:"{searchalbum}"'
+ query = f'artist:"{searchartist}" AND release:"{searchalbum}"'
result = await self.get_data("release", query=query)
if result and "releases" in result:
+ for strict in (True, False):
+ for item in result["releases"]:
+ if not (
+ album_upc
+ or compare_strings(item["title"], albumname, strict)
+ ):
+ continue
+ for artist in item["artist-credit"]:
+ if compare_strings(
+ artist["artist"]["name"], artistname, strict
+ ):
+ return artist["artist"]["id"]
+ for alias in artist.get("aliases", []):
+ if compare_strings(alias["name"], artistname, strict):
+ return artist["id"]
+ return ""
- for item in result["releases"]:
+ async def search_artist_by_track(self, artistname, trackname=None, track_isrc=None):
+ """Retrieve artist id by providing the artist name and trackname or track isrc."""
+ searchartist = re.sub(LUCENE_SPECIAL, r"\\\1", artistname)
+ if track_isrc:
+ result = await self.get_data(f"isrc/{track_isrc}", inc="artist-credits")
+ else:
+ searchtrack = re.sub(LUCENE_SPECIAL, r"\\\1", trackname)
+ result = await self.get_data(
+ "recording", query=f'"{searchtrack}" AND artist:"{searchartist}"'
+ )
+ if result and "recordings" in result:
+ for strict in (True, False):
+ for item in result["recordings"]:
if not (
- album_upc or compare_strings(item["title"], albumname, strict)
+ track_isrc or compare_strings(item["title"], trackname, strict)
):
continue
for artist in item["artist-credit"]:
return artist["id"]
return ""
- async def search_artist_by_track(
- self, artistname, trackname=None, track_isrc=None, strict=True
- ):
- """Retrieve artist id by providing the artist name and trackname or track isrc."""
- searchartist = re.sub(LUCENE_SPECIAL, r"\\\1", artistname)
- if track_isrc:
- result = await self.get_data(f"isrc/{track_isrc}", inc="artist-credits")
- else:
- searchtrack = re.sub(LUCENE_SPECIAL, r"\\\1", trackname)
- if strict:
- result = await self.get_data(
- "recording", query=f'"{searchtrack}" AND artist:"{searchartist}"'
- )
- else:
- result = await self.get_data("recording", query=f'"{searchtrack}"')
- if result and "recordings" in result:
- for item in result["recordings"]:
- if not (
- track_isrc or compare_strings(item["title"], trackname, strict)
- ):
- continue
- for artist in item["artist-credit"]:
- if compare_strings(artist["artist"]["name"], artistname, strict):
- return artist["artist"]["id"]
- for alias in artist.get("aliases", []):
- if compare_strings(alias["name"], artistname, strict):
- return artist["id"]
- return ""
-
async def search_artist_by_album_mbid(
self, artistname, album_mbid: str
) -> str | None:
"""Retrieve musicbrainz artist id by providing the artist name and albumname or upc."""
result = await self.get_data(f"release-group/{album_mbid}?inc=artist-credits")
if result and "artist-credit" in result:
- for strictness in [True, False]:
- for item in result["artist-credit"]:
- if artist := item.get("artist"):
- if compare_strings(artistname, artist["name"], strictness):
- return artist["id"]
+ for item in result["artist-credit"]:
+ if artist := item.get("artist"):
+ if compare_strings(artistname, artist["name"]):
+ return artist["id"]
return None
@use_cache(86400 * 30)
from music_assistant.helpers.uri import parse_uri
from music_assistant.models.config import MusicProviderConfig
from music_assistant.models.enums import MediaType, ProviderType
-from music_assistant.models.errors import MusicAssistantError, SetupFailedError
+from music_assistant.models.errors import (
+ MusicAssistantError,
+ ProviderUnavailableError,
+ SetupFailedError,
+)
from music_assistant.models.media_items import MediaItem, MediaItemType, media_from_dict
from music_assistant.models.music_provider import MusicProvider
from music_assistant.music_providers.filesystem import FileSystemProvider
"""Return all (available) music providers."""
return tuple(x for x in self._providers.values() if x.available)
- def get_provider(
- self, provider_id: Union[str, ProviderType]
- ) -> MusicProvider | None:
+ def get_provider(self, provider_id: Union[str, ProviderType]) -> MusicProvider:
"""Return Music provider by id (or type)."""
if prov := self._providers.get(provider_id):
return prov
for prov in self._providers.values():
if provider_id in (prov.type, prov.id, prov.type.value):
return prov
- self.logger.warning("Provider %s is not available", provider_id)
- return None
+ raise ProviderUnavailableError(f"Provider {provider_id} is not available")
async def search(
self, search_query, media_types: List[MediaType], limit: int = 10
cur_providers = list(self._providers.keys())
removed_providers = {x for x in prev_providers if x not in cur_providers}
- async with self.mass.database.get_db() as db:
- for prov_id in removed_providers:
-
- # clean cache items from deleted provider(s)
- await self.mass.database.delete_where_query(
- TABLE_CACHE, f"key LIKE '%{prov_id}%'", db=db
- )
+ for prov_id in removed_providers:
- # cleanup media items from db matched to deleted provider
- for ctrl in (
- self.mass.music.artists,
- self.mass.music.albums,
- self.mass.music.tracks,
- self.mass.music.radio,
- self.mass.music.playlists,
- ):
- prov_items = await ctrl.get_db_items_by_prov_id(
- provider_id=prov_id, db=db
- )
- for item in prov_items:
- await ctrl.remove_prov_mapping(item.item_id, prov_id, db=db)
+ # clean cache items from deleted provider(s)
+ await self.mass.database.delete_where_query(
+ TABLE_CACHE, f"key LIKE '%{prov_id}%'"
+ )
+ # cleanup media items from db matched to deleted provider
+ for ctrl in (
+ self.mass.music.artists,
+ self.mass.music.albums,
+ self.mass.music.tracks,
+ self.mass.music.radio,
+ self.mass.music.playlists,
+ ):
+ prov_items = await ctrl.get_db_items_by_prov_id(provider_id=prov_id)
+ for item in prov_items:
+ await ctrl.remove_prov_mapping(item.item_id, prov_id)
await self.mass.cache.set("prov_ids", cur_providers)
import itertools
from typing import Any, Dict, List, Optional, Union
-from databases import Database as Db
-
from music_assistant.helpers.compare import compare_album, compare_artist
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
+from music_assistant.helpers.tags import FALLBACK_ARTIST
from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
db_album = await self.get_db_item(item_id)
coros = [
self.get_provider_album_tracks(
- item.item_id, item.prov_id, cache_checksum=db_album.metadata.checksum
+ item.item_id, item.prov_type, cache_checksum=db_album.metadata.checksum
)
for item in db_album.provider_ids
]
and compare_artist(prov_item.artist, album.artist)
]
- async def add(self, item: Album) -> Album:
+ async def add(self, item: Album, overwrite_existing: bool = False) -> Album:
"""Add album to local db and return the database item."""
# grab additional metadata
await self.mass.metadata.get_album_metadata(item)
- db_item = await self.add_db_item(item)
+ db_item = await self.add_db_item(item, overwrite_existing)
# also fetch same album on all providers
await self._match(db_item)
db_item = await self.get_db_item(db_item.item_id)
)
return items
- async def add_db_item(
- self, item: Album, overwrite_existing: bool = False, db: Optional[Db] = None
- ) -> Album:
+ async def add_db_item(self, item: Album, overwrite_existing: bool = False) -> Album:
"""Add a new record to the database."""
assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
assert item.artist, f"Album {item.name} is missing artist"
cur_item = None
- async with self.mass.database.get_db(db) as db:
- # always try to grab existing item by musicbrainz_id
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item and item.upc:
- match = {"upc": item.upc}
- cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item:
- # fallback to matching
- match = {"sort_name": item.sort_name}
- for row in await self.mass.database.get_rows(
- self.db_table, match, db=db
- ):
- row_album = Album.from_db_row(row)
- if compare_album(row_album, item):
- cur_item = row_album
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing, db=db
- )
-
- # insert new item
- album_artists = await self._get_album_artists(item, cur_item, db=db)
- new_item = await self.mass.database.insert(
- self.db_table,
- {
- **item.to_db_row(),
- "artists": json_serializer(album_artists) or None,
- },
- db=db,
- )
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item
- )
+ # always try to grab existing item by musicbrainz_id/upc
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item and item.upc:
+ match = {"upc": item.upc}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item:
+ # fallback to search and match
+ for row in await self.mass.database.search(self.db_table, item.name):
+ row_album = Album.from_db_row(row)
+ if compare_album(row_album, item):
+ cur_item = row_album
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing
)
- return db_item
+
+ # insert new item
+ album_artists = await self._get_album_artists(item, cur_item)
+ new_item = await self.mass.database.insert(
+ self.db_table,
+ {
+ **item.to_db_row(),
+ "artists": json_serializer(album_artists) or None,
+ },
+ )
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
async def update_db_item(
self,
item_id: int,
item: Album,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> Album:
"""Update Album record in the database."""
assert item.provider_ids, f"Album {item.name} is missing provider id(s)"
assert item.artist, f"Album {item.name} is missing artist"
- async with self.mass.database.get_db(db) as db:
- cur_item = await self.get_db_item(item_id)
+ cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = item.metadata
- metadata.last_refresh = None
- provider_ids = item.provider_ids
- album_artists = await self._get_album_artists(cur_item, db=db)
- else:
- metadata = cur_item.metadata.update(item.metadata)
- provider_ids = {*cur_item.provider_ids, *item.provider_ids}
- album_artists = await self._get_album_artists(item, cur_item, db=db)
+ if overwrite:
+ metadata = item.metadata
+ metadata.last_refresh = None
+ provider_ids = item.provider_ids
+ album_artists = await self._get_album_artists(cur_item)
+ else:
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
+ album_artists = await self._get_album_artists(item, cur_item)
- if item.album_type != AlbumType.UNKNOWN:
- album_type = item.album_type
- else:
- album_type = cur_item.album_type
+ if item.album_type != AlbumType.UNKNOWN:
+ album_type = item.album_type
+ else:
+ album_type = cur_item.album_type
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": item.name if overwrite else cur_item.name,
- "sort_name": item.sort_name if overwrite else cur_item.sort_name,
- "version": item.version if overwrite else cur_item.version,
- "year": item.year or cur_item.year,
- "upc": item.upc or cur_item.upc,
- "album_type": album_type.value,
- "artists": json_serializer(album_artists) or None,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- db=db,
- )
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": item.name if overwrite else cur_item.name,
+ "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "version": item.version if overwrite else cur_item.version,
+ "year": item.year or cur_item.year,
+ "upc": item.upc or cur_item.upc,
+ "album_type": album_type.value,
+ "artists": json_serializer(album_artists) or None,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ "musicbrainz_id": item.musicbrainz_id or cur_item.musicbrainz_id,
+ },
+ )
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
- async def delete_db_item(self, item_id: int, db: Optional[Db] = None) -> None:
+ async def delete_db_item(self, item_id: int) -> None:
"""Delete record from the database."""
# delete tracks connected to this album
- async with self.mass.database.get_db(db) as db:
- await self.mass.database.delete_where_query(
- TABLE_TRACKS, f"albums LIKE '%\"{item_id}\"%'", db=db
- )
+ await self.mass.database.delete_where_query(
+ TABLE_TRACKS, f"albums LIKE '%\"{item_id}\"%'"
+ )
# delete the album itself from db
- await super().delete_db_item(item_id, db)
+ await super().delete_db_item(item_id)
self.logger.debug("deleted item with id %s from database", item_id)
self,
db_album: Album,
updated_album: Optional[Album] = None,
- db: Optional[Db] = None,
) -> List[ItemMapping]:
"""Extract (database) album artist(s) as ItemMapping."""
album_artists = set()
if not album:
continue
for artist in album.artists:
- album_artists.add(await self._get_artist_mapping(artist, db=db))
+ album_artists.add(await self._get_artist_mapping(artist))
# use intermediate set to prevent duplicates
+ # filter various artists if multiple artists
+ if len(album_artists) > 1:
+ album_artists = {x for x in album_artists if x.name != FALLBACK_ARTIST}
return list(album_artists)
async def _get_artist_mapping(
- self, artist: Union[Artist, ItemMapping], db: Optional[Db] = None
+ self, artist: Union[Artist, ItemMapping]
) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
if artist.provider == ProviderType.DATABASE:
return ItemMapping.from_item(artist)
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
- artist.item_id, provider=artist.provider, db=db
+ artist.item_id, provider=artist.provider
):
return ItemMapping.from_item(db_artist)
- db_artist = await self.mass.music.artists.add_db_item(artist, db=db)
+ db_artist = await self.mass.music.artists.add_db_item(artist)
return ItemMapping.from_item(db_artist)
import itertools
from typing import Any, Dict, List, Optional
-from databases import Database as Db
-
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_ARTISTS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.models.enums import EventType, ProviderType
async def toptracks(
self,
- item_id: str,
+ item_id: Optional[str] = None,
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
+ artist: Optional[Artist] = None,
) -> List[Track]:
"""Return top tracks for an artist."""
- artist = await self.get(item_id, provider, provider_id)
+ if not artist:
+ artist = await self.get(item_id, provider, provider_id)
# get results from all providers
coros = [
self.get_provider_artist_toptracks(
- item.item_id, item.prov_id, cache_checksum=artist.metadata.checksum
+ item.item_id,
+ provider=item.prov_type,
+ provider_id=item.prov_id,
+ cache_checksum=artist.metadata.checksum,
)
for item in artist.provider_ids
]
async def albums(
self,
- item_id: str,
+ item_id: Optional[str] = None,
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
+ artist: Optional[Artist] = None,
) -> List[Album]:
"""Return (all/most popular) albums for an artist."""
- artist = await self.get(item_id, provider, provider_id)
+ if not artist:
+ artist = await self.get(item_id, provider, provider_id)
# get results from all providers
coros = [
- self.get_provider_artist_albums(item.item_id, item.prov_id)
+ self.get_provider_artist_albums(
+ item.item_id, item.prov_type, cache_checksum=artist.metadata.checksum
+ )
for item in artist.provider_ids
]
albums = itertools.chain.from_iterable(await asyncio.gather(*coros))
final_items[key].in_library = True
return list(final_items.values())
- async def add(self, item: Artist) -> Artist:
+ async def add(self, item: Artist, overwrite_existing: bool = False) -> Artist:
"""Add artist to local db and return the database item."""
# grab musicbrainz id and additional metadata
await self.mass.metadata.get_artist_metadata(item)
- db_item = await self.add_db_item(item)
+ db_item = await self.add_db_item(item, overwrite_existing)
# also fetch same artist on all providers
await self.match_artist(db_item)
db_item = await self.get_db_item(db_item.item_id)
return items
async def add_db_item(
- self, item: Artist, overwrite_existing: bool = False, db: Optional[Db] = None
+ self, item: Artist, overwrite_existing: bool = False
) -> Artist:
"""Add a new item record to the database."""
assert item.provider_ids, "Album is missing provider id(s)"
- async with self.mass.database.get_db(db) as db:
- # always try to grab existing item by musicbrainz_id
- cur_item = None
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item:
- # fallback to matching
- # NOTE: we match an artist by name which could theoretically lead to collisions
- # but the chance is so small it is not worth the additional overhead of grabbing
- # the musicbrainz id upfront
- match = {"sort_name": item.sort_name}
- for row in await self.mass.database.get_rows(
- self.db_table, match, db=db
- ):
- row_artist = Artist.from_db_row(row)
- if row_artist.sort_name == item.sort_name:
- # just to be sure ?!
- cur_item = row_artist
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing, db=db
- )
-
- # insert item
- new_item = await self.mass.database.insert(
- self.db_table, item.to_db_row(), db=db
+ # always try to grab existing item by musicbrainz_id
+ cur_item = None
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item:
+ # fallback to exact name match
+ # NOTE: we match an artist by name which could theoretically lead to collisions
+ # but the chance is so small it is not worth the additional overhead of grabbing
+ # the musicbrainz id upfront
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.database.get_rows(self.db_table, match):
+ row_artist = Artist.from_db_row(row)
+ if row_artist.sort_name == item.sort_name:
+ # just to be sure ?!
+ cur_item = row_artist
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing
)
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+
+ # insert item
+ new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
async def update_db_item(
self,
item_id: int,
item: Artist,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> Artist:
"""Update Artist record in the database."""
cur_item = await self.get_db_item(item_id)
metadata = cur_item.metadata.update(item.metadata)
provider_ids = {*cur_item.provider_ids, *item.provider_ids}
- async with self.mass.database.get_db(db) as db:
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": item.name if overwrite else cur_item.name,
- "sort_name": item.sort_name if overwrite else cur_item.sort_name,
- "musicbrainz_id": item.musicbrainz_id or cur_item.musicbrainz_id,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- db=db,
- )
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": item.name if overwrite else cur_item.name,
+ "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "musicbrainz_id": item.musicbrainz_id or cur_item.musicbrainz_id,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ )
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
- async def delete_db_item(self, item_id: int, db: Optional[Db] = None) -> None:
+ async def delete_db_item(self, item_id: int) -> None:
"""Delete record from the database."""
# delete tracks/albums connected to this artist
- async with self.mass.database.get_db(db) as db:
- await self.mass.database.delete_where_query(
- TABLE_TRACKS, f"artists LIKE '%\"{item_id}\"%'", db=db
- )
- await self.mass.database.delete_where_query(
- TABLE_ALBUMS, f"artists LIKE '%\"{item_id}\"%'", db=db
- )
+ await self.mass.database.delete_where_query(
+ TABLE_TRACKS, f"artists LIKE '%\"{item_id}\"%'"
+ )
+ await self.mass.database.delete_where_query(
+ TABLE_ALBUMS, f"artists LIKE '%\"{item_id}\"%'"
+ )
# delete the artist itself from db
- await super().delete_db_item(item_id, db)
+ await super().delete_db_item(item_id)
self.logger.debug("deleted item with id %s from database", item_id)
"Trying to match artist %s on provider %s", db_artist.name, provider.name
)
# try to get a match with some reference tracks of this artist
- for ref_track in await self.toptracks(db_artist.item_id, db_artist.provider):
+ for ref_track in await self.toptracks(
+ db_artist.item_id, db_artist.provider, artist=db_artist
+ ):
# make sure we have a full track
if isinstance(ref_track.album, ItemMapping):
ref_track = await self.mass.music.tracks.get(
await self.update_db_item(db_artist.item_id, prov_artist)
return True
# try to get a match with some reference albums of this artist
- artist_albums = await self.albums(db_artist.item_id, db_artist.provider)
+ artist_albums = await self.albums(
+ db_artist.item_id, db_artist.provider, artist=db_artist
+ )
for ref_album in artist_albums:
if ref_album.album_type == AlbumType.COMPILATION:
continue
from time import time
from typing import List, Optional
-from databases import Database as Db
-
from music_assistant.helpers.database import TABLE_PLAYLISTS
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.uri import create_uri
)
return items
- async def add(self, item: Playlist) -> Playlist:
+ async def add(self, item: Playlist, overwrite_existing: bool = False) -> Playlist:
"""Add playlist to local db and return the new database item."""
item.metadata.last_refresh = int(time())
- await self.mass.metadata.get_playlist_metadata(item)
+ await self.mass.metadata.get_playlist_metadata(item, overwrite_existing)
return await self.add_db_item(item)
async def add_playlist_tracks(self, db_playlist_id: str, uris: List[str]) -> None:
)
async def add_db_item(
- self, item: Playlist, overwrite_existing: bool = False, db: Optional[Db] = None
+ self, item: Playlist, overwrite_existing: bool = False
) -> Playlist:
"""Add a new record to the database."""
- async with self.mass.database.get_db(db) as db:
- match = {"name": item.name, "owner": item.owner}
- if cur_item := await self.mass.database.get_row(
- self.db_table, match, db=db
- ):
- # update existing
- return await self.update_db_item(
- cur_item["item_id"], item, overwrite=overwrite_existing, db=db
- )
-
- # insert new item
- new_item = await self.mass.database.insert(
- self.db_table, item.to_db_row(), db=db
+ match = {"name": item.name, "owner": item.owner}
+ if cur_item := await self.mass.database.get_row(self.db_table, match):
+ # update existing
+ return await self.update_db_item(
+ cur_item["item_id"], item, overwrite=overwrite_existing
)
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+
+ # insert new item
+ new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
async def update_db_item(
self,
item_id: int,
item: Playlist,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> Playlist:
"""Update Playlist record in the database."""
- async with self.mass.database.get_db(db) as db:
-
- cur_item = await self.get_db_item(item_id, db=db)
- if overwrite:
- metadata = item.metadata
- provider_ids = item.provider_ids
- else:
- metadata = cur_item.metadata.update(item.metadata)
- provider_ids = {*cur_item.provider_ids, *item.provider_ids}
-
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": item.name,
- "sort_name": item.sort_name,
- "owner": item.owner,
- "is_editable": item.is_editable,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- db=db,
- )
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+ cur_item = await self.get_db_item(item_id)
+ if overwrite:
+ metadata = item.metadata
+ provider_ids = item.provider_ids
+ else:
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
+
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "owner": item.owner,
+ "is_editable": item.is_editable,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ )
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
from __future__ import annotations
from time import time
-from typing import Optional
-
-from databases import Database as Db
from music_assistant.helpers.database import TABLE_RADIOS
from music_assistant.helpers.json import json_serializer
"""Get in-library radio by name."""
return await self.mass.database.get_row(self.db_table, {"name": name})
- async def add(self, item: Radio) -> Radio:
+ async def add(self, item: Radio, overwrite_existing: bool = False) -> Radio:
"""Add radio to local db and return the new database item."""
item.metadata.last_refresh = int(time())
await self.mass.metadata.get_radio_metadata(item)
- return await self.add_db_item(item)
+ return await self.add_db_item(item, overwrite_existing)
- async def add_db_item(
- self, item: Radio, overwrite_existing: bool = False, db: Optional[Db] = None
- ) -> Radio:
+ async def add_db_item(self, item: Radio, overwrite_existing: bool = False) -> Radio:
"""Add a new item record to the database."""
assert item.provider_ids
- async with self.mass.database.get_db(db) as db:
- match = {"name": item.name}
- if cur_item := await self.mass.database.get_row(
- self.db_table, match, db=db
- ):
- # update existing
- return await self.update_db_item(
- cur_item["item_id"], item, overwrite=overwrite_existing, db=db
- )
-
- # insert new item
- new_item = await self.mass.database.insert(
- self.db_table, item.to_db_row(), db=db
- )
- item_id = new_item["item_id"]
- self.logger.debug("added %s to database", item.name)
- # return created object
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item
- )
+ match = {"name": item.name}
+ if cur_item := await self.mass.database.get_row(self.db_table, match):
+ # update existing
+ return await self.update_db_item(
+ cur_item["item_id"], item, overwrite=overwrite_existing
)
- return db_item
+
+ # insert new item
+ new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
+ item_id = new_item["item_id"]
+ self.logger.debug("added %s to database", item.name)
+ # return created object
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
async def update_db_item(
self,
item_id: int,
item: Radio,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> Radio:
"""Update Radio record in the database."""
- async with self.mass.database.get_db(db) as db:
- cur_item = await self.get_db_item(item_id, db=db)
- if overwrite:
- metadata = item.metadata
- provider_ids = item.provider_ids
- else:
- metadata = cur_item.metadata.update(item.metadata)
- provider_ids = {*cur_item.provider_ids, *item.provider_ids}
+ cur_item = await self.get_db_item(item_id)
+ if overwrite:
+ metadata = item.metadata
+ provider_ids = item.provider_ids
+ else:
+ metadata = cur_item.metadata.update(item.metadata)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
- match = {"item_id": item_id}
- await self.mass.database.update(
- self.db_table,
- match,
- {
- "name": item.name,
- "sort_name": item.sort_name,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- db=db,
- )
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+ match = {"item_id": item_id}
+ await self.mass.database.update(
+ self.db_table,
+ match,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ )
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
import asyncio
from typing import List, Optional, Union
-from databases import Database as Db
-
from music_assistant.helpers.compare import compare_artists, compare_track
from music_assistant.helpers.database import TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
track.artists = full_artists
return track
- async def add(self, item: Track) -> Track:
+ async def add(self, item: Track, overwrite_existing: bool = False) -> Track:
"""Add track to local db and return the new database item."""
# make sure we have artists
assert item.artists
# grab additional metadata
await self.mass.metadata.get_track_metadata(item)
- db_item = await self.add_db_item(item)
+ db_item = await self.add_db_item(item, overwrite_existing)
# also fetch same track on all providers (will also get other quality versions)
await self._match(db_item)
return await self.get_db_item(db_item.item_id)
provider.name,
)
- async def add_db_item(
- self, item: Track, overwrite_existing: bool = False, db: Optional[Db] = None
- ) -> Track:
+ async def add_db_item(self, item: Track, overwrite_existing: bool = False) -> Track:
"""Add a new item record to the database."""
assert item.artists, "Track is missing artist(s)"
assert item.provider_ids, "Track is missing provider id(s)"
cur_item = None
- async with self.mass.database.get_db(db) as db:
- # always try to grab existing item by external_id
- if item.musicbrainz_id:
- match = {"musicbrainz_id": item.musicbrainz_id}
- cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item and item.isrc:
- match = {"isrc": item.isrc}
- cur_item = await self.mass.database.get_row(self.db_table, match, db=db)
- if not cur_item:
- # fallback to matching
- match = {"sort_name": item.sort_name}
- for row in await self.mass.database.get_rows(
- self.db_table, match, db=db
- ):
- row_track = Track.from_db_row(row)
- if compare_track(row_track, item):
- cur_item = row_track
- break
- if cur_item:
- # update existing
- return await self.update_db_item(
- cur_item.item_id, item, overwrite=overwrite_existing, db=db
- )
-
- # no existing match found: insert new item
- track_artists = await self._get_track_artists(item, db=db)
- track_albums = await self._get_track_albums(
- item, overwrite=overwrite_existing, db=db
- )
- new_item = await self.mass.database.insert(
- self.db_table,
- {
- **item.to_db_row(),
- "artists": json_serializer(track_artists),
- "albums": json_serializer(track_albums),
- },
- db=db,
- )
- item_id = new_item["item_id"]
- # return created object
- self.logger.debug("added %s to database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item
- )
+ # always try to grab existing item by external_id
+ if item.musicbrainz_id:
+ match = {"musicbrainz_id": item.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ for isrc in item.isrcs:
+ match = {"isrc": isrc}
+ cur_item = await self.mass.database.get_row(self.db_table, match)
+ if not cur_item:
+ # fallback to matching
+ match = {"sort_name": item.sort_name}
+ for row in await self.mass.database.get_rows(self.db_table, match):
+ row_track = Track.from_db_row(row)
+ if compare_track(row_track, item):
+ cur_item = row_track
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(
+ cur_item.item_id, item, overwrite=overwrite_existing
)
- return db_item
+
+ # no existing match found: insert new item
+ track_artists = await self._get_track_artists(item)
+ track_albums = await self._get_track_albums(item, overwrite=overwrite_existing)
+ new_item = await self.mass.database.insert(
+ self.db_table,
+ {
+ **item.to_db_row(),
+ "artists": json_serializer(track_artists),
+ "albums": json_serializer(track_albums),
+ },
+ )
+ item_id = new_item["item_id"]
+ # return created object
+ self.logger.debug("added %s to database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
async def update_db_item(
self,
item_id: int,
item: Track,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> Track:
"""Update Track record in the database, merging data."""
- async with self.mass.database.get_db(db) as db:
- cur_item = await self.get_db_item(item_id, db=db)
+ cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = item.metadata
- provider_ids = item.provider_ids
- metadata.last_refresh = None
- # we store a mapping to artists/albums on the item for easier access/listings
- track_artists = await self._get_track_artists(item, db=db)
- track_albums = await self._get_track_albums(item, overwrite=True, db=db)
- else:
- metadata = cur_item.metadata.update(item.metadata, overwrite)
- provider_ids = {*cur_item.provider_ids, *item.provider_ids}
- track_artists = await self._get_track_artists(cur_item, item, db=db)
- track_albums = await self._get_track_albums(cur_item, item, db=db)
+ if overwrite:
+ metadata = item.metadata
+ provider_ids = item.provider_ids
+ metadata.last_refresh = None
+ # we store a mapping to artists/albums on the item for easier access/listings
+ track_artists = await self._get_track_artists(item)
+ track_albums = await self._get_track_albums(item, overwrite=True)
+ else:
+ metadata = cur_item.metadata.update(item.metadata, overwrite)
+ provider_ids = {*cur_item.provider_ids, *item.provider_ids}
+ track_artists = await self._get_track_artists(cur_item, item)
+ track_albums = await self._get_track_albums(cur_item, item)
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": item.name if overwrite else cur_item.name,
- "sort_name": item.sort_name if overwrite else cur_item.sort_name,
- "version": item.version if overwrite else cur_item.version,
- "duration": item.duration if overwrite else cur_item.duration,
- "artists": json_serializer(track_artists),
- "albums": json_serializer(track_albums),
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- "isrc": item.isrc or cur_item.isrc,
- },
- db=db,
- )
- self.logger.debug("updated %s in database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id, db=db)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
- return db_item
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": item.name if overwrite else cur_item.name,
+ "sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "version": item.version if overwrite else cur_item.version,
+ "duration": item.duration if overwrite else cur_item.duration,
+ "artists": json_serializer(track_artists),
+ "albums": json_serializer(track_albums),
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ "isrc": item.isrc or cur_item.isrc,
+ },
+ )
+ self.logger.debug("updated %s in database: %s", item.name, item_id)
+ db_item = await self.get_db_item(item_id)
+ self.mass.signal_event(
+ MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
+ )
+ return db_item
async def _get_track_artists(
self,
base_track: Track,
upd_track: Optional[Track] = None,
- db: Optional[Db] = None,
) -> List[ItemMapping]:
"""Extract all (unique) artists of track as ItemMapping."""
if upd_track and upd_track.artists:
else:
track_artists = base_track.artists
# use intermediate set to clear out duplicates
- return list({await self._get_artist_mapping(x, db=db) for x in track_artists})
+ return list({await self._get_artist_mapping(x) for x in track_artists})
async def _get_track_albums(
self,
base_track: Track,
upd_track: Optional[Track] = None,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> List[TrackAlbumMapping]:
"""Extract all (unique) albums of track as TrackAlbumMapping."""
track_albums: List[TrackAlbumMapping] = []
# append update item album if needed
if upd_track and upd_track.album:
mapping = await self._get_album_mapping(
- upd_track.album, overwrite=overwrite, db=db
+ upd_track.album, overwrite=overwrite
)
mapping = TrackAlbumMapping.from_dict(
{
# append base item album if needed
elif base_track and base_track.album:
mapping = await self._get_album_mapping(
- base_track.album, overwrite=overwrite, db=db
+ base_track.album, overwrite=overwrite
)
mapping = TrackAlbumMapping.from_dict(
{
self,
album: Union[Album, ItemMapping],
overwrite: bool = False,
- db: Optional[Db] = None,
) -> ItemMapping:
"""Extract (database) album as ItemMapping."""
if album.provider == ProviderType.DATABASE:
return ItemMapping.from_item(album)
if db_album := await self.mass.music.albums.get_db_item_by_prov_id(
- album.item_id, provider=album.provider, db=db
+ album.item_id, provider=album.provider
):
return ItemMapping.from_item(db_album)
db_album = await self.mass.music.albums.add_db_item(
- album, overwrite_existing=overwrite, db=db
+ album, overwrite_existing=overwrite
)
return ItemMapping.from_item(db_album)
async def _get_artist_mapping(
- self, artist: Union[Artist, ItemMapping], db: Optional[Db] = None
+ self, artist: Union[Artist, ItemMapping]
) -> ItemMapping:
"""Extract (database) track artist as ItemMapping."""
if artist.provider == ProviderType.DATABASE:
return ItemMapping.from_item(artist)
if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
- artist.item_id, provider=artist.provider, db=db
+ artist.item_id, provider=artist.provider
):
return ItemMapping.from_item(db_artist)
- db_artist = await self.mass.music.artists.add_db_item(artist, db=db)
+ db_artist = await self.mass.music.artists.add_db_item(artist)
return ItemMapping.from_item(db_artist)
if db_row["expires"] < cur_timestamp:
await self.delete(db_row["key"])
# compact db
- async with self.mass.database.get_db() as _db:
- await _db.execute("VACUUM")
+ await self.mass.database.execute("VACUUM")
def __schedule_cleanup_task(self):
"""Schedule the cleanup task."""
if not skip_cache and cachedata is not None:
return cachedata
result = await func(*args, **kwargs)
- await method_class.cache.set(
- cache_key, result, expiration=expiration, checksum=cache_checksum
+ asyncio.create_task(
+ method_class.cache.set(
+ cache_key, result, expiration=expiration, checksum=cache_checksum
+ )
)
return result
from typing import List, Union
-from music_assistant.helpers.util import create_clean_string
+from music_assistant.helpers.util import create_safe_string, create_sort_name
from music_assistant.models.enums import AlbumType
from music_assistant.models.media_items import (
Album,
)
-def compare_strings(str1, str2, strict=False) -> bool:
+def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
"""Compare strings and return True if we have an (almost) perfect match."""
if str1 is None or str2 is None:
return False
if not strict:
- return create_clean_string(str1) == create_clean_string(str2)
- return str1.lower().strip() == str2.lower().strip()
+ return create_safe_string(str1) == create_safe_string(str2)
+ return create_sort_name(str1) == create_sort_name(str2)
def compare_version(left_version: str, right_version: str) -> bool:
# fallback to comparing
if not left_artist.sort_name:
- left_artist.sort_name = create_clean_string(left_artist.name)
+ left_artist.sort_name = create_sort_name(left_artist.name)
if not right_artist.sort_name:
- right_artist.sort_name = create_clean_string(right_artist.name)
+ right_artist.sort_name = create_sort_name(right_artist.name)
return left_artist.sort_name == right_artist.sort_name
# fallback to comparing
if not left_album.sort_name:
- left_album.sort_name = create_clean_string(left_album.name)
+ left_album.sort_name = create_sort_name(left_album.name)
if not right_album.sort_name:
- right_album.sort_name = create_clean_string(right_album.name)
+ right_album.sort_name = create_sort_name(right_album.name)
if left_album.sort_name != right_album.sort_name:
return False
if not compare_version(left_album.version, right_album.version):
# return early on exact item_id match
if compare_item_id(left_track, right_track):
return True
- if left_track.isrc and left_track.isrc == right_track.isrc:
- # ISRC is always 100% accurate match
- return True
+ for left_isrc in left_track.isrcs:
+ for right_isrc in right_track.isrcs:
+ # ISRC is always 100% accurate match
+ if left_isrc == right_isrc:
+ return True
if left_track.musicbrainz_id and right_track.musicbrainz_id:
if left_track.musicbrainz_id == right_track.musicbrainz_id:
# musicbrainz_id is always 100% accurate match
return False
# track name must match
if not left_track.sort_name:
- left_track.sort_name = create_clean_string(left_track.name)
+ left_track.sort_name = create_sort_name(left_track.name)
if not right_track.sort_name:
- right_track.sort_name = create_clean_string(right_track.name)
+ right_track.sort_name = create_sort_name(right_track.name)
if left_track.sort_name != right_track.sort_name:
return False
# exact albumtrack match = 100% match
"""Database logic."""
from __future__ import annotations
-from contextlib import asynccontextmanager
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Mapping, Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Union
from databases import Database as Db
+from sqlalchemy.sql import ClauseElement
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
-SCHEMA_VERSION = 17
+SCHEMA_VERSION = 18
TABLE_TRACK_LOUDNESS = "track_loudness"
TABLE_PLAYLOG = "playlog"
self.url = mass.config.database_url
self.mass = mass
self.logger = mass.logger.getChild("db")
+ # we maintain one global connection - otherwise we run into (dead)lock issues.
+ # https://github.com/encode/databases/issues/456
+ self._db = Db(self.url, timeout=360)
async def setup(self) -> None:
"""Perform async initialization."""
- async with self.get_db() as _db:
- await _db.execute(
- """CREATE TABLE IF NOT EXISTS settings(
- key TEXT PRIMARY KEY,
- value TEXT
- );"""
- )
+ await self._db.connect()
+ self.logger.info("Database connected.")
+ await self.execute(
+ """CREATE TABLE IF NOT EXISTS settings(
+ key TEXT PRIMARY KEY,
+ value TEXT
+ );"""
+ )
await self._migrate()
- @asynccontextmanager
- async def get_db(self, db: Optional[Db] = None) -> Db:
- """Context manager helper to get the active db connection."""
- if db is not None:
- yield db
- else:
- async with Db(self.url, timeout=360) as _db:
- yield _db
+ async def close(self) -> None:
+ """Close db connection on exit."""
+ self.logger.info("Database disconnected.")
+ await self._db.disconnect()
- async def get_setting(self, key: str, db: Optional[Db] = None) -> str | None:
+ async def get_setting(self, key: str) -> str | None:
"""Get setting from settings table."""
- return await self.get_row(TABLE_SETTINGS, {"key": key}, db=db)
+ return await self.get_row(TABLE_SETTINGS, {"key": key})
- async def set_setting(self, key: str, value: str, db: Optional[Db] = None) -> None:
+ async def set_setting(self, key: str, value: str) -> None:
"""Set setting in settings table."""
if not isinstance(value, str):
value = str(value)
self,
table: str,
match: dict = None,
- db: Optional[Db] = None,
) -> int:
"""Get row count for given table/query."""
- async with self.get_db(db) as _db:
- sql_query = f"SELECT count() FROM {table}"
- if match is not None:
- sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
- if res := await _db.fetch_one(sql_query, match):
- return res["count()"]
- return 0
+ sql_query = f"SELECT count() FROM {table}"
+ if match is not None:
+ sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
+ if res := await self._db.fetch_one(sql_query, match):
+ return res["count()"]
+ return 0
async def get_rows(
self,
order_by: str = None,
limit: int = 500,
offset: int = 0,
- db: Optional[Db] = None,
) -> List[Mapping]:
"""Get all rows for given table."""
- async with self.get_db(db) as _db:
- sql_query = f"SELECT * FROM {table}"
- if match is not None:
- sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
- if order_by is not None:
- sql_query += f" ORDER BY {order_by}"
- sql_query += f" LIMIT {limit} OFFSET {offset}"
- return await _db.fetch_all(sql_query, match)
+ sql_query = f"SELECT * FROM {table}"
+ if match is not None:
+ sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
+ if order_by is not None:
+ sql_query += f" ORDER BY {order_by}"
+ sql_query += f" LIMIT {limit} OFFSET {offset}"
+ return await self._db.fetch_all(sql_query, match)
async def get_rows_from_query(
self,
params: Optional[dict] = None,
limit: int = 500,
offset: int = 0,
- db: Optional[Db] = None,
) -> List[Mapping]:
"""Get all rows for given custom query."""
- async with self.get_db(db) as _db:
- query = f"{query} LIMIT {limit} OFFSET {offset}"
- return await _db.fetch_all(query, params)
-
- async def iterate_rows(
- self,
- table: str,
- match: dict = None,
- db: Optional[Db] = None,
- ) -> AsyncGenerator[Mapping, None]:
- """Iterate (all) rows for given table."""
- async with self.get_db(db) as _db:
- sql_query = f"SELECT * FROM {table}"
- if match is not None:
- sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
- async for row in _db.iterate(sql_query, match):
- yield row
+ query = f"{query} LIMIT {limit} OFFSET {offset}"
+ return await self._db.fetch_all(query, params)
async def search(
- self, table: str, search: str, column: str = "name", db: Optional[Db] = None
+ self, table: str, search: str, column: str = "name"
) -> List[Mapping]:
"""Search table by column."""
- async with self.get_db(db) as _db:
- sql_query = f"SELECT * FROM {table} WHERE {column} LIKE :search"
- params = {"search": f"%{search}%"}
- return await _db.fetch_all(sql_query, params)
+ sql_query = f"SELECT * FROM {table} WHERE {column} LIKE :search"
+ params = {"search": f"%{search}%"}
+ return await self._db.fetch_all(sql_query, params)
- async def get_row(
- self, table: str, match: Dict[str, Any] = None, db: Optional[Db] = None
- ) -> Mapping | None:
+ async def get_row(self, table: str, match: Dict[str, Any] = None) -> Mapping | None:
"""Get single row for given table where column matches keys/values."""
- async with self.get_db(db) as _db:
- sql_query = f"SELECT * FROM {table} WHERE "
- sql_query += " AND ".join((f"{x} = :{x}" for x in match))
- return await _db.fetch_one(sql_query, match)
+ # async with Db(self.url, timeout=360) as db:
+ sql_query = f"SELECT * FROM {table} WHERE "
+ sql_query += " AND ".join((f"{x} = :{x}" for x in match))
+ return await self._db.fetch_one(sql_query, match)
async def insert(
self,
table: str,
values: Dict[str, Any],
allow_replace: bool = False,
- db: Optional[Db] = None,
) -> Mapping:
"""Insert data in given table."""
- async with self.get_db(db) as _db:
- keys = tuple(values.keys())
- if allow_replace:
- sql_query = f'INSERT OR REPLACE INTO {table}({",".join(keys)})'
- else:
- sql_query = f'INSERT INTO {table}({",".join(keys)})'
- sql_query += f' VALUES ({",".join((f":{x}" for x in keys))})'
- await _db.execute(sql_query, values)
- # return inserted/replaced item
- lookup_vals = {
- key: value
- for key, value in values.items()
- if value is not None and value != ""
- }
- return await self.get_row(table, lookup_vals, db=_db)
+ keys = tuple(values.keys())
+ if allow_replace:
+ sql_query = f'INSERT OR REPLACE INTO {table}({",".join(keys)})'
+ else:
+ sql_query = f'INSERT INTO {table}({",".join(keys)})'
+ sql_query += f' VALUES ({",".join((f":{x}" for x in keys))})'
+ await self.execute(sql_query, values)
+ # return inserted/replaced item
+ lookup_vals = {
+ key: value
+ for key, value in values.items()
+ if value is not None and value != ""
+ }
+ return await self.get_row(table, lookup_vals)
- async def insert_or_replace(
- self, table: str, values: Dict[str, Any], db: Optional[Db] = None
- ) -> Mapping:
+ async def insert_or_replace(self, table: str, values: Dict[str, Any]) -> Mapping:
"""Insert or replace data in given table."""
- return await self.insert(table=table, values=values, allow_replace=True, db=db)
+ return await self.insert(table=table, values=values, allow_replace=True)
async def update(
self,
table: str,
match: Dict[str, Any],
values: Dict[str, Any],
- db: Optional[Db] = None,
) -> Mapping:
"""Update record."""
- async with self.get_db(db) as _db:
- keys = tuple(values.keys())
- sql_query = (
- f'UPDATE {table} SET {",".join((f"{x}=:{x}" for x in keys))} WHERE '
- )
- sql_query += " AND ".join((f"{x} = :{x}" for x in match))
- await _db.execute(sql_query, {**match, **values})
- # return updated item
- return await self.get_row(table, match, db=_db)
+ keys = tuple(values.keys())
+ sql_query = f'UPDATE {table} SET {",".join((f"{x}=:{x}" for x in keys))} WHERE '
+ sql_query += " AND ".join((f"{x} = :{x}" for x in match))
+ await self.execute(sql_query, {**match, **values})
+ # return updated item
+ return await self.get_row(table, match)
- async def delete(
- self, table: str, match: Dict[str, Any], db: Optional[Db] = None
- ) -> None:
+ async def delete(self, table: str, match: Dict[str, Any]) -> None:
"""Delete data in given table."""
- async with self.get_db(db) as _db:
- sql_query = f"DELETE FROM {table}"
- sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
- await _db.execute(sql_query, match)
+ sql_query = f"DELETE FROM {table}"
+ sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
+ await self.execute(sql_query, match)
- async def delete_where_query(
- self, table: str, query: str, db: Optional[Db] = None
- ) -> None:
+ async def delete_where_query(self, table: str, query: str) -> None:
"""Delete data in given table using given where clausule."""
- async with self.get_db(db) as _db:
- sql_query = f"DELETE FROM {table} WHERE {query}"
- await _db.execute(sql_query)
+ sql_query = f"DELETE FROM {table} WHERE {query}"
+ await self.execute(sql_query)
+
+ async def execute(
+ self, query: Union[ClauseElement, str], values: dict = None
+ ) -> Any:
+ """Execute command on the database."""
+ return await self._db.execute(query, values)
async def _migrate(self):
"""Perform database migration actions if needed."""
- async with self.get_db() as db:
- try:
- if prev_version := await self.get_setting("version", db):
- prev_version = int(prev_version["value"])
- else:
- prev_version = 0
- except (KeyError, ValueError):
+ try:
+ if prev_version := await self.get_setting("version"):
+ prev_version = int(prev_version["value"])
+ else:
prev_version = 0
+ except (KeyError, ValueError):
+ prev_version = 0
- if SCHEMA_VERSION != prev_version:
- self.logger.info(
- "Performing database migration from %s to %s",
- prev_version,
- SCHEMA_VERSION,
- )
- # always create db tables if they don't exist to prevent errors trying to access them later
- await self.__create_database_tables(db)
+ if SCHEMA_VERSION != prev_version:
+ self.logger.info(
+ "Performing database migration from %s to %s",
+ prev_version,
+ SCHEMA_VERSION,
+ )
+ # always create db tables if they don't exist to prevent errors trying to access them later
+ await self.__create_database_tables()
- if prev_version < 17:
- # too many changes, just recreate
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_ALBUMS}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
- await db.execute(f"DROP TABLE IF EXISTS {TABLE_THUMBS}")
- await db.execute("DROP TABLE IF EXISTS provider_mappings")
- # recreate missing tables
- await self.__create_database_tables(db)
+ if prev_version < 18:
+ # too many changes, just recreate
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}")
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_ALBUMS}")
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_TRACKS}")
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}")
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
+ await self.execute(f"DROP TABLE IF EXISTS {TABLE_THUMBS}")
+ await self.execute("DROP TABLE IF EXISTS provider_mappings")
+ # recreate missing tables
+ await self.__create_database_tables()
- # store current schema version
- await self.set_setting("version", str(SCHEMA_VERSION), db=db)
+ # store current schema version
+ await self.set_setting("version", str(SCHEMA_VERSION))
- @staticmethod
- async def __create_database_tables(db: Db) -> None:
+ async def __create_database_tables(self) -> None:
"""Init database tables."""
# TODO: create indexes, especially for the json columns
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}(
item_id INTEGER NOT NULL,
provider TEXT NOT NULL,
loudness REAL,
UNIQUE(item_id, provider));"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLOG}(
item_id INTEGER NOT NULL,
provider TEXT NOT NULL,
- timestamp REAL,
+ timestamp INTEGER DEFAULT 0,
UNIQUE(item_id, provider));"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_ALBUMS}(
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
musicbrainz_id TEXT,
artists json,
metadata json,
- provider_ids json
+ provider_ids json,
+ timestamp INTEGER DEFAULT 0
);"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_ARTISTS}(
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
musicbrainz_id TEXT,
in_library BOOLEAN DEFAULT 0,
metadata json,
- provider_ids json
+ provider_ids json,
+ timestamp INTEGER DEFAULT 0
);"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACKS}(
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
artists json,
albums json,
metadata json,
- provider_ids json
+ provider_ids json,
+ timestamp INTEGER DEFAULT 0
);"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLISTS}(
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
in_library BOOLEAN DEFAULT 0,
metadata json,
provider_ids json,
+ timestamp INTEGER DEFAULT 0,
UNIQUE(name, owner)
);"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_RADIOS}(
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
sort_name TEXT NOT NULL,
in_library BOOLEAN DEFAULT 0,
metadata json,
- provider_ids json
+ provider_ids json,
+ timestamp INTEGER DEFAULT 0
);"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_CACHE}(
key TEXT UNIQUE NOT NULL, expires INTEGER NOT NULL, data TEXT, checksum TEXT NULL)"""
)
- await db.execute(
+ await self.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_THUMBS}(
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT NOT NULL,
UNIQUE(path, size));"""
)
# create indexes
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS artists_in_library_idx on artists(in_library);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS albums_in_library_idx on albums(in_library);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS tracks_in_library_idx on tracks(in_library);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS playlists_in_library_idx on playlists(in_library);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS radios_in_library_idx on radios(in_library);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS artists_sort_name_idx on artists(sort_name);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS albums_sort_name_idx on albums(sort_name);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS tracks_sort_name_idx on tracks(sort_name);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS playlists_sort_name_idx on playlists(sort_name);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS radios_sort_name_idx on radios(sort_name);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS artists_musicbrainz_id_idx on artists(musicbrainz_id);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS albums_musicbrainz_id_idx on albums(musicbrainz_id);"
)
- await db.execute(
+ await self.execute(
"CREATE INDEX IF NOT EXISTS tracks_musicbrainz_id_idx on tracks(musicbrainz_id);"
)
- await db.execute("CREATE INDEX IF NOT EXISTS tracks_isrc_idx on tracks(isrc);")
- await db.execute("CREATE INDEX IF NOT EXISTS albums_upc_idx on albums(upc);")
+ await self.execute(
+ "CREATE INDEX IF NOT EXISTS tracks_isrc_idx on tracks(isrc);"
+ )
+ await self.execute("CREATE INDEX IF NOT EXISTS albums_upc_idx on albums(upc);")
from PIL import Image
+from music_assistant.helpers.tags import get_embedded_image
+
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
if not await prov.exists(path):
continue
# embedded image in music file
- img_data = await prov.get_embedded_image(path)
+ img_data = await get_embedded_image(path)
# regular image file on disk
if not img_data:
async with prov.open_file(path) as _file:
if self._proc.stdin.can_write_eof():
self._proc.stdin.write_eof()
- async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
+ async def communicate(
+ self, input_data: Optional[bytes] = None
+ ) -> Tuple[bytes, bytes]:
"""Write bytes to process and read back results."""
return await self._proc.communicate(input_data)
--- /dev/null
+"""Helpers/utilities to parse ID3 tags from audio files with ffmpeg."""
+from __future__ import annotations
+
+import json
+import os
+from dataclasses import dataclass
+from typing import Any, Dict, Tuple
+
+from music_assistant.helpers.process import AsyncProcess
+
+FALLBACK_ARTIST = "Various Artists"
+
+SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ", "/ ")
+
+
+def split_items(org_str: str) -> Tuple[str]:
+ """Split up a tags string by common splitter."""
+ if isinstance(org_str, list):
+ return org_str
+ if not org_str:
+ return tuple()
+ for splitter in SPLITTERS:
+ if splitter in org_str:
+ return tuple((x.strip() for x in org_str.split(splitter)))
+ return (org_str,)
+
+
+@dataclass
+class AudioTags:
+ """Audio metadata parsed from an audio file."""
+
+ raw: Dict[str, Any]
+ sample_rate: int
+ channels: int
+ bits_per_sample: int
+ format: str
+ bit_rate: int
+ duration: float
+ tags: Dict[str, str]
+ has_cover_image: bool
+ filename: str
+
+ @property
+ def artist(self) -> str:
+ """Return artist tag (as-is)."""
+ if tag := self.tags.get("artist"):
+ return tag
+ # fallback to parsing from filename
+ title = self.filename.rsplit(os.sep, 1)[-1].split(".")[0]
+ title_parts = title.split(" - ")
+ if len(title_parts) >= 2:
+ return title_parts[0].strip()
+ return FALLBACK_ARTIST
+
+ @property
+ def title(self) -> str:
+ """Return title tag (as-is)."""
+ if tag := self.tags.get("title"):
+ return tag
+ # fallback to parsing from filename
+ title = self.filename.rsplit(os.sep, 1)[-1].split(".")[0]
+ title_parts = title.split(" - ")
+ if len(title_parts) >= 2:
+ return title_parts[1].strip()
+ return title
+
+ @property
+ def album(self) -> str:
+ """Return album tag (as-is) if present."""
+ return self.tags.get("album")
+
+ @property
+ def artists(self) -> Tuple[str]:
+ """Return track artists."""
+ return split_items(self.artist)
+
+ @property
+ def album_artists(self) -> Tuple[str]:
+ """Return (all) album artists (if any)."""
+ return split_items(self.tags.get("albumartist"))
+
+ @property
+ def genres(self) -> Tuple[str]:
+ """Return (all) genres, if any."""
+ return split_items(self.tags.get("genre", ""))
+
+ @property
+ def disc(self) -> int | None:
+ """Return disc tag if present."""
+ if tag := self.tags.get("disc"):
+ return int(tag.split("/")[0])
+ return None
+
+ @property
+ def track(self) -> int | None:
+ """Return track tag if present."""
+ if tag := self.tags.get("track"):
+ return int(tag.split("/")[0])
+ return None
+
+ @property
+ def year(self) -> int | None:
+ """Return album's year if present, parsed from date."""
+ if tag := self.tags.get("originalyear"):
+ return int(tag)
+ if tag := self.tags.get("otiginaldate"):
+ return int(tag.split("-")[0])
+ if tag := self.tags.get("date"):
+ return int(tag.split("-")[0])
+ return None
+
+ @property
+ def musicbrainz_artistids(self) -> Tuple[str]:
+ """Return musicbrainz_artistid tag(s) if present."""
+ return split_items(self.tags.get("musicbrainzartistid"))
+
+ @property
+ def musicbrainz_albumartistids(self) -> Tuple[str]:
+ """Return musicbrainz_albumartistid tag if present."""
+ return split_items(self.tags.get("musicbrainzalbumartistid"))
+
+ @property
+ def musicbrainz_releasegroupid(self) -> str | None:
+ """Return musicbrainz_releasegroupid tag if present."""
+ return self.tags.get("musicbrainzreleasegroupid")
+
+ @property
+ def musicbrainz_trackid(self) -> str | None:
+ """Return musicbrainz_trackid tag if present."""
+ if tag := self.tags.get("musicbrainztrackid"):
+ return tag
+ return self.tags.get("musicbrainzreleasetrackid")
+
+ @property
+ def album_type(self) -> str | None:
+ """Return albumtype tag if present."""
+ if tag := self.tags.get("musicbrainzalbumtype"):
+ return tag
+ return self.tags.get("releasetype")
+
+ @classmethod
+ def parse(cls, raw: dict) -> "AudioTags":
+ """Parse instance from raw ffmpeg info output."""
+ audio_stream = next(x for x in raw["streams"] if x["codec_type"] == "audio")
+ has_cover_image = any(
+ x for x in raw["streams"] if x["codec_name"] in ("mjpeg", "png")
+ )
+ # convert all tag-keys to lowercase without spaces
+ tags = {
+ key.lower().replace(" ", "").replace("_", ""): value
+ for key, value in raw["format"]["tags"].items()
+ }
+
+ return AudioTags(
+ raw=raw,
+ sample_rate=int(audio_stream["sample_rate"]),
+ channels=audio_stream["channels"],
+ bits_per_sample=int(
+ audio_stream.get(
+ "bits_per_raw_sample", audio_stream.get("bits_per_sample")
+ )
+ )
+ or 16,
+ format=raw["format"]["format_name"],
+ bit_rate=int(raw["format"]["bit_rate"]),
+ duration=float(raw["format"]["duration"]),
+ tags=tags,
+ has_cover_image=has_cover_image,
+ filename=raw["format"]["filename"],
+ )
+
+ def get(self, key: str, default=None) -> Any:
+ """Get tag by key."""
+ return self.tags.get(key, default)
+
+
+async def parse_tags(file_path: str) -> AudioTags:
+ """Parse tags from a media file."""
+
+ args = (
+ "ffprobe",
+ "-hide_banner",
+ "-loglevel",
+ "fatal",
+ "-show_error",
+ "-show_format",
+ "-show_streams",
+ "-print_format",
+ "json",
+ "-i",
+ file_path,
+ )
+
+ async with AsyncProcess(
+ args, enable_stdin=False, enable_stdout=True, enable_stderr=False
+ ) as proc:
+
+ res, _ = await proc.communicate()
+ return AudioTags.parse(json.loads(res))
+
+
+async def get_embedded_image(file_path: str) -> bytes | None:
+ """Return embedded image data."""
+ args = ("ffmpeg", "-i", file_path, "-map", "0:v", "-c", "copy", "-f", "mjpeg", "-")
+
+ async with AsyncProcess(
+ args, enable_stdin=False, enable_stdout=True, enable_stderr=False
+ ) as proc:
+
+ res, _ = await proc.communicate()
+ return res
return possible_bool in ["true", "True", "1", "on", "ON", 1]
-def create_clean_string(input_str: str) -> str:
+def create_safe_string(input_str: str) -> str:
"""Return clean lowered string for compare actions."""
input_str = input_str.lower().strip()
+ unaccented_string = unidecode.unidecode(input_str)
+ return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
+
+
+def create_sort_name(input_str: str) -> str:
+ """Create sort name/title from string."""
+ input_str = input_str.lower().strip()
for item in ["the ", "de ", "les "]:
if input_str.startswith(item):
input_str = input_str.replace(item, "")
- unaccented_string = unidecode.unidecode(input_str)
- return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
+ return input_str.strip()
def parse_title_and_version(title: str, track_version: str = None):
for task in self._tracked_tasks:
task.cancel()
self.signal_event(MassEvent(EventType.SHUTDOWN))
+ await self.database.close()
self.closed = True
if self.http_session and not self.http_session_provided:
await self.http_session.close()
LOSSY_MP3 = 1
LOSSY_OGG = 2
LOSSY_AAC = 3
- FLAC_LOSSLESS = 10 # 44.1/48khz 16 bits
- FLAC_LOSSLESS_HI_RES_1 = 20 # 44.1/48khz 24 bits HI-RES
- FLAC_LOSSLESS_HI_RES_2 = 21 # 88.2/96khz 24 bits HI-RES
- FLAC_LOSSLESS_HI_RES_3 = 22 # 176/192khz 24 bits HI-RES
- FLAC_LOSSLESS_HI_RES_4 = 23 # above 192khz 24 bits HI-RES
+ LOSSY_M4A = 4
+ LOSSLESS = 10 # 44.1/48khz 16 bits
+ LOSSLESS_HI_RES_1 = 20 # 44.1/48khz 24 bits HI-RES
+ LOSSLESS_HI_RES_2 = 21 # 88.2/96khz 24 bits HI-RES
+ LOSSLESS_HI_RES_3 = 22 # 176/192khz 24 bits HI-RES
+ LOSSLESS_HI_RES_4 = 23 # above 192khz 24 bits HI-RES
class LinkType(Enum):
AIFF = "aiff"
WMA = "wma"
M4A = "m4a"
+ DSF = "dsf"
PCM_S16LE = "s16le" # PCM signed 16-bit little-endian
PCM_S24LE = "s24le" # PCM signed 24-bit little-endian
PCM_S32LE = "s32le" # PCM signed 32-bit little-endian
@classmethod
def try_parse(cls: "ContentType", string: str) -> "ContentType":
- """Try to parse ContentType from (url)string."""
+ """Try to parse ContentType from (url)string/extension."""
tempstr = string.lower()
if "." in tempstr:
tempstr = tempstr.split(".")[-1]
+ if "," in tempstr:
+ for val in tempstr.split(","):
+ try:
+ return cls(val.strip())
+ except ValueError:
+ pass
+
tempstr = tempstr.split("?")[0]
tempstr = tempstr.split("&")[0]
try:
except ValueError:
return cls.UNKNOWN
- def is_pcm(self):
+ def is_pcm(self) -> bool:
"""Return if contentype is PCM."""
return self.name.startswith("PCM")
+ def is_lossless(self) -> bool:
+ """Return if format is lossless."""
+ return self.is_pcm() or self in (
+ ContentType.DSF,
+ ContentType.FLAC,
+ ContentType.AIFF,
+ ContentType.WAV,
+ )
+
@classmethod
def from_bit_depth(
cls, bit_depth: int, floating_point: bool = False
from abc import ABCMeta, abstractmethod
from time import time
-from typing import (
- TYPE_CHECKING,
- AsyncGenerator,
- Generic,
- List,
- Optional,
- Tuple,
- TypeVar,
-)
-
-from databases import Database as Db
-
-from music_assistant.models.errors import MediaNotFoundError, ProviderUnavailableError
+from typing import TYPE_CHECKING, Generic, List, Optional, Tuple, TypeVar
+
+from music_assistant.models.errors import MediaNotFoundError
from music_assistant.models.event import MassEvent
from .enums import EventType, MediaType, ProviderType
self.logger = mass.logger.getChild(f"music.{self.media_type.value}")
@abstractmethod
- async def add(self, item: ItemCls) -> ItemCls:
+ async def add(self, item: ItemCls, overwrite_existing: bool = False) -> ItemCls:
"""Add item to local db and return the database item."""
raise NotImplementedError
@abstractmethod
async def add_db_item(
- self, item: ItemCls, overwrite_existing: bool = False, db: Optional[Db] = None
+ self, item: ItemCls, overwrite_existing: bool = False
) -> ItemCls:
"""Add a new record for this mediatype to the database."""
raise NotImplementedError
item_id: int,
item: ItemCls,
overwrite: bool = False,
- db: Optional[Db] = None,
) -> ItemCls:
"""Update record in the database, merging data."""
raise NotImplementedError
async def library(self, limit: int = 500, offset: int = 0) -> List[ItemCls]:
"""Get all in-library items."""
match = {"in_library": True}
- return [
- self.item_cls.from_db_row(db_row)
- for db_row in await self.mass.database.get_rows(
- self.db_table, match, order_by="name", limit=limit, offset=offset
- )
- ]
+ return await self.get_db_items(match=match, limit=limit, offset=offset)
- async def count(self) -> int:
+ async def count(self, in_library: bool = False) -> int:
"""Return number of in-library items for this MediaType."""
- return await self.mass.database.get_count(self.db_table, {"in_library": 1})
+ return await self.mass.database.get_count(
+ self.db_table, {"in_library": in_library}
+ )
async def get(
self,
force_refresh: bool = False,
lazy: bool = True,
details: ItemCls = None,
+ overwrite_existing: bool = None,
) -> ItemCls:
"""Return (full) details for a single media item."""
assert provider or provider_id, "provider or provider_id must be supplied"
provider=provider,
provider_id=provider_id,
)
+ if overwrite_existing is None:
+ overwrite_existing = force_refresh
if db_item and (time() - db_item.last_refresh) > REFRESH_INTERVAL:
# it's been too long since the full metadata was last retrieved (or never at all)
force_refresh = True
# in 99% of the cases we just return lazy because we want the details as fast as possible
# only if we really need to wait for the result (e.g. to prevent race conditions), we
# can set lazy to false and we await to job to complete.
- add_job = self.mass.add_job(self.add(details), f"Add {details.uri} to database")
+ add_job = self.mass.add_job(
+ self.add(details, overwrite_existing=overwrite_existing),
+ f"Add {details.uri} to database",
+ )
if not lazy:
await add_job.wait()
return add_job.result
)
)
- async def get_provider_id(
- self, item: ItemCls, db: Optional[Db] = None
- ) -> Tuple[str, str]:
+ async def get_provider_id(self, item: ItemCls) -> Tuple[str, str]:
"""Return provider and item id."""
if item.provider == ProviderType.DATABASE:
# make sure we have a full object
- item = await self.get_db_item(item.item_id, db=db)
+ item = await self.get_db_item(item.item_id)
for prov in item.provider_ids:
# returns the first provider that is available
if not prov.available:
self,
query: Optional[str] = None,
query_params: Optional[dict] = None,
+ match: Optional[dict] = None,
limit: int = 500,
offset: int = 0,
- db: Optional[Db] = None,
) -> List[ItemCls]:
"""Fetch all records from database."""
+ assert not (query and match), "query and match are mutually exclusive"
if query is not None:
func = self.mass.database.get_rows_from_query(
- query, query_params, limit=limit, offset=offset, db=db
+ query, query_params, limit=limit, offset=offset
)
else:
func = self.mass.database.get_rows(
- self.db_table, limit=limit, offset=offset, db=db
+ self.db_table, match, limit=limit, offset=offset
)
return [self.item_cls.from_db_row(db_row) for db_row in await func]
- async def get_db_item(self, item_id: int, db: Optional[Db] = None) -> ItemCls:
+ async def get_db_item(self, item_id: int) -> ItemCls:
"""Get record by id."""
match = {"item_id": int(item_id)}
- if db_row := await self.mass.database.get_row(self.db_table, match, db=db):
+ if db_row := await self.mass.database.get_row(self.db_table, match):
return self.item_cls.from_db_row(db_row)
return None
provider_item_id: str,
provider: Optional[ProviderType] = None,
provider_id: Optional[str] = None,
- db: Optional[Db] = None,
) -> ItemCls | None:
"""Get the database item for the given prov_id."""
assert provider or provider_id, "provider or provider_id must be supplied"
if isinstance(provider, str):
provider = ProviderType(provider)
if provider == ProviderType.DATABASE or provider_id == "database":
- return await self.get_db_item(provider_item_id, db=db)
+ return await self.get_db_item(provider_item_id)
for item in await self.get_db_items_by_prov_id(
provider=provider,
provider_id=provider_id,
provider_item_ids=(provider_item_id,),
- db=db,
):
return item
return None
provider_item_ids: Optional[Tuple[str]] = None,
limit: int = 500,
offset: int = 0,
- db: Optional[Db] = None,
) -> List[ItemCls]:
"""Fetch all records from database for given provider."""
assert provider or provider_id, "provider or provider_id must be supplied"
if isinstance(provider, str):
provider = ProviderType(provider)
if provider == ProviderType.DATABASE or provider_id == "database":
- return await self.get_db_items(limit=limit, offset=offset, db=db)
+ return await self.get_db_items(limit=limit, offset=offset)
query = f"SELECT * FROM {self.db_table}, json_each(provider_ids)"
if provider_id is not None:
prov_ids = prov_ids.replace(",)", ")")
query += f" AND json_extract(json_each.value, '$.item_id') in {prov_ids}"
- return await self.get_db_items(query, limit=limit, offset=offset, db=db)
+ return await self.get_db_items(query, limit=limit, offset=offset)
- async def iterate_db_items(
- self,
- db: Optional[Db] = None,
- ) -> AsyncGenerator[ItemCls, None]:
- """Iterate all records from database."""
- async for db_row in self.mass.database.iterate_rows(self.db_table, db=db):
- yield self.item_cls.from_db_row(db_row)
-
- async def set_db_library(
- self, item_id: int, in_library: bool, db: Optional[Db] = None
- ) -> None:
+ async def set_db_library(self, item_id: int, in_library: bool) -> None:
"""Set the in-library bool on a database item."""
match = {"item_id": item_id}
await self.mass.database.update(
- self.db_table, match, {"in_library": in_library}, db=db
+ self.db_table, match, {"in_library": in_library}
)
async def get_provider_item(
item = await self.get_db_item(item_id)
else:
provider = self.mass.music.get_provider(provider_id)
- if not provider:
- raise ProviderUnavailableError(
- f"Provider {provider_id} is not available!"
- )
item = await provider.get_item(self.media_type, item_id)
if not item:
raise MediaNotFoundError(
)
return item
- async def remove_prov_mapping(
- self, item_id: int, prov_id: str, db: Optional[Db] = None
- ) -> None:
+ async def remove_prov_mapping(self, item_id: int, prov_id: str) -> None:
"""Remove provider id(s) from item."""
- async with self.mass.database.get_db(db) as db:
- if db_item := await self.get_db_item(item_id, db=db):
- db_item.provider_ids = {
- x for x in db_item.provider_ids if x.prov_id != prov_id
- }
- if not db_item.provider_ids:
- # item has no more provider_ids left, it is completely deleted
- await self.delete_db_item(db_item.item_id)
- return
- await self.update_db_item(
- db_item.item_id, db_item, overwrite=True, db=db
- )
+ if db_item := await self.get_db_item(item_id):
+ db_item.provider_ids = {
+ x for x in db_item.provider_ids if x.prov_id != prov_id
+ }
+ if not db_item.provider_ids:
+ # item has no more provider_ids left, it is completely deleted
+ await self.delete_db_item(db_item.item_id)
+ return
+ await self.update_db_item(db_item.item_id, db_item, overwrite=True)
self.logger.debug("removed provider %s from item id %s", prov_id, item_id)
- async def delete_db_item(self, item_id: int, db: Optional[Db] = None) -> None:
+ async def delete_db_item(self, item_id: int) -> None:
"""Delete record from the database."""
- async with self.mass.database.get_db(db) as db:
-
- # delete item
- await self.mass.database.delete(
- self.db_table,
- {"item_id": int(item_id)},
- db=db,
- )
+ # delete item
+ await self.mass.database.delete(
+ self.db_table,
+ {"item_id": int(item_id)},
+ )
# NOTE: this does not delete any references to this item in other records!
self.logger.debug("deleted item with id %s from database", item_id)
from dataclasses import dataclass, field, fields
from time import time
-from typing import Any, Dict, List, Mapping, Optional, Set, Union
+from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union
from mashumaro import DataClassDictMixin
from music_assistant.helpers.json import json
from music_assistant.helpers.uri import create_uri
-from music_assistant.helpers.util import create_clean_string, merge_lists
+from music_assistant.helpers.util import create_sort_name, merge_lists
from music_assistant.models.enums import (
AlbumType,
ContentType,
metadata: MediaItemMetadata = field(default_factory=MediaItemMetadata)
in_library: bool = False
media_type: MediaType = MediaType.UNKNOWN
- # sort_name and uri are auto generated, do not override unless needed
+ # sort_name and uri are auto generated, do not override unless really needed
sort_name: Optional[str] = None
uri: Optional[str] = None
+ # timestamp is used to determine when the item was added to the library
+ timestamp: int = 0
def __post_init__(self):
"""Call after init."""
if not self.uri:
self.uri = create_uri(self.media_type, self.provider, self.item_id)
if not self.sort_name:
- self.sort_name = create_clean_string(self.name)
+ self.sort_name = create_sort_name(self.name)
@classmethod
def from_db_row(cls, db_row: Mapping):
return self.album.image
return None
+ @property
+ def isrcs(self) -> Tuple[str]:
+ """Split multiple values in isrc field."""
+ # sometimes the isrc contains multiple values, splitted by semicolon
+ if not self.isrc:
+ return tuple()
+ return tuple(self.isrc.split(";"))
+
@dataclass
class Playlist(MediaItem):
# this logic is aimed at streaming/online providers,
# which all have more or less the same structure.
# filesystem implementation(s) just override this.
- async with self.mass.database.get_db() as db:
- for media_type in self.supported_mediatypes:
- if media_types is not None and media_type not in media_types:
+ for media_type in self.supported_mediatypes:
+ if media_types is not None and media_type not in media_types:
+ continue
+ self.logger.debug("Start sync of %s items.", media_type.value)
+ controller = self.mass.music.get_controller(media_type)
+
+ # create a set of all previous and current db id's
+ # note we only store the items in the prev_ids list that are
+ # unique to this provider to avoid getting into a mess where
+ # for example an item still exists on disk (in case of file provider)
+ # and no longer favorite on streaming provider.
+ # Bottomline this means that we don't do a full 2 way sync if multiple
+ # providers are attached to the same media item.
+ prev_ids = set()
+ for db_item in await controller.library():
+ prov_types = {x.prov_type for x in db_item.provider_ids}
+ if len(prov_types) > 1:
continue
- self.logger.debug("Start sync of %s items.", media_type.value)
- controller = self.mass.music.get_controller(media_type)
-
- # create a set of all previous and current db id's
- # note we only store the items in the prev_ids list that are
- # unique to this provider to avoid getting into a mess where
- # for example an item still exists on disk (in case of file provider)
- # and no longer favorite on streaming provider.
- # Bottomline this means that we don't do a full 2 way sync if multiple
- # providers are attached to the same media item.
- prev_ids = set()
- for db_item in await controller.library():
- prov_types = {x.prov_type for x in db_item.provider_ids}
- if len(prov_types) > 1:
- continue
- for prov_id in db_item.provider_ids:
- if prov_id.prov_id == self.id:
- prev_ids.add(db_item.item_id)
- cur_ids = set()
- async for prov_item in self._get_library_gen(media_type)():
- prov_item: MediaItemType = prov_item
-
- db_item: MediaItemType = await controller.get_db_item_by_prov_id(
- provider_item_id=prov_item.item_id,
- provider=prov_item.provider,
- db=db,
+ for prov_id in db_item.provider_ids:
+ if prov_id.prov_id == self.id:
+ prev_ids.add(db_item.item_id)
+ cur_ids = set()
+ async for prov_item in self._get_library_gen(media_type)():
+ prov_item: MediaItemType = prov_item
+
+ db_item: MediaItemType = await controller.get_db_item_by_prov_id(
+ provider_item_id=prov_item.item_id,
+ provider=prov_item.provider,
+ )
+ if not db_item:
+ # dump the item in the db, rich metadata is lazy loaded later
+ db_item = await controller.add_db_item(prov_item)
+ elif (
+ db_item.metadata.checksum and prov_item.metadata.checksum
+ ) and db_item.metadata.checksum != prov_item.metadata.checksum:
+ # item checksum changed
+ db_item = await controller.update_db_item(
+ db_item.item_id, prov_item
)
- if not db_item:
- # dump the item in the db, rich metadata is lazy loaded later
- db_item = await controller.add_db_item(prov_item, db=db)
- elif (
- db_item.metadata.checksum and prov_item.metadata.checksum
- ) and db_item.metadata.checksum != prov_item.metadata.checksum:
- # item checksum changed
- db_item = await controller.update_db_item(
- db_item.item_id, prov_item, db=db
- )
- cur_ids.add(db_item.item_id)
- if not db_item.in_library:
- await controller.set_db_library(db_item.item_id, True, db=db)
-
- # process deletions
- for item_id in prev_ids:
- if item_id not in cur_ids:
- # only mark the item as not in library and leave the metadata in db
- await controller.set_db_library(item_id, False, db=db)
+ cur_ids.add(db_item.item_id)
+ if not db_item.in_library:
+ await controller.set_db_library(db_item.item_id, True)
+
+ # process deletions
+ for item_id in prev_ids:
+ if item_id not in cur_ids:
+ # only mark the item as not in library and leave the metadata in db
+ await controller.set_db_library(item_id, False)
# DO NOT OVERRIDE BELOW
async def restore(self) -> None:
"""Restore state from db."""
- async with self.mass.database.get_db() as _db:
- for key, val_type in (
- ("repeat_mode", RepeatMode),
- ("crossfade_mode", CrossFadeMode),
- ("shuffle_enabled", bool),
- ("crossfade_duration", int),
- ("volume_normalization_enabled", bool),
- ("volume_normalization_target", float),
- ("stream_type", ContentType),
- ("sample_rates", tuple),
- ):
- db_key = f"{self._queue.queue_id}_{key}"
- if db_value := await self.mass.database.get_setting(db_key, db=_db):
- value = val_type(db_value["value"])
- setattr(self, f"_{key}", value)
+ for key, val_type in (
+ ("repeat_mode", RepeatMode),
+ ("crossfade_mode", CrossFadeMode),
+ ("shuffle_enabled", bool),
+ ("crossfade_duration", int),
+ ("volume_normalization_enabled", bool),
+ ("volume_normalization_target", float),
+ ("stream_type", ContentType),
+ ("sample_rates", tuple),
+ ):
+ db_key = f"{self._queue.queue_id}_{key}"
+ if db_value := await self.mass.database.get_setting(db_key):
+ value = val_type(db_value["value"])
+ setattr(self, f"_{key}", value)
def _on_update(self, changed_key: Optional[str] = None) -> None:
"""Handle state changed."""
async def save(self, changed_key: Optional[str] = None) -> None:
"""Save state in db."""
- async with self.mass.database.get_db() as _db:
- for key, value in self.to_dict().items():
- if key == changed_key or changed_key is None:
- db_key = f"{self._queue.queue_id}_{key}"
- await self.mass.database.set_setting(db_key, value, db=_db)
+ for key, value in self.to_dict().items():
+ if key == changed_key or changed_key is None:
+ db_key = f"{self._queue.queue_id}_{key}"
+ await self.mass.database.set_setting(db_key, value)
import urllib.parse
from contextlib import asynccontextmanager
from pathlib import Path
+from time import time
from typing import AsyncGenerator, List, Optional, Set, Tuple
import aiofiles
import xmltodict
from aiofiles.os import wrap
from aiofiles.threadpool.binary import AsyncFileIO
-from tinytag.tinytag import TinyTag
from music_assistant.helpers.audio import get_file_stream
from music_assistant.helpers.compare import compare_strings
-from music_assistant.helpers.util import (
- create_clean_string,
- parse_title_and_version,
- try_parse_int,
-)
+from music_assistant.helpers.tags import FALLBACK_ARTIST, parse_tags, split_items
+from music_assistant.helpers.util import create_safe_string, parse_title_and_version
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.media_items import (
)
from music_assistant.models.music_provider import MusicProvider
-FALLBACK_ARTIST = "Various Artists"
-SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ", " / ")
-CONTENT_TYPE_EXT = {
- # map of supported file extensions (mapped to ContentType)
- "mp3": ContentType.MP3,
- "m4a": ContentType.M4A,
- "flac": ContentType.FLAC,
- "wav": ContentType.WAV,
- "ogg": ContentType.OGG,
- "wma": ContentType.WMA,
- "aiff": ContentType.AIFF,
-}
+VALID_EXTENSIONS = ("mp3", "m4a", "mp4", "flac", "wav", "ogg", "aiff", "wma", "dsf")
SCHEMA_VERSION = 17
LOGGER = logging.getLogger(__name__)
yield entry
-def split_items(org_str: str) -> Tuple[str]:
- """Split up a tags string by common splitter."""
- if isinstance(org_str, list):
- return org_str
- if org_str is None:
- return tuple()
- for splitter in SPLITTERS:
- if splitter in org_str:
- return tuple((x.strip() for x in org_str.split(splitter)))
- return (org_str,)
+def get_parentdir(base_path: str, name: str) -> str | None:
+ """Look for folder name in path (to find dedicated artist or album folder)."""
+ parentdir = os.path.dirname(base_path)
+ for _ in range(3):
+ dirname = parentdir.rsplit(os.sep)[-1]
+ if compare_strings(name, dirname):
+ return parentdir
+ parentdir = os.path.dirname(parentdir)
+ return None
class FileSystemProvider(MusicProvider):
# find all music files in the music directory and all subfolders
# we work bottom up, as-in we derive all info from the tracks
cur_checksums = {}
- async with self.mass.database.get_db() as db:
- async for entry in scantree(self.config.path):
- try:
- # mtime is used as file checksum
- stat = await asyncio.get_running_loop().run_in_executor(
- None, entry.stat
- )
- checksum = int(stat.st_mtime)
- cur_checksums[entry.path] = checksum
- if checksum == prev_checksums.get(entry.path):
- continue
-
- if track := await self._parse_track(entry.path):
- # set checksum on track to invalidate any cached listings
- track.metadata.checksum = checksum
- # process album
- if track.album:
- # set checksum on album to invalidate cached albumtracks listings etc
- track.album.metadata.checksum = checksum
- db_album = await self.mass.music.albums.add_db_item(
- track.album, overwrite_existing=True, db=db
- )
- if not db_album.in_library:
- await self.mass.music.albums.set_db_library(
- db_album.item_id, True, db=db
- )
- # process (album)artist
- if track.album.artist:
- # set checksum on albumartist to invalidate cached artisttracks listings etc
- track.album.artist.metadata.checksum = checksum
- db_artist = await self.mass.music.artists.add_db_item(
- track.album.artist, db=db
- )
- if not db_artist.in_library:
- await self.mass.music.artists.set_db_library(
- db_artist.item_id, True, db=db
- )
- # add/update track to db
- db_track = await self.mass.music.tracks.add_db_item(
- track, overwrite_existing=True, db=db
- )
- if not db_track.in_library:
- await self.mass.music.tracks.set_db_library(
- db_track.item_id, True, db=db
- )
- elif playlist := await self._parse_playlist(entry.path):
- # add/update] playlist to db
- playlist.metadata.checksum = checksum
- await self.mass.music.playlists.add_db_item(playlist, db=db)
- except Exception as err: # pylint: disable=broad-except
- # we don't want the whole sync to crash on one file so we catch all exceptions here
- self.logger.exception(
- "Error processing %s - %s", entry.path, str(err)
- )
-
- # save checksums every 50 processed items
- # this allows us to pickup where we leftoff when initial scan gets intterrupted
- if save_checksum_interval == 50:
- await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
- save_checksum_interval = 0
- else:
- save_checksum_interval += 1
+ async for entry in scantree(self.config.path):
+ try:
+ # mtime is used as file checksum
+ stat = await asyncio.get_running_loop().run_in_executor(
+ None, entry.stat
+ )
+ checksum = int(stat.st_mtime)
+ cur_checksums[entry.path] = checksum
+ if checksum == prev_checksums.get(entry.path):
+ continue
+
+ if track := await self._parse_track(entry.path):
+ # add/update track to db
+ await self.mass.music.tracks.add_db_item(track)
+ elif playlist := await self._parse_playlist(entry.path):
+ # add/update] playlist to db
+ playlist.metadata.checksum = checksum
+ await self.mass.music.playlists.add_db_item(playlist)
+ except Exception as err: # pylint: disable=broad-except
+ # we don't want the whole sync to crash on one file so we catch all exceptions here
+ self.logger.exception("Error processing %s - %s", entry.path, str(err))
+
+ # save checksums every 50 processed items
+ # this allows us to pickup where we leftoff when initial scan gets intterrupted
+ if save_checksum_interval == 50:
+ await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
+ save_checksum_interval = 0
+ else:
+ save_checksum_interval += 1
await self.mass.cache.set(cache_key, cur_checksums, SCHEMA_VERSION)
# work out deletions
if not await self.exists(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
- def parse_tag():
- return TinyTag.get(itempath)
-
- tags = await self.mass.loop.run_in_executor(None, parse_tag)
- _, ext = Path(itempath).name.rsplit(".", 1)
- content_type = CONTENT_TYPE_EXT.get(ext.lower())
-
+ metadata = await parse_tags(itempath)
stat = await self.mass.loop.run_in_executor(None, os.stat, itempath)
return StreamDetails(
provider=self.type,
item_id=item_id,
- content_type=content_type,
+ content_type=ContentType.try_parse(metadata.format),
media_type=MediaType.TRACK,
- duration=tags.duration,
+ duration=metadata.duration,
size=stat.st_size,
- sample_rate=tags.samplerate or 44100,
- bit_depth=16, # TODO: parse bitdepth
+ sample_rate=metadata.sample_rate,
+ bit_depth=metadata.bits_per_sample,
data=itempath,
)
# skip system files and files without extension
return None
- filename_base, ext = Path(track_path).name.rsplit(".", 1)
- content_type = CONTENT_TYPE_EXT.get(ext.lower())
- if content_type is None:
+ _, ext = track_path.rsplit(".", 1)
+ if ext not in VALID_EXTENSIONS:
# unsupported file extension
return None
track_item_id = self._get_item_id(track_path)
- # parse ID3 tags with TinyTag
- def parse_tags():
- return TinyTag.get(track_path, image=True, ignore_errors=True)
-
- tags = await self.mass.loop.run_in_executor(None, parse_tags)
-
- # prefer title from tags, fallback to filename
- if not tags.title or not tags.artist:
- self.logger.warning(
- "%s is missing ID3 tags, using filename as fallback", track_path
- )
- filename_parts = filename_base.split(" - ", 1)
- if len(filename_parts) == 2:
- tags.artist = tags.artist or filename_parts[0]
- tags.title = tags.title or filename_parts[1]
- else:
- tags.artist = tags.artist or FALLBACK_ARTIST
- tags.title = tags.title or filename_base
+ # parse tags
+ tags = await parse_tags(track_path)
name, version = parse_title_and_version(tags.title)
track = Track(
provider=self.type,
name=name,
version=version,
- # a track on disk is always in library
- in_library=True,
)
# album
- # work out if we have an artist/album/track.ext structure
if tags.album:
- track_parts = track_path.rsplit(os.sep)
- album_folder = None
- artist_folder = None
- parentdir = os.path.dirname(track_path)
- for _ in range(len(track_parts)):
- dirname = parentdir.rsplit(os.sep)[-1]
- if compare_strings(dirname, tags.albumartist):
- artist_folder = parentdir
- if compare_strings(dirname, tags.album):
- album_folder = parentdir
- parentdir = os.path.dirname(parentdir)
-
- # album artist
- if artist_folder:
- album_artists = [
- await self._parse_artist(
- name=tags.albumartist,
- artist_path=artist_folder,
- in_library=True,
+ # work out if we have an album folder
+ album_dir = get_parentdir(track_path, tags.album)
+
+ # album artist(s)
+ if tags.album_artists:
+ album_artists = []
+ for index, album_artist_str in enumerate(tags.album_artists):
+ # work out if we have an artist folder
+ artist_dir = get_parentdir(track_path, album_artist_str)
+ artist = await self._parse_artist(
+ album_artist_str, artist_path=artist_dir
)
- ]
- elif tags.albumartist:
- album_artists = [
- await self._parse_artist(name=item, in_library=True)
- for item in split_items(tags.albumartist)
- ]
-
+ if not artist.musicbrainz_id:
+ try:
+ artist.musicbrainz_id = tags.musicbrainz_artistids[index]
+ except IndexError:
+ pass
+ album_artists.append(artist)
else:
# always fallback to various artists as album artist if user did not tag album artist
# ID3 tag properly because we must have an album artist
- album_artists = [await self._parse_artist(name=FALLBACK_ARTIST)]
self.logger.warning(
"%s is missing ID3 tag [albumartist], using %s as fallback",
track_path,
FALLBACK_ARTIST,
)
+ album_artists = [await self._parse_artist(name=FALLBACK_ARTIST)]
track.album = await self._parse_album(
tags.album,
- album_folder,
+ album_dir,
artists=album_artists,
- in_library=True,
)
else:
self.logger.warning("%s is missing ID3 tag [album]", track_path)
# track artist(s)
- if tags.artist == tags.albumartist and track.album:
- track.artists = track.album.artists
- else:
- # Parse track artist(s) from artist string using common splitters used in ID3 tags
- # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
- track_artists_str = tags.artist or FALLBACK_ARTIST
- track.artists = [
- await self._parse_artist(item, in_library=False)
- for item in split_items(track_artists_str)
+ for index, track_artist_str in enumerate(tags.artists):
+ # re-use album artist details if possible
+ if track.album:
+ if artist := next(
+ (x for x in track.album.artists if x.name == track_artist_str), None
+ ):
+ track.artists.append(artist)
+ continue
+ artist = await self._parse_artist(track_artist_str)
+ if not artist.musicbrainz_id:
+ try:
+ artist.musicbrainz_id = tags.musicbrainz_artistids[index]
+ except IndexError:
+ pass
+ track.artists.append(artist)
+
+ # cover image - prefer album image, fallback to embedded
+ if track.album and track.album.image:
+ track.album.metadata.images = [
+ MediaItemImage(ImageType.THUMB, track.album.image, True)
]
-
- # Check if track has embedded metadata
- img = await self.mass.loop.run_in_executor(None, tags.get_image)
- if not track.metadata.images and img:
+ elif tags.has_cover_image:
# we do not actually embed the image in the metadata because that would consume too
# much space and bandwidth. Instead we set the filename as value so the image can
# be retrieved later in realtime.
track.metadata.images = [MediaItemImage(ImageType.THUMB, track_path, True)]
- if track.album and not track.album.metadata.images:
+ if track.album:
+ # set embedded cover on album
track.album.metadata.images = track.metadata.images
# parse other info
track.duration = tags.duration
- track.metadata.genres = set(split_items(tags.genre))
- track.disc_number = try_parse_int(tags.disc)
- track.track_number = try_parse_int(tags.track)
- track.isrc = tags.extra.get("isrc", "")
- if "copyright" in tags.extra:
- track.metadata.copyright = tags.extra["copyright"]
- if "lyrics" in tags.extra:
- track.metadata.lyrics = tags.extra["lyrics"]
+ track.metadata.genres = tags.genres
+ track.disc_number = tags.disc
+ track.track_number = tags.track
+ track.isrc = tags.get("isrc")
+ track.metadata.copyright = tags.get("copyright")
+ track.metadata.lyrics = tags.get("lyrics")
+ track.musicbrainz_id = tags.musicbrainz_trackid
+ if track.album:
+ if not track.album.musicbrainz_id:
+ track.album.musicbrainz_id = tags.musicbrainz_releasegroupid
+ if not track.album.year:
+ track.album.year = tags.year
+ if not track.album.upc:
+ track.album.upc = tags.get("barcode")
+ # try to parse albumtype
+ if track.album and track.album.album_type == AlbumType.UNKNOWN:
+ album_type = tags.album_type
+ if album_type and "compilation" in album_type:
+ track.album.album_type = AlbumType.COMPILATION
+ elif album_type and "single" in album_type:
+ track.album.album_type = AlbumType.SINGLE
+ elif album_type and "album" in album_type:
+ track.album.album_type = AlbumType.ALBUM
+ elif track.album.sort_name in track.sort_name:
+ track.album.album_type = AlbumType.SINGLE
+
+ # set checksum to invalidate any cached listings
+ checksum_timestamp = str(int(time()))
+ track.metadata.checksum = checksum_timestamp
+ if track.album:
+ track.album.metadata.checksum = checksum_timestamp
+ for artist in track.album.artists:
+ artist.metadata.checksum = checksum_timestamp
quality_details = ""
- if content_type == ContentType.FLAC:
- # TODO: get bit depth
- quality = MediaQuality.FLAC_LOSSLESS
- if tags.samplerate > 192000:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
- elif tags.samplerate > 96000:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3
- elif tags.samplerate > 48000:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2
- quality_details = f"{tags.samplerate / 1000} Khz"
- elif track_path.endswith(".ogg"):
+ content_type = ContentType.try_parse(tags.format)
+ quality_details = f"{int(tags.bit_rate / 1000)} kbps"
+ if content_type == ContentType.MP3:
+ quality = MediaQuality.LOSSY_MP3
+ elif content_type == ContentType.OGG:
quality = MediaQuality.LOSSY_OGG
- quality_details = f"{tags.bitrate} kbps"
- elif track_path.endswith(".m4a"):
+ elif content_type == ContentType.AAC:
quality = MediaQuality.LOSSY_AAC
- quality_details = f"{tags.bitrate} kbps"
+ elif content_type == ContentType.M4A:
+ quality = MediaQuality.LOSSY_M4A
+ elif content_type.is_lossless():
+ quality = MediaQuality.LOSSLESS
+ quality_details = f"{tags.sample_rate / 1000} Khz / {tags.bit_rate} bit"
+ if tags.sample_rate > 192000:
+ quality = MediaQuality.LOSSLESS_HI_RES_4
+ elif tags.sample_rate > 96000:
+ quality = MediaQuality.LOSSLESS_HI_RES_3
+ elif tags.sample_rate > 48000:
+ quality = MediaQuality.LOSSLESS_HI_RES_2
+ elif tags.bits_per_sample > 16:
+ quality = MediaQuality.LOSSLESS_HI_RES_1
else:
- quality = MediaQuality.LOSSY_MP3
- quality_details = f"{tags.bitrate} kbps"
+ quality = MediaQuality.UNKNOWN
+
track.add_provider_id(
MediaItemProviderId(
item_id=track_item_id,
self,
name: Optional[str] = None,
artist_path: Optional[str] = None,
- in_library: bool = True,
) -> Artist | None:
"""Lookup metadata in Artist folder."""
assert name or artist_path
provider_ids={
MediaItemProviderId(artist_item_id, self.type, self.id, url=artist_path)
},
- in_library=in_library,
)
if not await self.exists(artist_path):
# return basic object if there is no dedicated artist folder
return artist
- # always mark artist as in-library when it exists as folder on disk
- artist.in_library = True
-
nfo_file = os.path.join(artist_path, "artist.nfo")
if await self.exists(nfo_file):
# found NFO file with metadata
artist.metadata.genres = set(split_items(genre))
# find local images
images = []
- async for _path in scantree(artist_path):
- _filename = _path.path
- ext = _filename.split(".")[-1]
+ for _path in await self.mass.loop.run_in_executor(
+ None, os.scandir, artist_path
+ ):
+ if "." not in _path.path or _path.is_dir():
+ continue
+ filename, ext = _path.path.rsplit(os.sep, 1)[-1].split(".", 1)
if ext not in ("jpg", "png"):
continue
- _filepath = os.path.join(artist_path, _filename)
- for img_type in ImageType:
- if img_type.value in _filepath:
- images.append(MediaItemImage(img_type, _filepath, True))
- elif _filename == "folder.jpg":
- images.append(MediaItemImage(ImageType.THUMB, _filepath, True))
+ try:
+ images.append(MediaItemImage(ImageType(filename), _path.path, True))
+ except ValueError:
+ if "folder" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
+ elif "Artist" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
if images:
artist.metadata.images = images
return artist
async def _parse_album(
- self,
- name: Optional[str],
- album_path: Optional[str],
- artists: List[Artist],
- in_library: bool = True,
+ self, name: Optional[str], album_path: Optional[str], artists: List[Artist]
) -> Album | None:
"""Lookup metadata in Album folder."""
assert (name or album_path) and artists
provider_ids={
MediaItemProviderId(album_item_id, self.type, self.id, url=album_path)
},
- in_library=in_library,
)
if not await self.exists(album_path):
# return basic object if there is no dedicated album folder
return album
- # always mark as in-library when it exists as folder on disk
- album.in_library = True
-
nfo_file = os.path.join(album_path, "album.nfo")
if await self.exists(nfo_file):
# found NFO file with metadata
# parse name/version
album.name, album.version = parse_title_and_version(album.name)
- # try to guess the album type
- album_tracks = [
- x async for x in scantree(album_path) if TinyTag.is_supported(x.path)
- ]
- if album.artist.sort_name == "variousartists":
- album.album_type = AlbumType.COMPILATION
- elif len(album_tracks) <= 5:
- album.album_type = AlbumType.SINGLE
- else:
- album.album_type = AlbumType.ALBUM
-
# find local images
images = []
async for _path in scantree(album_path):
- _filename = _path.path
- ext = _filename.split(".")[-1]
+ if "." not in _path.path or _path.is_dir():
+ continue
+ filename, ext = _path.path.rsplit(os.sep, 1)[-1].split(".", 1)
if ext not in ("jpg", "png"):
continue
- _filepath = os.path.join(album_path, _filename)
- for img_type in ImageType:
- if img_type.value in _filepath:
- images.append(MediaItemImage(img_type, _filepath, True))
- elif "folder." in _filepath:
- images.append(MediaItemImage(ImageType.THUMB, _filepath, True))
+ try:
+ images.append(MediaItemImage(ImageType(filename), _path.path, True))
+ except ValueError:
+ if "folder" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
+ elif "AlbumArt" in filename:
+ images.append(MediaItemImage(ImageType.THUMB, _path.path, True))
if images:
album.metadata.images = images
playlist = Playlist(playlist_item_id, provider=self.type, name=name)
playlist.is_editable = True
+ # playlist is always in-library
playlist.in_library = True
playlist.add_provider_id(
MediaItemProviderId(
async with aiofiles.open(file_path, mode) as _file:
yield _file
- async def get_embedded_image(self, file_path) -> bytes | None:
- """Return embedded image data."""
- if not TinyTag.is_supported(file_path):
- return None
-
- # embedded image in music file
- def _get_data():
- tags = TinyTag.get(file_path, image=True)
- return tags.get_image()
-
- return await self.mass.loop.run_in_executor(None, _get_data)
-
async def get_filepath(
self, media_type: MediaType, prov_item_id: str
) -> str | None:
def _get_item_id(self, file_path: str) -> str:
"""Create item id from filename."""
- return create_clean_string(file_path.replace(self.config.path, ""))
+ return create_safe_string(file_path.replace(self.config.path, ""))
item_id=str(album_obj["id"]), provider=self.type, name=name, version=version
)
if album_obj["maximum_sampling_rate"] > 192:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
+ quality = MediaQuality.LOSSLESS_HI_RES_4
elif album_obj["maximum_sampling_rate"] > 96:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3
+ quality = MediaQuality.LOSSLESS_HI_RES_3
elif album_obj["maximum_sampling_rate"] > 48:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2
+ quality = MediaQuality.LOSSLESS_HI_RES_2
elif album_obj["maximum_bit_depth"] > 16:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_1
+ quality = MediaQuality.LOSSLESS_HI_RES_1
elif album_obj.get("format_id", 0) == 5:
quality = MediaQuality.LOSSY_AAC
else:
- quality = MediaQuality.FLAC_LOSSLESS
+ quality = MediaQuality.LOSSLESS
album.add_provider_id(
MediaItemProviderId(
item_id=str(album_obj["id"]),
track.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
# get track quality
if track_obj["maximum_sampling_rate"] > 192:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
+ quality = MediaQuality.LOSSLESS_HI_RES_4
elif track_obj["maximum_sampling_rate"] > 96:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3
+ quality = MediaQuality.LOSSLESS_HI_RES_3
elif track_obj["maximum_sampling_rate"] > 48:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2
+ quality = MediaQuality.LOSSLESS_HI_RES_2
elif track_obj["maximum_bit_depth"] > 16:
- quality = MediaQuality.FLAC_LOSSLESS_HI_RES_1
+ quality = MediaQuality.LOSSLESS_HI_RES_1
elif track_obj.get("format_id", 0) == 5:
quality = MediaQuality.LOSSY_AAC
else:
- quality = MediaQuality.FLAC_LOSSLESS
+ quality = MediaQuality.LOSSLESS
track.add_provider_id(
MediaItemProviderId(
item_id=str(track_obj["id"]),
from asyncio_throttle import Throttler
from music_assistant.helpers.audio import get_radio_stream
-from music_assistant.helpers.util import create_clean_string
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.models.media_items import (
radio.sort_name = f'{folder}-{details["preset_number"]}'
elif preset_number:
radio.sort_name = details["preset_number"]
- radio.sort_name += create_clean_string(name)
+ radio.sort_name += create_sort_name(name)
if "text" in details:
radio.metadata.description = details["text"]
# images
pillow>=8.0,<=9.2.0
unidecode>=1.0,<=1.3.4
mashumaro>=3.0,<=3.1
-tinytag>=1.6,<=1.8.1
xmltodict>=0.12.0,<=0.13.0