import os
from music_assistant.mass import MusicAssistant
-from music_assistant.models.config import MassConfig
+from music_assistant.models.config import MassConfig, MusicProviderConfig
+from music_assistant.models.enums import ProviderType
from music_assistant.models.player import Player, PlayerState
from music_assistant.models.player_queue import RepeatMode
required=False,
help="Directory on disk for local music library",
)
-parser.add_argument(
- "--playlistdir",
- required=False,
- help="Directory on disk for local (m3u) playlists",
-)
parser.add_argument(
"--debug",
action="store_true",
mass_conf = MassConfig(
database_url=f"sqlite:///{db_file}",
- spotify_enabled=args.spotify_username and args.spotify_password,
- spotify_username=args.spotify_username,
- spotify_password=args.spotify_password,
- qobuz_enabled=args.qobuz_username and args.qobuz_password,
- qobuz_username=args.qobuz_username,
- qobuz_password=args.qobuz_password,
- tunein_enabled=args.tunein_username is not None,
- tunein_username=args.tunein_username,
- filesystem_enabled=args.musicdir is not None,
- filesystem_music_dir=args.musicdir,
- filesystem_playlists_dir=args.playlistdir,
)
+if args.spotify_username and args.spotify_password:
+ mass_conf.providers.append(
+ MusicProviderConfig(
+ ProviderType.SPOTIFY,
+ username=args.spotify_username,
+ password=args.spotify_password,
+ )
+ )
+if args.qobuz_username and args.qobuz_password:
+ mass_conf.providers.append(
+ MusicProviderConfig(
+ type=ProviderType.QOBUZ,
+ username=args.qobuz_username,
+ password=args.qobuz_password,
+ )
+ )
+if args.tunein_username:
+ mass_conf.providers.append(
+ MusicProviderConfig(
+ type=ProviderType.TUNEIN,
+ username=args.tunein_username,
+ )
+ )
+if args.musicdir:
+ mass_conf.providers.append(
+ MusicProviderConfig(type=ProviderType.FILESYSTEM_LOCAL, path=args.musicdir)
+ )
class TestPlayer(Player):
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
- print("play uri called: {url}")
+ print(f"stream url: {url}")
self._attr_current_url = url
self.update_state()
test_player2 = TestPlayer("test2")
await mass.players.register_player(test_player1)
await mass.players.register_player(test_player2)
+ # get full artist details
+ await mass.music.artists.get("6", ProviderType.DATABASE)
+ await mass.music.artists.albums("6", ProviderType.DATABASE)
+ await mass.music.artists.toptracks("6", ProviderType.DATABASE)
+
# try to play some playlist
test_player1.active_queue.settings.shuffle_enabled = True
test_player1.active_queue.settings.repeat_mode = RepeatMode.ALL
from music_assistant.mass import MusicAssistant
-from music_assistant.models.config import MassConfig
+from music_assistant.models.config import MassConfig, MusicProviderConfig
+from music_assistant.models.enums import ProviderType
parser = argparse.ArgumentParser(description="MusicAssistant")
parser.add_argument(
mass = MusicAssistant(
MassConfig(
database_url=MassConfig,
- spotify_enabled=True,
- spotify_username=args.username,
- spotify_password=args.password,
+ providers=[
+ MusicProviderConfig(
+ ProviderType.SPOTIFY,
+ username=args.spotify_username,
+ password=args.spotify_password,
+ )
+ ],
)
)
"""All logic for metadata retrieval."""
from __future__ import annotations
+from base64 import b64encode
from time import time
from typing import TYPE_CHECKING, Optional
+from music_assistant.helpers.database import TABLE_THUMBS
from music_assistant.helpers.images import create_thumbnail
from music_assistant.models.media_items import Album, Artist, Playlist, Radio, Track
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
-TABLE_THUMBS = "thumbnails"
-
class MetaDataController:
"""Several helpers to search and store metadata for mediaitems."""
async def setup(self):
"""Async initialize of module."""
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {TABLE_THUMBS}(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- url TEXT NOT NULL,
- size INTEGER,
- img BLOB,
- UNIQUE(url, size));"""
- )
async def get_artist_metadata(self, artist: Artist) -> None:
"""Get/update rich metadata for an artist."""
if not artist.musicbrainz_id:
artist.musicbrainz_id = await self.get_artist_musicbrainz_id(artist)
- if metadata := await self.fanarttv.get_artist_metadata(artist):
- artist.metadata.update(metadata)
- if metadata := await self.audiodb.get_artist_metadata(artist):
- artist.metadata.update(metadata)
+
+ if artist.musicbrainz_id:
+ if metadata := await self.fanarttv.get_artist_metadata(artist):
+ artist.metadata.update(metadata)
+ if metadata := await self.audiodb.get_artist_metadata(artist):
+ artist.metadata.update(metadata)
artist.metadata.last_refresh = int(time())
# NOTE: we do not have any metadata for radiso so consider this future proofing ;-)
radio.metadata.last_refresh = int(time())
- async def get_artist_musicbrainz_id(self, artist: Artist) -> str:
+ async def get_artist_musicbrainz_id(self, artist: Artist) -> str | None:
"""Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
- # try with album first
- for lookup_album in await self.mass.music.artists.get_provider_artist_albums(
+ ref_albums = await self.mass.music.artists.get_provider_artist_albums(
artist.item_id, artist.provider
- ):
- if artist.name != lookup_album.artist.name:
+ )
+ # first try audiodb
+ if musicbrainz_id := await self.audiodb.get_musicbrainz_id(artist, ref_albums):
+ return musicbrainz_id
+ # try again with musicbrainz with albums with upc
+ for ref_album in ref_albums:
+ if ref_album.upc:
+ if musicbrainz_id := await self.musicbrainz.get_mb_artist_id(
+ artist.name,
+ album_upc=ref_album.upc,
+ ):
+ return musicbrainz_id
+ if ref_album.musicbrainz_id:
+ if musicbrainz_id := await self.musicbrainz.search_artist_by_album_mbid(
+ artist.name, ref_album.musicbrainz_id
+ ):
+ return musicbrainz_id
+
+ # try again with matching on track isrc
+ ref_tracks = await self.mass.music.artists.get_provider_artist_toptracks(
+ artist.item_id, artist.provider
+ )
+ for ref_track in ref_tracks:
+ if not ref_track.isrc:
continue
- musicbrainz_id = await self.musicbrainz.get_mb_artist_id(
+ if musicbrainz_id := await self.musicbrainz.get_mb_artist_id(
artist.name,
- albumname=lookup_album.name,
- album_upc=lookup_album.upc,
- )
- if musicbrainz_id:
+ track_isrc=ref_track.isrc,
+ ):
return musicbrainz_id
- # fallback to track
- for lookup_track in await self.mass.music.artists.get_provider_artist_toptracks(
- artist.item_id, artist.provider
- ):
- musicbrainz_id = await self.musicbrainz.get_mb_artist_id(
+
+ # last restort: track matching by name
+ for ref_track in ref_tracks[:10]:
+ if musicbrainz_id := await self.musicbrainz.get_mb_artist_id(
artist.name,
- trackname=lookup_track.name,
- track_isrc=lookup_track.isrc,
- )
- if musicbrainz_id:
+ trackname=ref_track.name,
+ ):
return musicbrainz_id
- # lookup failed, use the shitty workaround to use the name as id.
+ # lookup failed
self.logger.warning("Unable to get musicbrainz ID for artist %s !", artist.name)
- return artist.name
+ return None
- async def get_thumbnail(self, url, size) -> bytes:
- """Get/create thumbnail image for url."""
- match = {"url": url, "size": size}
+ async def get_thumbnail(
+ self, path: str, size: Optional[int], base64: bool = False
+ ) -> bytes | str:
+ """Get/create thumbnail image for path."""
+ match = {"path": path, "size": size}
if result := await self.mass.database.get_row(TABLE_THUMBS, match):
- return result["img"]
- # create thumbnail if it doesn't exist
- thumbnail = await create_thumbnail(self.mass, url, size)
- await self.mass.database.insert_or_replace(
- TABLE_THUMBS, {**match, "img": thumbnail}
- )
+ thumbnail = result["data"]
+ else:
+ # create thumbnail if it doesn't exist
+ thumbnail = await create_thumbnail(self.mass, path, size)
+ await self.mass.database.insert_or_replace(
+ TABLE_THUMBS, {**match, "data": thumbnail}
+ )
+ if base64:
+ enc_image = b64encode(thumbnail).decode()
+ thumbnail = f"data:image/png;base64,{enc_image}"
return thumbnail
from __future__ import annotations
from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
from asyncio_throttle import Throttler
"strArtistLogo": ImageType.LOGO,
"strArtistCutout": ImageType.CUTOUT,
"strArtistClearart": ImageType.CLEARART,
- "strArtistWideThumb": ImageType.WIDE_THUMB,
+ "strArtistWideThumb": ImageType.LANDSCAPE,
"strArtistFanart": ImageType.FANART,
"strArtistBanner": ImageType.BANNER,
"strAlbumThumb": ImageType.THUMB,
"strAlbumThumbHQ": ImageType.THUMB,
- "strAlbumCDart": ImageType.CDART,
+ "strAlbumCDart": ImageType.DISCART,
"strAlbum3DCase": ImageType.OTHER,
"strAlbum3DFlat": ImageType.OTHER,
"strAlbum3DFace": ImageType.OTHER,
return self.__parse_track(adb_track)
return None
+ async def get_musicbrainz_id(
+ self, artist: Artist, ref_albums: List[Album]
+ ) -> str | None:
+ """Try to discover MusicBrainz ID for an artist given some reference albums."""
+ self.logger.debug(
+ "Lookup MusicbrainzID for Artist %s on TheAudioDb", artist.name
+ )
+ musicbrainz_id = None
+ if data := await self._get_data("searchalbum.php", s=artist.name):
+ # NOTE: object is 'null' when no records found instead of empty array
+ for item in data.get("album", []) or []:
+ if not compare_strings(item["strArtistStripped"], artist.name):
+ continue
+ for ref_album in ref_albums:
+ if not compare_strings(item["strAlbumStripped"], ref_album.name):
+ continue
+ # found match - update album metadata too while we're here
+ if not ref_album.musicbrainz_id:
+ ref_album.metadata = self.__parse_album(item)
+ await self.mass.music.albums.add_db_item(ref_album)
+ musicbrainz_id = item["strMusicBrainzArtistID"]
+ if musicbrainz_id:
+ self.logger.debug(
+ "Found MusicBrainzID for artist %s on TheAudioDb", artist.name
+ )
+ return musicbrainz_id
+
def __parse_artist(self, artist_obj: Dict[str, Any]) -> MediaItemMetadata:
"""Parse audiodb artist object to MediaItemMetadata."""
metadata = MediaItemMetadata()
else:
metadata.description = artist_obj.get("strBiographyEN")
# images
- metadata.images = set()
+ metadata.images = []
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
if img := artist_obj.get(f"{key}{postfix}"):
- metadata.images.add(MediaItemImage(img_type, img))
+ metadata.images.append(MediaItemImage(img_type, img))
else:
break
return metadata
metadata.description = album_obj.get("strDescriptionEN")
metadata.review = album_obj.get("strReview")
# images
- metadata.images = set()
+ metadata.images = []
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
if img := album_obj.get(f"{key}{postfix}"):
- metadata.images.add(MediaItemImage(img_type, img))
+ metadata.images.append(MediaItemImage(img_type, img))
else:
break
return metadata
else:
metadata.description = track_obj.get("strDescriptionEN")
# images
- metadata.images = set()
+ metadata.images = []
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
if img := track_obj.get(f"{key}{postfix}"):
- metadata.images.add(MediaItemImage(img_type, img))
+ metadata.images.append(MediaItemImage(img_type, img))
else:
break
return metadata
self.logger.debug("Fetching metadata for Artist %s on Fanart.tv", artist.name)
if data := await self._get_data(f"music/{artist.musicbrainz_id}"):
metadata = MediaItemMetadata()
- metadata.images = set()
+ metadata.images = []
for key, img_type in IMG_MAPPING.items():
items = data.get(key)
if not items:
continue
for item in items:
- metadata.images.add(MediaItemImage(img_type, item["url"]))
+ metadata.images.append(MediaItemImage(img_type, item["url"]))
return metadata
return None
if data and data.get("albums"):
data = data["albums"][album.musicbrainz_id]
metadata = MediaItemMetadata()
- metadata.images = set()
+ metadata.images = []
for key, img_type in IMG_MAPPING.items():
items = data.get(key)
if not items:
continue
for item in items:
- metadata.images.add(MediaItemImage(img_type, item["url"]))
+ metadata.images.append(MediaItemImage(img_type, item["url"]))
return metadata
return None
from asyncio_throttle import Throttler
from music_assistant.helpers.cache import use_cache
-from music_assistant.helpers.compare import compare_strings, get_compare_string
+from music_assistant.helpers.compare import compare_strings
+from music_assistant.helpers.util import create_clean_string
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
"""Retrieve musicbrainz artist id by providing the artist name and albumname or upc."""
for searchartist in [
re.sub(LUCENE_SPECIAL, r"\\\1", artistname),
- get_compare_string(artistname),
+ create_clean_string(artistname),
+ artistname,
]:
if album_upc:
query = f"barcode:{album_upc}"
return artist["id"]
return ""
+ async def search_artist_by_album_mbid(
+ self, artistname, album_mbid: str
+ ) -> str | None:
+ """Retrieve musicbrainz artist id by providing the artist name and albumname or upc."""
+ result = await self.get_data(f"release-group/{album_mbid}?inc=artist-credits")
+ if result and "artist-credit" in result:
+ for strictness in [True, False]:
+ for item in result["artist-credit"]:
+ if artist := item.get("artist"):
+ if compare_strings(artistname, artist["name"], strictness):
+ return artist["id"]
+ return None
+
@use_cache(86400 * 30)
async def get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
import asyncio
import statistics
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
from databases import Database as Db
)
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.uri import parse_uri
-from music_assistant.models.enums import MediaType
+from music_assistant.models.config import MusicProviderConfig
+from music_assistant.models.enums import MediaType, ProviderType
from music_assistant.models.errors import MusicAssistantError, SetupFailedError
from music_assistant.models.media_items import (
MediaItem,
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
-PROVIDERS = (FileSystemProvider, QobuzProvider, SpotifyProvider, TuneInProvider)
+PROV_MAP = {
+ ProviderType.FILESYSTEM_LOCAL: FileSystemProvider,
+ ProviderType.SPOTIFY: SpotifyProvider,
+ ProviderType.QOBUZ: QobuzProvider,
+ ProviderType.TUNEIN: TuneInProvider,
+}
class MusicController:
async def setup(self):
"""Async initialize of module."""
# register providers
- for prov in PROVIDERS:
- await self._register_provider(prov())
+ for prov_conf in self.mass.config.providers:
+ prov_cls = PROV_MAP[prov_conf.type]
+ await self._register_provider(prov_cls(self.mass, prov_conf), prov_conf)
+ # TODO: handle deletion of providers ?
async def start_sync(self, schedule: Optional[float] = 3) -> None:
"""
Start running the sync of all registred providers.
- schedule: schedule syncjob every X hours, set to None for just a manual sync run.
+ :param schedule: schedule syncjob every X hours, set to None for just a manual sync run.
"""
async def do_sync():
while True:
for prov in self.providers:
- await self.run_provider_sync(prov.id)
+ self.mass.add_job(
+ prov.sync_library(),
+ f"Library sync for provider {prov.name}",
+ allow_duplicate=False,
+ )
if schedule is None:
return
await asyncio.sleep(3600 * schedule)
"""Return all (available) music providers."""
return tuple(x for x in self._providers.values() if x.available)
- def get_provider(self, provider_id: str) -> MusicProvider | None:
- """Return provider/plugin by id."""
- prov = self._providers.get(provider_id, None)
- if prov is None or not prov.available:
- self.logger.warning("Provider %s is not available", provider_id)
- return prov
+ def get_provider(
+ self, provider_id: Union[str, ProviderType]
+ ) -> MusicProvider | None:
+ """Return Music provider by id (or type)."""
+ if prov := self._providers.get(provider_id):
+ return prov
+ for prov in self._providers.values():
+ if provider_id in (prov.type, prov.id, prov.type.value):
+ return prov
+ self.logger.warning("Provider %s is not available", provider_id)
+ return None
async def search(
self, search_query, media_types: List[MediaType], limit: int = 10
# TODO: sort by name and filter out duplicates ?
return await asyncio.gather(
*[
- self.search_provider(search_query, prov_id, media_types, limit)
+ self.search_provider(
+ search_query, media_types, provider_id=prov_id, limit=limit
+ )
for prov_id in provider_ids
]
)
async def search_provider(
self,
search_query: str,
- provider_id: str,
media_types: List[MediaType],
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
limit: int = 10,
) -> List[MediaItemType]:
"""
:param media_types: A list of media_types to include. All types if None.
:param limit: number of items to return in the search (per type).
"""
- if provider_id == "database":
+ if provider == ProviderType.DATABASE or provider_id == "database":
# get results from database
return (
- await self.artists.search(search_query, "database", limit)
- + await self.albums.search(search_query, "database", limit)
- + await self.tracks.search(search_query, "database", limit)
- + await self.playlists.search(search_query, "database", limit)
- + await self.radio.search(search_query, "database", limit)
+ await self.artists.search(search_query, provider, provider_id, limit)
+ + await self.albums.search(search_query, provider, provider_id, limit)
+ + await self.tracks.search(search_query, provider, provider_id, limit)
+ + await self.playlists.search(
+ search_query, provider, provider_id, limit
+ )
+ + await self.radio.search(search_query, provider, provider_id, limit)
)
- provider = self.get_provider(provider_id)
+ provider = self.get_provider(provider_id or provider)
return await provider.search(search_query, media_types, limit)
async def get_item_by_uri(
"""Fetch MediaItem by uri."""
media_type, provider, item_id = parse_uri(uri)
return await self.get_item(
- item_id, provider, media_type, force_refresh=force_refresh, lazy=lazy
+ item_id=item_id,
+ media_type=media_type,
+ provider=provider,
+ force_refresh=force_refresh,
+ lazy=lazy,
)
async def get_item(
self,
item_id: str,
- provider_id: str,
media_type: MediaType,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
force_refresh: bool = False,
lazy: bool = True,
) -> MediaItemType:
"""Get single music item by id and media type."""
+ assert provider or provider_id, "provider or provider_id must be supplied"
ctrl = self.get_controller(media_type)
return await ctrl.get(
- item_id, provider_id, force_refresh=force_refresh, lazy=lazy
+ provider_item_id=item_id,
+ provider=provider,
+ provider_id=provider_id,
+ force_refresh=force_refresh,
+ lazy=lazy,
)
async def add_to_library(
self,
media_type: MediaType,
provider_item_id: str,
- provider_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
) -> None:
"""Add an item to the library."""
ctrl = self.get_controller(media_type)
- await ctrl.add_to_library(provider_item_id, provider_id)
+ await ctrl.add_to_library(
+ provider_item_id, provider=provider, provider_id=provider_id
+ )
async def remove_from_library(
- self, media_type: MediaType, provider_item_id: str, provider_id: str
+ self,
+ media_type: MediaType,
+ provider_item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
) -> None:
"""Remove item from the library."""
ctrl = self.get_controller(media_type)
- await ctrl.remove_from_library(provider_item_id, provider_id)
+ await ctrl.remove_from_library(
+ provider_item_id, provider=provider, provider_id=provider_id
+ )
async def get_provider_mapping(
self,
- media_type: MediaType,
- provider_id: str,
- provider_item_id: str,
+ media_type: Optional[MediaType] = None,
+ provider_item_id: Optional[str] = None,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ url: Optional[str] = None,
db: Optional[Db] = None,
+ return_key: str = "item_id",
) -> int | None:
"""Lookup database id for media item from provider id."""
+ match = {}
+ if media_type is not None:
+ match["media_type"] = media_type.value
+ if provider_item_id is not None:
+ match["prov_item_id"] = provider_item_id
+ if provider is not None:
+ match["prov_type"] = provider.value
+ if provider_id is not None:
+ match["prov_id"] = provider_id
+ if url is not None:
+ match["url"] = url
if result := await self.mass.database.get_row(
TABLE_PROV_MAPPINGS,
- {
- "media_type": media_type.value,
- "provider": provider_id,
- "prov_item_id": provider_item_id,
- },
+ match,
db=db,
):
- return result["item_id"]
+ return result[return_key] if return_key else result
return None
async def get_provider_mappings(
self,
media_type: MediaType,
- provider_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
db: Optional[Db] = None,
) -> List[int]:
"""Lookup all database id's for media type for given provider id."""
+ match = {
+ "media_type": media_type.value,
+ }
+ if provider is not None:
+ match["prov_type"] = provider.value
+ if provider_id is not None:
+ match["prov_id"] = provider_id
if result := await self.mass.database.get_rows(
TABLE_PROV_MAPPINGS,
- {
- "media_type": media_type.value,
- "provider": provider_id,
- },
+ match,
db=db,
):
return [x["item_id"] for x in result]
"item_id": item_id,
"media_type": media_type.value,
"prov_item_id": prov_id.item_id,
- "provider": prov_id.provider,
+ "prov_id": prov_id.prov_id,
+ "prov_type": prov_id.prov_type.value,
"quality": prov_id.quality.value if prov_id.quality else None,
"details": prov_id.details,
"url": prov_id.url,
try:
return await self.get_item(
media_item.item_id,
- media_item.provider,
media_item.media_type,
+ provider=media_item.provider,
force_refresh=True,
lazy=False,
)
for item in await self.search(media_item.name, [media_item.media_type], 20):
if item.available:
await self.get_item(
- item.item_id, item.provider, item.media_type, lazy=False
+ item.item_id, item.media_type, item.provider, lazy=False
)
- async def set_track_loudness(self, item_id: str, provider_id: str, loudness: int):
+ async def set_track_loudness(
+ self, item_id: str, provider: ProviderType, loudness: int
+ ):
"""List integrated loudness for a track in db."""
await self.mass.database.insert_or_replace(
TABLE_TRACK_LOUDNESS,
- {"item_id": item_id, "provider": provider_id, "loudness": loudness},
+ {"item_id": item_id, "provider": provider.value, "loudness": loudness},
)
async def get_track_loudness(
- self, provider_item_id: str, provider_id: str
+ self, provider_item_id: str, provider: ProviderType
) -> float | None:
"""Get integrated loudness for a track in db."""
if result := await self.mass.database.get_row(
TABLE_TRACK_LOUDNESS,
{
"item_id": provider_item_id,
- "provider": provider_id,
+ "provider": provider.value,
},
):
return result["loudness"]
return None
- async def get_provider_loudness(self, provider_id: str) -> float | None:
+ async def get_provider_loudness(self, provider: ProviderType) -> float | None:
"""Get average integrated loudness for tracks of given provider."""
all_items = []
for db_row in await self.mass.database.get_rows(
TABLE_TRACK_LOUDNESS,
{
- "provider": provider_id,
+ "provider": provider.value,
},
):
all_items.append(db_row["loudness"])
return statistics.fmean(all_items)
return None
- async def mark_item_played(self, item_id: str, provider_id: str):
+ async def mark_item_played(self, item_id: str, provider: ProviderType):
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
await self.mass.database.insert_or_replace(
TABLE_PLAYLOG,
- {"item_id": item_id, "provider": provider_id, "timestamp": timestamp},
+ {"item_id": item_id, "provider": provider.value, "timestamp": timestamp},
)
async def library_add_items(self, items: List[MediaItem]) -> None:
job_desc,
)
- async def run_provider_sync(self, provider_id: str) -> None:
- """Run/schedule library sync for a provider."""
- provider = self.get_provider(provider_id)
- if not provider:
- return
- for media_type in provider.supported_mediatypes:
- self.mass.add_job(
- self._library_items_sync(
- media_type,
- provider_id,
- ),
- f"Library sync of {media_type.value}s for provider {provider.name}",
- allow_duplicate=False,
- )
-
def get_controller(
self, media_type: MediaType
) -> ArtistsController | AlbumsController | TracksController | RadioController | PlaylistController:
if media_type == MediaType.PLAYLIST:
return self.playlists
- async def _library_items_sync(
- self, media_type: MediaType, provider_id: str
+ async def _register_provider(
+ self, provider: MusicProvider, conf: MusicProviderConfig
) -> None:
- """Sync library items for given provider."""
- music_provider = self.get_provider(provider_id)
- if not music_provider or not music_provider.available:
- return
- controller = self.get_controller(media_type)
- # create a set of all previous and current db id's
- prev_ids = set()
- for db_item in await controller.library():
- for prov_id in db_item.provider_ids:
- if prov_id.provider == provider_id:
- prev_ids.add(db_item.item_id)
- cur_ids = set()
- for prov_item in await music_provider.get_library_items(media_type):
- prov_item: MediaItemType = prov_item
-
- db_item: MediaItemType = await controller.get_db_item_by_prov_id(
- prov_item.provider, prov_item.item_id
- )
- if not db_item and media_type == MediaType.ARTIST:
- # for artists we need a fully matched item (with musicbrainz id)
- db_item = await controller.get(
- prov_item.item_id,
- prov_item.provider,
- lazy=False,
- )
- elif not db_item:
- # for other mediatypes its enough to simply dump the item in the db
- db_item = await controller.add_db_item(prov_item)
- elif db_item.metadata.checksum != prov_item.metadata.checksum:
- # item checksum changed
- # used by filesystem tracks and playlist items
- db_item = await controller.add_db_item(prov_item)
-
- cur_ids.add(db_item.item_id)
- if not db_item.in_library:
- await controller.set_db_library(db_item.item_id, True)
- # precache playlist/album tracks
- if media_type in [MediaType.PLAYLIST, MediaType.ALBUM]:
- await controller.tracks(prov_item.item_id, provider_id)
-
- # process deletions
- for item_id in prev_ids:
- if item_id not in cur_ids:
- await controller.set_db_library(item_id, False)
- # in case of filestem, removal from library means the whole item is
- # moved/deleted so we remove the prov mapping from db.
- if provider_id == "filesystem":
- if db_item := await controller.get_db_item(item_id):
- db_item.provider_ids = {
- x for x in db_item.provider_ids if x.provider != provider_id
- }
- await controller.update_db_item(item_id, db_item, True)
-
- async def _register_provider(self, provider: MusicProvider) -> None:
"""Register a music provider."""
+ if provider.id in self._providers:
+ raise SetupFailedError(
+ f"Provider with id {provider.id} is already registered"
+ )
try:
+ provider.config = conf
provider.mass = self.mass
provider.cache = self.mass.cache
- provider.logger = self.logger.getChild(provider.id)
+ provider.logger = self.logger.getChild(provider.type.value)
if await provider.setup():
self._providers[provider.id] = provider
except Exception as err: # pylint: disable=broad-except
raise SetupFailedError(
- f"Setup failed of provider {provider.id}: {str(err)}"
+ f"Setup failed of provider {provider.type.value}: {str(err)}"
) from err
from music_assistant.helpers.compare import compare_album, compare_strings
from music_assistant.helpers.database import TABLE_ALBUMS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name
-from music_assistant.models.enums import EventType
+from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
)
return album
- async def tracks(self, item_id: str, provider_id: str) -> List[Track]:
+ async def tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Track]:
"""Return album tracks for the given provider album id."""
- album = await self.get(item_id, provider_id)
+ album = await self.get(item_id, provider, provider_id)
# simply return the tracks from the first provider
for prov in album.provider_ids:
if tracks := await self.get_provider_album_tracks(
- prov.item_id, prov.provider
+ prov.item_id, prov.prov_id
):
return tracks
return []
- async def versions(self, item_id: str, provider_id: str) -> List[Album]:
+ async def versions(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Album]:
"""Return all versions of an album we can find on all providers."""
- album = await self.get(item_id, provider_id)
- provider_ids = {item.id for item in self.mass.music.providers}
+ album = await self.get(item_id, provider, provider_id)
+ prov_types = {item.type for item in self.mass.music.providers}
search_query = f"{album.artist.name} {album.name}"
return [
prov_item
for prov_items in await asyncio.gather(
- *[self.search(search_query, prov_id) for prov_id in provider_ids]
+ *[self.search(search_query, prov_type) for prov_type in prov_types]
)
for prov_item in prov_items
if compare_strings(prov_item.artist.name, album.artist.name)
async def add(self, item: Album) -> Album:
"""Add album to local db and return the database item."""
- # make sure we have an artist
- assert item.artist
# grab additional metadata
await self.mass.metadata.get_album_metadata(item)
db_item = await self.add_db_item(item)
return db_item
async def get_provider_album_tracks(
- self, item_id: str, provider_id: str
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
) -> List[Track]:
"""Return album tracks for the given provider album id."""
- provider = self.mass.music.get_provider(provider_id)
+ provider = self.mass.music.get_provider(provider_id or provider)
if not provider:
return []
return await provider.get_album_tracks(item_id)
async def add_db_item(self, album: Album) -> Album:
"""Add a new album record to the database."""
+ assert album.provider_ids, "Album is missing provider id(s)"
cur_item = None
- if not album.sort_name:
- album.sort_name = create_sort_name(album.name)
assert album.provider_ids
async with self.mass.database.get_db() as _db:
- # always try to grab existing item by external_id
+ # always try to grab existing item by musicbrainz_id
if album.musicbrainz_id:
match = {"musicbrainz_id": album.musicbrainz_id}
cur_item = await self.mass.database.get_row(
self.db_table,
{"item_id": item_id},
{
- "name": album.name if overwrite else cur_item.name,
- "sort_name": album.sort_name if overwrite else cur_item.sort_name,
+ "name": album.name if overwrite and album.name else cur_item.name,
+ "sort_name": album.sort_name
+ if overwrite and album.sort_name
+ else cur_item.sort_name,
"version": album.version if overwrite else cur_item.version,
"year": album.year or cur_item.year,
"upc": album.upc or cur_item.upc,
This is used to link objects of different providers/qualities together.
"""
- if db_album.provider != "database":
+ if db_album.provider != ProviderType.DATABASE:
return # Matching only supported for database items
async def find_prov_match(provider: MusicProvider):
await self.mass.music.artists.update_db_item(
db_album.artist.item_id, prov_album.artist
)
+ return match_found
- # no match found
- if not match_found:
+ # try to find match on all providers
+ cur_prov_types = {x.prov_type for x in db_album.provider_ids}
+ for provider in self.mass.music.providers:
+ if provider.type in cur_prov_types:
+ continue
+ if MediaType.ALBUM not in provider.supported_mediatypes:
+ continue
+ if await find_prov_match(provider):
+ cur_prov_types.add(provider.type)
+ else:
self.logger.debug(
"Could not find match for Album %s on provider %s",
db_album.name,
provider.name,
)
- # try to find match on all providers
- for provider in self.mass.music.providers:
- if provider.id == "filesystem":
- continue
- if MediaType.ALBUM in provider.supported_mediatypes:
- await find_prov_match(provider)
-
async def _get_album_artist(
self, db_album: Album, updated_album: Optional[Album] = None
) -> ItemMapping | None:
return ItemMapping.from_item(album_artist)
if album_artist := await self.mass.music.artists.get_db_item_by_prov_id(
- album.artist.provider, album.artist.item_id
+ album.artist.item_id,
+ provider=album.artist.provider,
):
return ItemMapping.from_item(album_artist)
import asyncio
import itertools
-from typing import List
+from typing import List, Optional
from music_assistant.helpers.compare import (
compare_album,
)
from music_assistant.helpers.database import TABLE_ARTISTS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name
-from music_assistant.models.enums import EventType
+from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
media_type = MediaType.ARTIST
item_cls = Artist
- async def toptracks(self, item_id: str, provider_id: str) -> List[Track]:
+ async def toptracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Track]:
"""Return top tracks for an artist."""
- artist = await self.get(item_id, provider_id)
+ artist = await self.get(item_id, provider, provider_id)
# get results from all providers
- # TODO: add db results
- return itertools.chain.from_iterable(
- await asyncio.gather(
- *[
- self.get_provider_artist_toptracks(item.item_id, item.provider)
- for item in artist.provider_ids
- ]
- )
- )
+ coros = [
+ self.get_provider_artist_toptracks(item.item_id, item.prov_id)
+ for item in artist.provider_ids
+ ]
+ if provider == ProviderType.DATABASE:
+ coros.append(self.get_database_artist_tracks(item_id, provider))
+ return itertools.chain.from_iterable(await asyncio.gather(*coros))
- async def albums(self, item_id: str, provider_id: str) -> List[Album]:
+ async def albums(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Album]:
"""Return (all/most popular) albums for an artist."""
- artist = await self.get(item_id, provider_id)
+ artist = await self.get(item_id, provider, provider_id)
# get results from all providers
- # TODO: add db results
- return itertools.chain.from_iterable(
- await asyncio.gather(
- *[
- self.get_provider_artist_albums(item.item_id, item.provider)
- for item in artist.provider_ids
- ]
- )
- )
+ coros = [
+ self.get_provider_artist_albums(item.item_id, item.prov_id)
+ for item in artist.provider_ids
+ ]
+ if provider == ProviderType.DATABASE:
+ coros.append(self.get_database_artist_albums(item_id, provider))
+ return itertools.chain.from_iterable(await asyncio.gather(*coros))
async def add(self, item: Artist) -> Artist:
"""Add artist to local db and return the database item."""
This is used to link objects of different providers together.
"""
assert (
- db_artist.provider == "database"
+ db_artist.provider == ProviderType.DATABASE
), "Matching only supported for database items!"
- cur_providers = {item.provider for item in db_artist.provider_ids}
+ cur_prov_types = {x.prov_type for x in db_artist.provider_ids}
for provider in self.mass.music.providers:
- if provider.id in cur_providers:
- continue
- if provider.id == "filesystem":
+ if provider.type in cur_prov_types:
continue
if MediaType.ARTIST not in provider.supported_mediatypes:
continue
- if not await self._match(db_artist, provider):
+ if await self._match(db_artist, provider):
+ cur_prov_types.add(provider.type)
+ else:
self.logger.debug(
"Could not find match for Artist %s on provider %s",
db_artist.name,
return []
return await provider.get_artist_toptracks(item_id)
+ async def get_database_artist_tracks(
+ self, artist_id: str, provider: ProviderType
+ ) -> List[Track]:
+ """Return tracks for an artist in database."""
+ query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{artist_id}\"%'"
+ query += " and artists LIKE '%\"{provider.value}\"%'"
+ return await self.mass.music.tracks.get_db_items(query)
+
+ async def get_database_artist_albums(
+ self, artist_id: str, provider: ProviderType
+ ) -> List[Track]:
+ """Return tracks for an artist in database."""
+ query = f"SELECT * FROM albums WHERE artist LIKE '%\"{artist_id}\"%'"
+ query += " and artist LIKE '%\"{provider.value}\"%'"
+ return await self.mass.music.albums.get_db_items(query)
+
async def get_provider_artist_albums(
self, item_id: str, provider_id: str
) -> List[Album]:
async def add_db_item(self, artist: Artist) -> Artist:
"""Add a new artist record to the database."""
- assert artist.musicbrainz_id
- assert artist.name
- assert artist.provider_ids
- match = {"musicbrainz_id": artist.musicbrainz_id}
- if cur_item := await self.mass.database.get_row(self.db_table, match):
- # update existing
- return await self.update_db_item(cur_item["item_id"], artist)
- # insert artist
+ assert artist.provider_ids, "Album is missing provider id(s)"
async with self.mass.database.get_db() as _db:
- if not artist.sort_name:
- artist.sort_name = create_sort_name(artist.name)
+ # always try to grab existing item by musicbrainz_id
+ cur_item = None
+ if artist.musicbrainz_id:
+ match = {"musicbrainz_id": artist.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(
+ self.db_table, match, db=_db
+ )
+ if not cur_item:
+ # fallback to matching
+ # NOTE: we match an artist by name which could theoretically lead to collisions
+ # but the chance is so small it is not worth the additional overhead of grabbing
+ # the musicbrainz id upfront
+ match = {"sort_name": artist.sort_name}
+ for row in await self.mass.database.get_rows(
+ self.db_table, match, db=_db
+ ):
+ row_artist = Artist.from_db_row(row)
+ if compare_strings(row_artist.sort_name, artist.sort_name):
+ cur_item = row_artist
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(cur_item.item_id, artist)
+
+ # insert artist
new_item = await self.mass.database.insert_or_replace(
self.db_table, artist.to_db_row(), db=_db
)
ref_track.item_id, ref_track.provider
)
searchstr = f"{db_artist.name} {ref_track.name}"
- search_results = await self.mass.music.tracks.search(searchstr, provider.id)
+ search_results = await self.mass.music.tracks.search(
+ searchstr, provider.type
+ )
for search_result_item in search_results:
if compare_track(search_result_item, ref_track):
# get matching artist from track
if ref_album.album_type == AlbumType.COMPILATION:
continue
searchstr = f"{db_artist.name} {ref_album.name}"
- search_result = await self.mass.music.albums.search(searchstr, provider.id)
+ search_result = await self.mass.music.albums.search(
+ searchstr, provider.type
+ )
for search_result_item in search_result:
# artist must match 100%
if not compare_strings(db_artist.name, search_result_item.artist.name):
from __future__ import annotations
from time import time
-from typing import List
+from typing import List, Optional
from music_assistant.helpers.database import TABLE_PLAYLISTS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name
-from music_assistant.models.enums import EventType, MediaType
+from music_assistant.models.enums import EventType, MediaType, ProviderType
from music_assistant.models.errors import InvalidDataError, MediaNotFoundError
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
"""Get in-library playlist by name."""
return await self.mass.database.get_row(self.db_table, {"name": name})
- async def tracks(self, item_id: str, provider_id: str) -> List[Track]:
+ async def tracks(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Track]:
"""Return playlist tracks for the given provider playlist id."""
- playlist = await self.get(item_id, provider_id)
- # simply return the tracks from the first provider
- for prov in playlist.provider_ids:
- if tracks := await self.get_provider_playlist_tracks(
- prov.item_id, prov.provider
- ):
- return tracks
- return []
+ if provider == ProviderType.DATABASE or provider_id == "database":
+ playlist = await self.get_db_item(item_id)
+ prov = next(x for x in playlist.provider_ids)
+ item_id = prov.item_id
+ provider_id = prov.prov_id
+
+ provider = self.mass.music.get_provider(provider_id or provider)
+ if not provider:
+ return []
+
+ return await provider.get_playlist_tracks(item_id)
async def add(self, item: Playlist) -> Playlist:
"""Add playlist to local db and return the new database item."""
# grab all existing track ids in the playlist so we can check for duplicates
cur_playlist_track_ids = set()
count = 0
- for item in await self.tracks(playlist_prov.item_id, playlist_prov.provider):
+ for item in await self.tracks(playlist_prov.item_id, playlist_prov.prov_id):
count += 1
cur_playlist_track_ids.update(
{
i.item_id
for i in item.provider_ids
- if i.provider == playlist_prov.provider
+ if i.prov_id == playlist_prov.prov_id
}
)
# check for duplicates
for track_prov in track.provider_ids:
if (
- track_prov.provider == playlist_prov.provider
+ track_prov.prov_id == playlist_prov.prov_id
and track_prov.item_id in cur_playlist_track_ids
):
raise InvalidDataError(
):
if not track.available:
continue
- if track_version.provider == playlist_prov.provider:
- track_id_to_add = track_version.item_id
- break
- if playlist_prov.provider == "file":
+ if playlist_prov.prov_type.is_file():
# the file provider can handle uri's from all providers so simply add the uri
track_id_to_add = track.uri
break
+ if track_version.prov_id == playlist_prov.prov_id:
+ track_id_to_add = track_version.item_id
+ break
if not track_id_to_add:
raise MediaNotFoundError(
- "Track is not available on provider {playlist_prov.provider}"
+ f"Track is not available on provider {playlist_prov.prov_type}"
)
# actually add the tracks to the playlist on the provider
- provider = self.mass.music.get_provider(playlist_prov.provider)
+ provider = self.mass.music.get_provider(playlist_prov.prov_id)
await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
# update local db entry
self.mass.signal_event(
raise InvalidDataError(f"Playlist {playlist.name} is not editable")
for prov in playlist.provider_ids:
track_ids_to_remove = []
- for playlist_track in await self.get_provider_playlist_tracks(
- prov.item_id, prov.provider
- ):
+ for playlist_track in await self.tracks(prov.item_id, prov.prov_id):
if playlist_track.position not in positions:
continue
track_ids_to_remove.append(playlist_track.item_id)
# actually remove the tracks from the playlist on the provider
if track_ids_to_remove:
- provider = self.mass.music.get_provider(prov.provider)
+ provider = self.mass.music.get_provider(prov.prov_id)
await provider.remove_playlist_tracks(prov.item_id, track_ids_to_remove)
self.mass.signal_event(
MassEvent(
)
)
- async def get_provider_playlist_tracks(
- self, item_id: str, provider_id: str
- ) -> List[Track]:
- """Return playlist tracks for the given provider playlist id."""
- provider = self.mass.music.get_provider(provider_id)
- if not provider:
- return []
-
- # we need to make sure that position is set on the track
- def playlist_track_with_position(track: Track, index: int):
- if track.position is None:
- track.position = index
- return track
-
- tracks = await provider.get_playlist_tracks(item_id)
- return [
- playlist_track_with_position(track, index)
- for index, track in enumerate(tracks)
- ]
-
async def add_db_item(self, playlist: Playlist) -> Playlist:
"""Add a new playlist record to the database."""
async with self.mass.database.get_db() as _db:
else:
metadata = cur_item.metadata.update(playlist.metadata)
provider_ids = {*cur_item.provider_ids, *playlist.provider_ids}
- if not playlist.sort_name:
- playlist.sort_name = create_sort_name(playlist.name)
async with self.mass.database.get_db() as _db:
await self.mass.database.update(
from __future__ import annotations
import asyncio
-import base64
import os
-from typing import Dict, List, Optional, Tuple
+import urllib.parse
+from contextlib import asynccontextmanager
+from time import time
+from typing import Generator, List, Optional, Tuple
import aiofiles
-from tinytag.tinytag import TinyTag, TinyTagException
-
-from music_assistant.helpers.compare import compare_strings
-from music_assistant.helpers.util import parse_title_and_version, try_parse_int
+import xmltodict
+from aiofiles.threadpool.binary import AsyncFileIO
+from tinytag.tinytag import TinyTag
+
+from music_assistant.helpers.util import (
+ create_clean_string,
+ parse_title_and_version,
+ try_parse_int,
+)
+from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.media_items import (
Album,
from music_assistant.models.provider import MusicProvider
+def scantree(path: str) -> Generator[os.DirEntry]:
+ """Recursively yield DirEntry objects for given directory."""
+ for entry in os.scandir(path):
+ if entry.is_dir(follow_symlinks=False):
+ yield from scantree(entry.path) # see below for Python 2.x
+ else:
+ yield entry
+
+
def split_items(org_str: str, splitters: Tuple[str] = None) -> Tuple[str]:
"""Split up a tags string by common splitter."""
+ if isinstance(org_str, list):
+ return org_str
if splitters is None:
splitters = ("/", ";", ",")
if org_str is None:
class FileSystemProvider(MusicProvider):
"""
- Very basic implementation of a musicprovider for local files.
+ Implementation of a musicprovider for local files.
Assumes files are stored on disk in format <artist>/<album>/<track.ext>
Reads ID3 tags from file and falls back to parsing filename
Supports m3u files only for playlists
Supports having URI's from streaming providers within m3u playlist
- Should be compatible with LMS
"""
- _attr_id = "filesystem"
_attr_name = "Filesystem"
- _playlists_dir = ""
- _music_dir = ""
+ _attr_type = ProviderType.FILESYSTEM_LOCAL
_attr_supported_mediatypes = [
- MediaType.ARTIST,
- MediaType.ALBUM,
MediaType.TRACK,
MediaType.PLAYLIST,
+ MediaType.ARTIST,
+ MediaType.ALBUM,
]
+ def __init__(self, *args, **kwargs) -> None:
+ """Initialize MusicProvider."""
+ super().__init__(*args, **kwargs)
+ self._cache_built = asyncio.Event()
+
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- if not self.mass.config.filesystem_enabled:
- return False
- self._music_dir = self.mass.config.filesystem_music_dir
- self._playlists_dir = (
- self.mass.config.filesystem_playlists_dir or self._music_dir
- )
-
- if not os.path.isdir(self._music_dir):
+ if not os.path.isdir(self.config.path):
raise MediaNotFoundError(
- f"Music Directory {self._music_dir} does not exist"
+ f"Music Directory {self.config.path} does not exist"
)
- if not os.path.isdir(self._playlists_dir):
- raise MediaNotFoundError(
- f"Playlist Directory {self._playlists_dir} does not exist"
- )
return True
- async def search(
- self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
- ) -> List[MediaItemType]:
- """
- Perform search on musicprovider.
-
- :param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
- :param limit: Number of items to return in the search (per type).
- """
- result = []
- for track in await self.get_library_tracks(True):
- for search_part in search_query.split(" - "):
- if media_types is None or MediaType.TRACK in media_types:
- if compare_strings(track.name, search_part):
- result.append(track)
- if media_types is None or MediaType.ALBUM in media_types:
- if track.album:
- if compare_strings(track.album.name, search_part):
- result.append(track.album)
- if media_types is None or MediaType.ARTIST in media_types:
- if track.album and track.album.artist:
- if compare_strings(track.album.artist, search_part):
- result.append(track.album.artist)
- return result
-
- async def get_library_artists(self) -> List[Artist]:
- """Retrieve all library artists."""
- result = []
- cur_ids = set()
- # for the sake of simplicity we only iterate over the files in one location only,
- # which is the library tracks where we recursively enumerate the directory structure
- # library artists = unique album artists across all tracks
- # the track listing is cached so this should be (pretty) fast
- for track in await self.get_library_tracks(True):
- if track.album is None or track.album is None:
- continue
- if track.album.artist.item_id in cur_ids:
- continue
- result.append(track.album.artist)
- cur_ids.add(track.album.artist.item_id)
- return result
+ @staticmethod
+ async def search(*args, **kwargs) -> List[MediaItemType]:
+ """Perform search on musicprovider."""
+ # items for the filesystem provider are already returned by the database
+ return []
+
+ async def sync_library(self) -> None:
+ """Run library sync for this provider."""
+ last_save = 0
+ cache_key = f"{self.id}.checksums"
+ checksums = await self.mass.cache.get(cache_key)
+ if checksums is None:
+ checksums = {}
+ # find all music files in the music directory and all subfolders
+ # we work bottom down, as-in we derive all info from the tracks
+ for entry in scantree(self.config.path):
- async def get_library_albums(self) -> List[Album]:
- """Get album folders recursively."""
- result = []
- cur_ids = set()
- # for the sake of simplicity we only iterate over the files in one location only,
- # which is the library tracks where we recurisvely enumerate the directory structure
- # library albums = unique albums across all tracks
- # the track listing is cached so this should be (pretty) fast
- for track in await self.get_library_tracks(True):
- if track.album is None:
+ # mtime is used as file checksum
+ checksum = str(entry.stat().st_mtime)
+ if checksum == checksums.get(entry.path):
continue
- if track.album.item_id in cur_ids:
- continue
- result.append(track.album)
- cur_ids.add(track.album.item_id)
- return result
-
- async def get_library_tracks(self, use_cache=False) -> List[Track]:
- """Get all tracks recursively."""
- # pylint: disable=arguments-differ
- # we cache the entire tracks listing for performance and convenience reasons
- # so we can easy retrieve the library artists and albums from the tracks listing
- cache_key = f"{self.id}.tracks"
- cache_result: Dict[str, dict] = await self.mass.cache.get(
- cache_key, checksum=self._music_dir
- )
- if cache_result is not None and use_cache:
- return [Track.from_dict(x) for x in cache_result.values()]
- if cache_result is None:
- cache_result = {}
-
- # TEMP: account for mounted network location not yet available
- prev_count = await self.mass.cache.get(f"{self.id}.count", self._music_dir)
- cur_count = 0
- retries = 0
- while retries < 10:
- cur_count = sum(len(files) for _, _, files in os.walk(self._music_dir))
- if prev_count is not None and abs(prev_count - cur_count) > 10:
- self.logger.warning("Delaying sync....")
- await asyncio.sleep(60)
- else:
- break
- if prev_count is not None and abs(prev_count - cur_count) > 100:
- self.logger.warning(
- "Many file changes detected, a database resync may be needed to solve this."
- )
- await self.mass.cache.set(f"{self.id}.count", cur_count, self._music_dir)
- # find all music files in the music directory and all subfolders
- result = []
- for _root, _dirs, _files in os.walk(self._music_dir):
- for file in _files:
- filename = os.path.join(_root, file)
- checksum = self._get_checksum(filename)
- prov_item_id = self._get_item_id(filename)
- cache_track = cache_result.get(prov_item_id)
- # we do not want to parse tags if there are no changes to the file
- # so we speedup the sync by comparing a checksum
- if cache_track and cache_track["metadata"].get("checksum") == checksum:
- # checksum did not change, use cached track
- result.append(Track.from_dict(cache_track))
- elif track := await self._parse_track(filename):
- cache_result[prov_item_id] = track.to_dict()
- result.append(track)
- # store cache listing in cache
- await self.mass.cache.set(cache_key, cache_result, self._music_dir)
- return result
-
- async def get_library_playlists(self) -> List[Playlist]:
- """Retrieve playlists from disk."""
- if not self._playlists_dir:
- return []
- result = []
- cur_ids = set()
- for filename in os.listdir(self._playlists_dir):
- filepath = os.path.join(self._playlists_dir, filename)
- if (
- os.path.isfile(filepath)
- and not filename.startswith(".")
- and filename.lower().endswith(".m3u")
- ):
- playlist = await self._parse_playlist(filepath)
- if playlist:
- result.append(playlist)
- cur_ids.add(playlist.item_id)
- return result
+ try:
+ if track := await self._parse_track(entry.path, checksum):
+ # add/update track to db
+ await self.mass.music.tracks.add_db_item(track)
+ # process album
+ if track.album:
+ await self.mass.music.albums.add_db_item(track.album)
+ # process (album)artist
+ if track.album.artist:
+ await self.mass.music.artists.add_db_item(
+ track.album.artist
+ )
+ elif playlist := await self._parse_playlist(entry.path, checksum):
+ # add/update] playlist to db
+ await self.mass.music.playlists.add_db_item(playlist)
+ except Exception: # pylint: disable=broad-except
+ # we don't want the whole sync to crash on one file so we catch all exceptions here
+ self.logger.exception("Error processing %s", entry.path)
+
+ # save current checksum cache every 5 mins for large listings
+ checksums[entry.path] = checksum
+ if (time() - last_save) > 60:
+ await self.mass.cache.set(cache_key, checksums)
+ last_save = time()
+ # TODO: Handle deletions
+ await self.mass.cache.set(cache_key, checksums)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- if album_artist := next(
- (
- track.album.artist
- for track in await self.get_library_tracks(True)
- if track.album is not None
- and track.album.artist is not None
- and track.album.artist.item_id == prov_artist_id
- ),
- None,
- ):
- return album_artist
- # fallback to track_artist
- for track in await self.get_library_tracks(True):
- for artist in track.artists:
- if artist.item_id == prov_artist_id:
- return artist
- return None
+ itempath = await self.get_filepath(prov_artist_id)
+ return await self._parse_artist(itempath)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
- return next(
- (
- track.album
- for track in await self.get_library_tracks(True)
- if track.album is not None and track.album.item_id == prov_album_id
- ),
- None,
- )
+ itempath = await self.get_filepath(prov_album_id)
+ return await self._parse_album(itempath)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- itempath = self._get_filename(prov_track_id)
- if not os.path.isfile(itempath):
- raise MediaNotFoundError(f"Track path does not exist: {itempath}")
+ itempath = await self.get_filepath(prov_track_id)
return await self._parse_track(itempath)
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- itempath = self._get_filename(prov_playlist_id)
- if not os.path.isfile(itempath):
- raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
+ itempath = await self.get_filepath(prov_playlist_id)
return await self._parse_playlist(itempath)
- async def get_album_tracks(self, prov_album_id) -> List[Track]:
+ async def get_album_tracks(self, prov_album_id: str) -> List[Track]:
"""Get album tracks for given album id."""
- return [
- track
- for track in await self.get_library_tracks(True)
- if track.album is not None and track.album.item_id == prov_album_id
- ]
+ itempath = await self.get_filepath(prov_album_id)
+ result = []
+ for entry in scantree(itempath):
+ # mtime is used as file checksum
+ checksum = str(entry.stat().st_mtime)
+ if track := await self._parse_track(entry.path, checksum):
+ result.append(track)
+ return result
async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]:
"""Get playlist tracks for given playlist id."""
result = []
- itempath = self._get_filename(prov_playlist_id)
- if not os.path.isfile(itempath):
+ itempath = await self.get_filepath(prov_playlist_id)
+ if not self.exists(itempath):
raise MediaNotFoundError(f"playlist path does not exist: {itempath}")
index = 0
- async with aiofiles.open(itempath, "r") as _file:
+ async with self.open_file(itempath, "r") as _file:
for line in await _file.readlines():
- line = line.strip()
+ line = urllib.parse.unquote(line.strip())
if line and not line.startswith("#"):
if track := await self._parse_track_from_uri(line):
+ track.position = index
result.append(track)
index += 1
return result
async def get_artist_albums(self, prov_artist_id: str) -> List[Album]:
"""Get a list of albums for the given artist."""
+ itempath = await self.get_filepath(prov_artist_id)
+ if not self.exists(itempath):
+ return await self.mass.music.artists.get_database_artist_albums(
+ prov_artist_id, self.type
+ )
result = []
- cur_ids = set()
- for track in await self.get_library_tracks(True):
- if track.album is None:
- continue
- if track.album.item_id in cur_ids:
- continue
- if track.album.artist is None:
- continue
- if track.album.artist.item_id != prov_artist_id:
- continue
- result.append(track.album)
- cur_ids.add(track.album.item_id)
+ for entry in os.scandir(itempath):
+ if entry.is_dir(follow_symlinks=False):
+ if album := await self._parse_album(entry.path):
+ result.append(album)
return result
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of all tracks as we have no clue about preference."""
- return [
- track
- for track in await self.get_library_tracks(True)
- if track.artists is not None
- and (
- (prov_artist_id in (x.item_id for x in track.artists))
- or (
- track.album is not None
- and track.album.artist is not None
- and track.album.artist.item_id == prov_artist_id
- )
+ itempath = await self.get_filepath(prov_artist_id)
+ if not self.exists(itempath):
+ return await self.mass.music.artists.get_database_artist_tracks(
+ prov_artist_id, self.type
)
- ]
+ result = []
+ for entry in scantree(self.config.path):
+ # mtime is used as file checksum
+ checksum = str(entry.stat().st_mtime)
+ if track := await self._parse_track(entry.path, checksum):
+ result.append(track)
+ return result
+
+ async def library_add(self, *args, **kwargs) -> bool:
+ """Add item to provider's library. Return true on succes."""
+ # already handled by database
+
+ async def library_remove(self, *args, **kwargs) -> bool:
+ """Remove item from provider's library. Return true on succes."""
+ # already handled by database
+ # TODO: do we want to process deletions here ?
+
+ async def add_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
+ """Add track(s) to playlist."""
+ itempath = await self.get_filepath(prov_playlist_id)
+ if not self.exists(itempath):
+ raise MediaNotFoundError(f"Playlist path does not exist: {itempath}")
+ async with self.open_file(itempath, "a") as _file:
+ for uri in prov_track_ids:
+ await _file.writeline(uri)
+
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, prov_track_ids: List[str]
+ ) -> None:
+ """Remove track(s) from playlist."""
+ # TODO !
+ if MediaType.PLAYLIST in self.supported_mediatypes:
+ raise NotImplementedError
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- itempath = self._get_filename(item_id)
- if not os.path.isfile(itempath):
+ itempath = await self.get_filepath(item_id)
+ if not self.exists(itempath):
raise MediaNotFoundError(f"Track path does not exist: {itempath}")
def parse_tag():
return StreamDetails(
type=StreamType.FILE,
- provider=self.id,
+ provider=self.type,
item_id=item_id,
content_type=ContentType(itempath.split(".")[-1]),
path=itempath,
bit_depth=16, # TODO: parse bitdepth
)
- async def get_embedded_image(self, filename: str) -> str | None:
- """Return the embedded image of an audio file as base64 string."""
- if not TinyTag.is_supported(filename):
- return None
+ async def _parse_track(
+ self, track_path: str, checksum: Optional[str] = None
+ ) -> Track | None:
+ """Try to parse a track from a filename by reading its tags."""
+ if self.config.path not in track_path:
+ track_path = os.path.join(self.config.path, track_path)
+ track_path_base = self._get_relative_path(track_path)
+ track_item_id = self._get_item_id(track_path_base)
- def parse_tags():
- return TinyTag.get(filename, tags=True, image=True, ignore_errors=True)
+ if not self.exists(track_path):
+ raise MediaNotFoundError(f"Track path does not exist: {track_path}")
- tags = await self.mass.loop.run_in_executor(None, parse_tags)
- if image_data := tags.get_image():
- enc_image = base64.b64encode(image_data).decode()
- enc_image = f"data:image/png;base64,{enc_image}"
- return enc_image
+ # reading file/tags is slow so we keep a cache and checksum
+ checksum = checksum or self._get_checksum(track_path)
+ cache_key = f"{self.id}_tracks_{track_item_id}"
+ if cache := await self.mass.cache.get(cache_key, checksum):
+ return Track.from_dict(cache)
- async def _parse_track(self, filename: str) -> Track | None:
- """Try to parse a track from a filename by reading its tags."""
- if not TinyTag.is_supported(filename):
+ if not TinyTag.is_supported(track_path):
return None
- def parse_tags():
- return TinyTag.get(filename, image=True, ignore_errors=True)
-
# parse ID3 tags with TinyTag
- try:
- tags = await self.mass.loop.run_in_executor(None, parse_tags)
- except TinyTagException as err:
- self.logger.error("Error processing %s: %s", filename, str(err))
- return None
-
- prov_item_id = self._get_item_id(filename)
+ def parse_tags():
+ return TinyTag.get(track_path, image=True, ignore_errors=True)
- # work out if we have an artist/album/track.ext structure
- filename_base = filename.replace(self._music_dir, "")
- if filename_base.startswith(os.sep):
- filename_base = filename_base[1:]
- track_parts = filename_base.rsplit(os.sep)
- if len(track_parts) == 3:
- album_artist_name = track_parts[0]
- album_name = track_parts[1]
- else:
- album_artist_name = tags.albumartist
- album_name = tags.album
+ tags = await self.mass.loop.run_in_executor(None, parse_tags)
# prefer title from tag, fallback to filename
if tags.title:
track_title = tags.title
else:
- ext = filename_base.split(".")[-1]
- track_title = filename_base.replace(f".{ext}", "").replace("_", " ")
+
+ ext = track_path_base.split(".")[-1]
+ track_title = track_path_base.replace(f".{ext}", "").replace("_", " ")
self.logger.warning(
- "%s is missing ID3 tags, use filename as fallback", filename_base
+ "%s is missing ID3 tags, use filename as fallback", track_path_base
)
name, version = parse_title_and_version(track_title)
track = Track(
- item_id=prov_item_id, provider=self.id, name=name, version=version
+ item_id=track_item_id,
+ provider=self.type,
+ name=name,
+ version=version,
+ # a track on disk is always in library
+ in_library=True,
)
+ # work out if we have an artist/album/track.ext structure
+ track_parts = track_path_base.rsplit(os.sep)
+ if len(track_parts) == 3:
+ album_path = os.path.dirname(track_path)
+ artist_path = os.path.dirname(album_path)
+ album_artist = await self._parse_artist(artist_path, True)
+ track.album = await self._parse_album(album_path, album_artist, True)
+
+ if track.album is None and tags.album:
+ # no artist/album structure found, create a basic album object instead
+ if tags.albumartist:
+ album_path = f"{tags.albumartist}/{tags.album}"
+ album_artist = await self._parse_artist(tags.albumartist)
+ else:
+ album_path = tags.album
+ album_artist = None
+ track.album = await self._parse_album(album_path, album_artist)
+
+ # try to guess the album type
+ if track.album:
+ if name.lower() == track.album.name.lower():
+ track.album.album_type = AlbumType.SINGLE
+ elif track.album.artist not in (x.name for x in track.artists):
+ track.album.album_type = AlbumType.COMPILATION
+ else:
+ track.album.album_type = AlbumType.ALBUM
+
# Parse track artist(s) from artist string using common splitters used in ID3 tags
# NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up
- track_artists_str = tags.artist or album_artist_name or FALLBACK_ARTIST
+ track_artists_str = tags.artist or FALLBACK_ARTIST
track.artists = [
- Artist(
- item_id=item,
- provider=self._attr_id,
- name=item,
- )
+ await self._parse_artist(item)
for item in split_items(track_artists_str, ARTIST_SPLITTERS)
]
# Check if track has embedded metadata
- if tags.get_image():
+ img = await self.mass.loop.run_in_executor(None, tags.get_image)
+ if not track.metadata.images and img:
# we do not actually embed the image in the metadata because that would consume too
# much space and bandwidth. Instead we set the filename as value so the image can
# be retrieved later in realtime.
- track.metadata.images = {MediaItemImage(ImageType.EMBEDDED_THUMB, filename)}
-
- # Parse album (only if we have album + album artist tags)
- if album_name and album_artist_name:
- album_id = album_name
- album_name, album_version = parse_title_and_version(album_name)
- track.album = Album(
- item_id=album_id,
- provider=self._attr_id,
- name=album_name,
- version=album_version,
- year=try_parse_int(tags.year) if tags.year else None,
- artist=Artist(
- item_id=album_artist_name,
- provider=self._attr_id,
- name=album_artist_name,
- ),
- )
- track.album.metadata.images = track.metadata.images
-
- # try to guess the album type
- if name.lower() == album_name.lower():
- track.album.album_type = AlbumType.SINGLE
- elif album_artist_name not in (x.name for x in track.artists):
- track.album.album_type = AlbumType.COMPILATION
- else:
- track.album.album_type = AlbumType.ALBUM
+ track.metadata.images = [MediaItemImage(ImageType.THUMB, track_path, True)]
+ if track.album and not track.album.metadata.images:
+ track.album.metadata.images = track.metadata.images
# parse other info
track.duration = tags.duration
if "lyrics" in tags.extra:
track.metadata.lyrics = tags.extra["lyrics"]
# store last modified time as checksum
- track.metadata.checksum = self._get_checksum(filename)
+ track.metadata.checksum = checksum
quality_details = ""
- if filename.endswith(".flac"):
+ if track_path.endswith(".flac"):
# TODO: get bit depth
quality = MediaQuality.FLAC_LOSSLESS
if tags.samplerate > 192000:
elif tags.samplerate > 48000:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2
quality_details = f"{tags.samplerate / 1000} Khz"
- elif filename.endswith(".ogg"):
+ elif track_path.endswith(".ogg"):
quality = MediaQuality.LOSSY_OGG
quality_details = f"{tags.bitrate} kbps"
- elif filename.endswith(".m4a"):
+ elif track_path.endswith(".m4a"):
quality = MediaQuality.LOSSY_AAC
quality_details = f"{tags.bitrate} kbps"
else:
quality_details = f"{tags.bitrate} kbps"
track.add_provider_id(
MediaItemProviderId(
- provider=self.id,
- item_id=prov_item_id,
+ item_id=track_item_id,
+ prov_type=self.type,
+ prov_id=self.id,
quality=quality,
details=quality_details,
- url=filename,
+ url=track_path_base,
)
)
+ await self.mass.cache.set(cache_key, track.to_dict(), checksum, 86400 * 365 * 5)
return track
- async def _parse_playlist(self, filename: str) -> Playlist | None:
+ async def _parse_artist(self, artist_path: str, skip_cache=False) -> Artist | None:
+ """Lookup metadata in Artist folder."""
+ if self.config.path not in artist_path:
+ artist_path = os.path.join(self.config.path, artist_path)
+ artist_path_base = self._get_relative_path(artist_path)
+ artist_item_id = self._get_item_id(artist_path_base)
+ name = artist_path.split(os.sep)[-1]
+
+ cache_key = f"{self.id}.artist.{artist_item_id}"
+ if not skip_cache:
+ if cache := await self.mass.cache.get(cache_key):
+ return Artist.from_dict(cache)
+
+ artist = Artist(
+ artist_item_id,
+ self.type,
+ name,
+ provider_ids={
+ MediaItemProviderId(
+ artist_item_id, self.type, self.id, url=artist_path_base
+ )
+ },
+ )
+
+ if not self.exists(artist_path):
+ # return basic object if there is no path on disk
+ # happens if disk structure does not conform
+ return artist
+
+ # mark artist as in-library when it exists as folder on disk
+ artist.in_library = True
+
+ nfo_file = os.path.join(artist_path, "artist.nfo")
+ if self.exists(nfo_file):
+ # found NFO file with metadata
+ # https://kodi.wiki/view/NFO_files/Artists
+ async with self.open_file(nfo_file, "r") as _file:
+ data = await _file.read()
+ info = await self.mass.loop.run_in_executor(None, xmltodict.parse, data)
+ info = info["artist"]
+ artist.name = info.get("title", info.get("name", name))
+ if sort_name := info.get("sortname"):
+ artist.sort_name = sort_name
+ if musicbrainz_id := info.get("musicbrainzartistid"):
+ artist.musicbrainz_id = musicbrainz_id
+ if descripton := info.get("biography"):
+ artist.metadata.description = descripton
+ if genre := info.get("genre"):
+ artist.metadata.genres = set(split_items(genre))
+ if not artist.musicbrainz_id:
+ for uid in info.get("uniqueid", []):
+ if uid["@type"] == "MusicBrainzArtist":
+ artist.musicbrainz_id = uid["#text"]
+ # find local images
+ images = []
+ for _filename in os.listdir(artist_path):
+ ext = _filename.split(".")[-1]
+ if ext not in ("jpg", "png"):
+ continue
+ _filepath = os.path.join(artist_path, _filename)
+ for img_type in ImageType:
+ if img_type.value in _filepath:
+ images.append(MediaItemImage(img_type, _filepath, True))
+ elif _filename == "folder.jpg":
+ images.append(MediaItemImage(ImageType.THUMB, _filepath, True))
+ if images:
+ artist.metadata.images = images
+
+ await self.mass.cache.set(cache_key, artist.to_dict())
+ return artist
+
+ async def _parse_album(
+ self, album_path: str, artist: Optional[Artist] = None, skip_cache=False
+ ) -> Album | None:
+ """Lookup metadata in Album folder."""
+ if self.config.path not in album_path:
+ album_path = os.path.join(self.config.path, album_path)
+ album_path_base = self._get_relative_path(album_path)
+ album_item_id = self._get_item_id(album_path_base)
+ name = album_path.split(os.sep)[-1]
+
+ cache_key = f"{self.id}.album.{album_item_id}"
+ if not skip_cache:
+ if cache := await self.mass.cache.get(cache_key):
+ return Album.from_dict(cache)
+
+ album = Album(
+ album_item_id,
+ self.type,
+ name,
+ artist=artist,
+ provider_ids={
+ MediaItemProviderId(
+ album_item_id, self.type, self.id, url=album_path_base
+ )
+ },
+ )
+
+ if not self.exists(album_path):
+ # return basic object if there is no path on disk
+ # happens if disk structure does not conform
+ return album
+
+ # mark album as in-library when it exists as folder on disk
+ album.in_library = True
+
+ nfo_file = os.path.join(album_path, "album.nfo")
+ if self.exists(nfo_file):
+ # found NFO file with metadata
+ # https://kodi.wiki/view/NFO_files/Artists
+ async with self.open_file(nfo_file) as _file:
+ data = await _file.read()
+ info = await self.mass.loop.run_in_executor(None, xmltodict.parse, data)
+ info = info["album"]
+ album.name = info.get("title", info.get("name", name))
+ if sort_name := info.get("sortname"):
+ album.sort_name = sort_name
+ if musicbrainz_id := info.get("musicbrainzreleasegroupid"):
+ album.musicbrainz_id = musicbrainz_id
+ if description := info.get("review"):
+ album.metadata.description = description
+ if year := info.get("label"):
+ album.year = int(year)
+ if genre := info.get("genre"):
+ album.metadata.genres = set(split_items(genre))
+ for uid in info.get("uniqueid", []):
+ if uid["@type"] == "MusicBrainzReleaseGroup":
+ if not album.musicbrainz_id:
+ album.musicbrainz_id = uid["#text"]
+ if uid["@type"] == "MusicBrainzAlbumArtist":
+ if album.artist and not album.artist.musicbrainz_id:
+ album.artist.musicbrainz_id = uid["#text"]
+ # parse name/version
+ album.name, album.version = parse_title_and_version(album.name)
+ # find local images
+ images = []
+ for _filename in os.listdir(album_path):
+ ext = _filename.split(".")[-1]
+ if ext not in ("jpg", "png"):
+ continue
+ _filepath = os.path.join(album_path, _filename)
+ for img_type in ImageType:
+ if img_type.value in _filepath:
+ images.append(MediaItemImage(img_type, _filepath, True))
+ elif _filename == "folder.jpg":
+ images.append(MediaItemImage(ImageType.THUMB, _filepath, True))
+ if images:
+ album.metadata.images = images
+
+ await self.mass.cache.set(cache_key, album.to_dict())
+ return album
+
+ async def _parse_playlist(
+ self, playlist_path: str, checksum: Optional[str] = None
+ ) -> Playlist | None:
"""Parse playlist from file."""
- # use the relative filename as item_id
- filename_base = filename.replace(self._music_dir, "")
- if filename_base.startswith(os.sep):
- filename_base = filename_base[1:]
- prov_item_id = filename_base
+ if self.config.path not in playlist_path:
+ playlist_path = os.path.join(self.config.path, playlist_path)
+ playlist_path_base = self._get_relative_path(playlist_path)
+ playlist_item_id = self._get_item_id(playlist_path_base)
+ checksum = checksum or self._get_checksum(playlist_path)
+
+ if not playlist_path.endswith(".m3u"):
+ return None
- name = filename.split(os.sep)[-1].replace(".m3u", "")
+ if not self.exists(playlist_path):
+ raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- playlist = Playlist(prov_item_id, provider=self.id, name=name)
+ name = playlist_path_base.split(os.sep)[-1].replace(".m3u", "")
+
+ playlist = Playlist(playlist_item_id, provider=self.type, name=name)
playlist.is_editable = True
+ playlist.in_library = True
playlist.add_provider_id(
- MediaItemProviderId(provider=self.id, item_id=prov_item_id, url=filename)
+ MediaItemProviderId(
+ item_id=playlist_item_id,
+ prov_type=self.type,
+ prov_id=self.id,
+ url=playlist_path_base,
+ )
)
playlist.owner = self._attr_name
- playlist.metadata.checksum = self._get_checksum(filename)
+ playlist.metadata.checksum = checksum
return playlist
async def _parse_track_from_uri(self, uri):
except MediaNotFoundError:
return None
- def _get_filename(self, item_id: str, playlist: bool = False) -> str:
- """Get filename for item_id."""
- if self._music_dir in item_id:
- return item_id
- if playlist:
- return os.path.join(self._playlists_dir, item_id)
- return os.path.join(self._music_dir, item_id)
-
- def _get_item_id(self, filename: str, playlist: bool = False) -> str:
- """Return item_id for given filename."""
- # we simply use the base filename as item_id
- base_path = self._playlists_dir if playlist else self._music_dir
- filename_base = filename.replace(base_path, "")
- if filename_base.startswith(os.sep):
- filename_base = filename_base[1:]
- return filename_base
+ def exists(self, file_path: str) -> bool:
+ """Return bool is this FileSystem musicprovider has given file/dir."""
+ # ensure we have a full path and not relative
+ if self.config.path not in file_path:
+ file_path = os.path.join(self.config.path, file_path)
+ return os.path.isfile(file_path) or os.path.isdir(file_path)
+
+ @asynccontextmanager
+ async def open_file(self, file_path: str, mode="rb") -> AsyncFileIO:
+ """Return (async) handle to given file."""
+ # ensure we have a full path and not relative
+ if self.config.path not in file_path:
+ file_path = os.path.join(self.config.path, file_path)
+ # remote file locations should return a tempfile here ?
+ async with aiofiles.open(file_path, mode) as _file:
+ yield _file
+
+ async def get_filepath(self, item_id: str) -> str | None:
+ """Get full filepath on disk for item_id."""
+ file_path = await self.mass.music.get_provider_mapping(
+ provider_id=self.id, provider_item_id=item_id, return_key="url"
+ )
+ if file_path is not None:
+ # ensure we have a full path and not relative
+ if self.config.path not in file_path:
+ file_path = os.path.join(self.config.path, file_path)
+ return file_path
+ return None
+
+ def _get_relative_path(self, filename: str) -> str:
+ """Get relative path for filename (without the base dir)."""
+ filename = filename.replace(self.config.path, "")
+ if filename.startswith(os.sep):
+ filename = filename[1:]
+ if filename.endswith(os.sep):
+ filename = filename[:-1]
+ return filename
+
+ def _get_item_id(self, filename: str) -> str:
+ """Create item id from filename."""
+ return create_clean_string(self._get_relative_path(filename))
@staticmethod
def _get_checksum(filename: str) -> str:
import hashlib
import time
from json import JSONDecodeError
-from typing import List, Optional
+from typing import AsyncGenerator, List, Optional
import aiohttp
from asyncio_throttle import Throttler
)
from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
-from music_assistant.models.enums import EventType
+from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.errors import LoginFailed
from music_assistant.models.event import MassEvent
from music_assistant.models.media_items import (
class QobuzProvider(MusicProvider):
"""Provider for the Qobux music service."""
- _attr_id = "qobuz"
+ _attr_type = ProviderType.QOBUZ
_attr_name = "Qobuz"
_attr_supported_mediatypes = [
MediaType.ARTIST,
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- if not self.mass.config.qobuz_enabled:
+ if not self.config.enabled:
return False
- if not self.mass.config.qobuz_username or not self.mass.config.qobuz_password:
+ if not self.config.username or not self.config.password:
raise LoginFailed("Invalid login credentials")
# try to get a token, raise if that fails
token = await self._auth_token()
if not token:
- raise LoginFailed(
- f"Login failed for user {self.mass.config.qobuz_username}"
- )
+ raise LoginFailed(f"Login failed for user {self.config.username}")
# subscribe to stream events so we can report playback to Qobuz
self.mass.subscribe(
self.on_stream_event,
(EventType.STREAM_STARTED, EventType.STREAM_ENDED),
- id_filter=self.id,
+ id_filter=self.type.value,
)
return True
]
return result
- async def get_library_artists(self) -> List[Artist]:
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Qobuz."""
endpoint = "favorite/getUserFavorites"
- return [
- await self._parse_artist(item)
- for item in await self._get_all_items(
- endpoint, key="artists", type="artists"
- )
- if (item and item["id"])
- ]
+ for item in await self._get_all_items(endpoint, key="artists", type="artists"):
+ if item and item["id"]:
+ yield await self._parse_artist(item)
- async def get_library_albums(self) -> List[Album]:
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Qobuz."""
endpoint = "favorite/getUserFavorites"
- return [
- await self._parse_album(item)
- for item in await self._get_all_items(endpoint, key="albums", type="albums")
- if (item and item["id"])
- ]
+ for item in await self._get_all_items(endpoint, key="albums", type="albums"):
+ if item and item["id"]:
+ yield await self._parse_album(item)
- async def get_library_tracks(self) -> List[Track]:
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Qobuz."""
endpoint = "favorite/getUserFavorites"
- return [
- await self._parse_track(item)
- for item in await self._get_all_items(endpoint, key="tracks", type="tracks")
- if (item and item["id"])
- ]
+ for item in await self._get_all_items(endpoint, key="tracks", type="tracks"):
+ if item and item["id"]:
+ yield await self._parse_track(item)
- async def get_library_playlists(self) -> List[Playlist]:
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
endpoint = "playlist/getUserPlaylists"
- return [
- await self._parse_playlist(item)
- for item in await self._get_all_items(endpoint, key="playlists")
- if (item and item["id"])
- ]
+ for item in await self._get_all_items(endpoint, key="playlists"):
+ if item and item["id"]:
+ yield await self._parse_playlist(item)
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
return StreamDetails(
type=StreamType.URL,
item_id=str(item_id),
- provider=self.id,
+ provider=self.type,
path=streamdata["url"],
content_type=content_type,
sample_rate=int(streamdata["sampling_rate"] * 1000),
async def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
artist = Artist(
- item_id=str(artist_obj["id"]), provider=self.id, name=artist_obj["name"]
+ item_id=str(artist_obj["id"]), provider=self.type, name=artist_obj["name"]
)
artist.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=str(artist_obj["id"]),
+ prov_type=self.type,
+ prov_id=self.id,
url=artist_obj.get(
"url", f'https://open.qobuz.com/artist/{artist_obj["id"]}'
),
)
)
if img := self.__get_image(artist_obj):
- artist.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
+ artist.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
if artist_obj.get("biography"):
artist.metadata.description = artist_obj["biography"].get("content")
return artist
album_obj["title"], album_obj.get("version")
)
album = Album(
- item_id=str(album_obj["id"]), provider=self.id, name=name, version=version
+ item_id=str(album_obj["id"]), provider=self.type, name=name, version=version
)
if album_obj["maximum_sampling_rate"] > 192:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
quality = MediaQuality.FLAC_LOSSLESS
album.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=str(album_obj["id"]),
+ prov_type=self.type,
+ prov_id=self.id,
quality=quality,
url=album_obj.get(
"url", f'https://open.qobuz.com/album/{album_obj["id"]}'
if "genre" in album_obj:
album.metadata.genres = {album_obj["genre"]["name"]}
if img := self.__get_image(album_obj):
- album.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
+ album.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
if len(album_obj["upc"]) == 13:
# qobuz writes ean as upc ?!
album.upc = album_obj["upc"][1:]
)
track = Track(
item_id=str(track_obj["id"]),
- provider=self.id,
+ provider=self.type,
name=name,
version=version,
disc_number=track_obj["media_number"],
track_number=track_obj["track_number"],
duration=track_obj["duration"],
+ position=track_obj.get("position"),
)
if track_obj.get("performer") and "Various " not in track_obj["performer"]:
artist = await self._parse_artist(track_obj["performer"])
role = performer_str.split(", ")[1]
name = performer_str.split(", ")[0]
if "artist" in role.lower():
- artist = Artist(name, self.id, name)
+ artist = Artist(name, self.type, name)
track.artists.append(artist)
# TODO: fix grabbing composer from details
if track_obj.get("parental_warning"):
track.metadata.explicit = True
if img := self.__get_image(track_obj):
- track.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
+ track.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
# get track quality
if track_obj["maximum_sampling_rate"] > 192:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
quality = MediaQuality.FLAC_LOSSLESS
track.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=str(track_obj["id"]),
+ prov_type=self.type,
+ prov_id=self.id,
quality=quality,
url=track_obj.get(
"url", f'https://open.qobuz.com/track/{track_obj["id"]}'
"""Parse qobuz playlist object to generic layout."""
playlist = Playlist(
item_id=str(playlist_obj["id"]),
- provider=self.id,
+ provider=self.type,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["name"],
)
playlist.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=str(playlist_obj["id"]),
+ prov_type=self.type,
+ prov_id=self.id,
url=playlist_obj.get(
"url", f'https://open.qobuz.com/playlist/{playlist_obj["id"]}'
),
or playlist_obj["is_collaborative"]
)
if img := self.__get_image(playlist_obj):
- playlist.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
+ playlist.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
playlist.metadata.checksum = str(playlist_obj["updated_at"])
return playlist
if self._user_auth_info:
return self._user_auth_info["user_auth_token"]
params = {
- "username": self.mass.config.qobuz_username,
- "password": self.mass.config.qobuz_password,
+ "username": self.config.username,
+ "password": self.config.password,
"device_manufacturer_id": "music_assistant",
}
details = await self._get_data("user/login", **params)
break
if not result.get(key) or not result[key].get("items"):
break
- all_items += result[key]["items"]
+ for item in result[key]["items"]:
+ item["position"] = len(all_items) + 1
+ all_items.append(item)
if len(result[key]["items"]) < limit:
break
return all_items
import time
from json.decoder import JSONDecodeError
from tempfile import gettempdir
-from typing import List, Optional
+from typing import AsyncGenerator, List, Optional
import aiohttp
from asyncio_throttle import Throttler
)
from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import parse_title_and_version
+from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed
from music_assistant.models.media_items import (
Album,
class SpotifyProvider(MusicProvider):
"""Implementation of a Spotify MusicProvider."""
- _attr_id = "spotify"
+ _attr_type = ProviderType.SPOTIFY
_attr_name = "Spotify"
_attr_supported_mediatypes = [
MediaType.ARTIST,
_sp_user = None
_librespot_bin = None
_throttler = Throttler(rate_limit=4, period=1)
+ _cache_dir = CACHE_DIR
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- if not self.mass.config.spotify_enabled:
+ if not self.config.enabled:
return False
- if (
- not self.mass.config.spotify_username
- or not self.mass.config.spotify_password
- ):
+ if not self.config.username or not self.config.password:
raise LoginFailed("Invalid login credentials")
# try to get a token, raise if that fails
+ self._cache_dir = os.path.join(CACHE_DIR, self.id)
token = await self.get_token()
if not token:
- raise LoginFailed(
- f"Login failed for user {self.mass.config.spotify_username}"
- )
+ raise LoginFailed(f"Login failed for user {self.config.username}")
return True
async def search(
]
return result
- async def get_library_artists(self) -> List[Artist]:
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve library artists from spotify."""
spotify_artists = await self._get_data(
"me/following", type="artist", limit=50, skip_cache=True
)
- return [
- await self._parse_artist(item)
- for item in spotify_artists["artists"]["items"]
- if (item and item["id"])
- ]
+ for item in spotify_artists["artists"]["items"]:
+ if item and item["id"]:
+ yield await self._parse_artist(item)
- async def get_library_albums(self) -> List[Album]:
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve library albums from the provider."""
- return [
- await self._parse_album(item["album"])
- for item in await self._get_all_items("me/albums", skip_cache=True)
- if (item["album"] and item["album"]["id"])
- ]
+ for item in await self._get_all_items("me/albums", skip_cache=True):
+ if item["album"] and item["album"]["id"]:
+ yield await self._parse_album(item["album"])
- async def get_library_tracks(self) -> List[Track]:
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
- return [
- await self._parse_track(item["track"])
- for item in await self._get_all_items("me/tracks", skip_cache=True)
- if (item and item["track"]["id"])
- ]
+ for item in await self._get_all_items("me/tracks", skip_cache=True):
+ if item and item["track"]["id"]:
+ yield await self._parse_track(item["track"])
- async def get_library_playlists(self) -> List[Playlist]:
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve playlists from the provider."""
- return [
- await self._parse_playlist(item)
- for item in await self._get_all_items("me/playlists", skip_cache=True)
- if (item and item["id"])
- ]
+ for item in await self._get_all_items("me/playlists", skip_cache=True):
+ if item and item["id"]:
+ yield await self._parse_playlist(item)
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
# make sure that the token is still valid by just requesting it
await self.get_token()
librespot = await self.get_librespot_binary()
- librespot_exec = f'{librespot} -c "{CACHE_DIR}" --pass-through -b 320 --single-track spotify://track:{track.item_id}'
+ librespot_exec = f'{librespot} -c "{self._cache_dir}" --pass-through -b 320 --single-track spotify://track:{track.item_id}'
return StreamDetails(
type=StreamType.EXECUTABLE,
item_id=track.item_id,
- provider=self.id,
+ provider=self.type,
path=librespot_exec,
content_type=ContentType.OGG,
sample_rate=44100,
async def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""
artist = Artist(
- item_id=artist_obj["id"], provider=self.id, name=artist_obj["name"]
+ item_id=artist_obj["id"], provider=self.type, name=artist_obj["name"]
)
artist.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=artist_obj["id"],
+ prov_type=self.type,
+ prov_id=self.id,
url=artist_obj["external_urls"]["spotify"],
)
)
for img in artist_obj["images"]:
img_url = img["url"]
if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url:
- artist.metadata.images = {MediaItemImage(ImageType.THUMB, img_url)}
+ artist.metadata.images = [MediaItemImage(ImageType.THUMB, img_url)]
break
return artist
"""Parse spotify album object to generic layout."""
name, version = parse_title_and_version(album_obj["name"])
album = Album(
- item_id=album_obj["id"], provider=self.id, name=name, version=version
+ item_id=album_obj["id"], provider=self.type, name=name, version=version
)
for artist in album_obj["artists"]:
album.artist = await self._parse_artist(artist)
if "genres" in album_obj:
album.metadata.genre = set(album_obj["genres"])
if album_obj.get("images"):
- album.metadata.images = {
+ album.metadata.images = [
MediaItemImage(ImageType.THUMB, album_obj["images"][0]["url"])
- }
+ ]
if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
album.upc = album_obj["external_ids"]["upc"]
if "label" in album_obj:
album.metadata.explicit = album_obj["explicit"]
album.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=album_obj["id"],
+ prov_type=self.type,
+ prov_id=self.id,
quality=MediaQuality.LOSSY_OGG,
url=album_obj["external_urls"]["spotify"],
)
name, version = parse_title_and_version(track_obj["name"])
track = Track(
item_id=track_obj["id"],
- provider=self.id,
+ provider=self.type,
name=name,
version=version,
duration=track_obj["duration_ms"] / 1000,
disc_number=track_obj["disc_number"],
track_number=track_obj["track_number"],
+ position=track_obj.get("position"),
)
if artist:
track.artists.append(artist)
if "album" in track_obj:
track.album = await self._parse_album(track_obj["album"])
if track_obj["album"].get("images"):
- track.metadata.images = {
+ track.metadata.images = [
MediaItemImage(
ImageType.THUMB, track_obj["album"]["images"][0]["url"]
)
- }
+ ]
if track_obj.get("copyright"):
track.metadata.copyright = track_obj["copyright"]
if track_obj.get("explicit"):
track.metadata.popularity = track_obj["popularity"]
track.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=track_obj["id"],
+ prov_type=self.type,
+ prov_id=self.id,
quality=MediaQuality.LOSSY_OGG,
url=track_obj["external_urls"]["spotify"],
available=not track_obj["is_local"] and track_obj["is_playable"],
"""Parse spotify playlist object to generic layout."""
playlist = Playlist(
item_id=playlist_obj["id"],
- provider=self.id,
+ provider=self.type,
name=playlist_obj["name"],
owner=playlist_obj["owner"]["display_name"],
)
playlist.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=playlist_obj["id"],
+ prov_type=self.type,
+ prov_id=self.id,
url=playlist_obj["external_urls"]["spotify"],
)
)
or playlist_obj["collaborative"]
)
if playlist_obj.get("images"):
- playlist.metadata.images = {
+ playlist.metadata.images = [
MediaItemImage(ImageType.THUMB, playlist_obj["images"][0]["url"])
- }
+ ]
playlist.metadata.checksum = str(playlist_obj["snapshot_id"])
return playlist
# return existing token if we have one in memory
if (
self._auth_token
- and os.path.isdir(CACHE_DIR)
+ and os.path.isdir(self._cache_dir)
and (self._auth_token["expiresAt"] > int(time.time()) + 20)
):
return self._auth_token
tokeninfo = {}
- if (
- not self.mass.config.spotify_username
- or not self.mass.config.spotify_password
- ):
+ if not self.config.username or not self.config.password:
return tokeninfo
# retrieve token with librespot
tokeninfo = await self._get_token()
)
self._auth_token = tokeninfo
else:
- self.logger.error(
- "Login failed for user %s", self.mass.config.spotify_username
- )
+ self.logger.error("Login failed for user %s", self.config.username)
return tokeninfo
async def _get_token(self):
await self.get_librespot_binary(),
"-O",
"-c",
- CACHE_DIR,
+ self._cache_dir,
"-a",
"-u",
- self.mass.config.spotify_username,
+ self.config.username,
"-p",
- self.mass.config.spotify_password,
+ self.config.password,
]
librespot = await asyncio.create_subprocess_exec(*args)
await librespot.wait()
"--scope",
scope,
"-c",
- CACHE_DIR,
+ self._cache_dir,
]
librespot = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
return None
@use_cache(3600 * 24)
- async def _get_all_items(self, endpoint, key="items", **kwargs):
+ async def _get_all_items(self, endpoint, key="items", **kwargs) -> List[dict]:
"""Get all items from a paged list."""
limit = 50
offset = 0
offset += limit
if not result or key not in result or not result[key]:
break
- all_items += result[key]
+ for item in result[key]:
+ item["position"] = len(all_items) + 1
+ all_items.append(item)
if len(result[key]) < limit:
break
return all_items
"""Tune-In musicprovider support for MusicAssistant."""
from __future__ import annotations
-from typing import List, Optional
+from typing import AsyncGenerator, List, Optional
from asyncio_throttle import Throttler
from music_assistant.helpers.cache import use_cache
-from music_assistant.helpers.util import create_sort_name
+from music_assistant.helpers.util import create_clean_string
+from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed
from music_assistant.models.media_items import (
ContentType,
class TuneInProvider(MusicProvider):
"""Provider implementation for Tune In."""
- _attr_id = "tunein"
+ _attr_type = ProviderType.TUNEIN
_attr_name = "Tune-in Radio"
_attr_supported_mediatypes = [MediaType.RADIO]
_throttler = Throttler(rate_limit=1, period=1)
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- if not self.mass.config.tunein_enabled:
+ if not self.config.enabled:
return False
- if not self.mass.config.tunein_username:
+ if not self.config.username:
raise LoginFailed("Username is invalid")
- if "@" in self.mass.config.tunein_username:
- raise LoginFailed("You must provide the TuneIn username, not email")
return True
async def search(
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
- result = []
+ # pylint: disable=no-self-use
# TODO: search for radio stations
- return result
+ return []
- async def get_library_radios(self) -> List[Radio]:
+ async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
"""Retrieve library/subscribed radio stations from the provider."""
- async def parse_items(items: List[dict], folder: str = None) -> List[Radio]:
- result = []
+ async def parse_items(
+ items: List[dict], folder: str = None
+ ) -> AsyncGenerator[Radio, None]:
for item in items:
item_type = item.get("type", "")
if item_type == "audio":
"Tune.ashx", id=item["preset_id"]
)
for stream in stream_info["body"]:
- result.append(await self._parse_radio(item, stream, folder))
+ yield await self._parse_radio(item, stream, folder)
elif item_type == "link":
# stations are in sublevel (new style)
if sublevel := await self.__get_data(item["URL"], render="json"):
- result += await parse_items(sublevel["body"], item["text"])
+ async for subitem in parse_items(
+ sublevel["body"], item["text"]
+ ):
+ yield subitem
elif item.get("children"):
# stations are in sublevel (old style ?)
- result += await parse_items(item["children"], item["text"])
- return result
+ async for subitem in parse_items(item["children"], item["text"]):
+ yield subitem
data = await self.__get_data("Browse.ashx", c="presets")
if data and "body" in data:
- return await parse_items(data["body"])
- return []
+ async for item in parse_items(data["body"]):
+ yield item
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
name = name.split(" | ")[1]
name = name.split(" (")[0]
item_id = f'{details["preset_id"]}--{stream["media_type"]}'
- radio = Radio(item_id=item_id, provider=self.id, name=name)
+ radio = Radio(item_id=item_id, provider=self.type, name=name)
if stream["media_type"] == "aac":
quality = MediaQuality.LOSSY_AAC
elif stream["media_type"] == "ogg":
quality = MediaQuality.LOSSY_MP3
radio.add_provider_id(
MediaItemProviderId(
- provider=self.id,
item_id=item_id,
+ prov_type=self.type,
+ prov_id=self.id,
quality=quality,
details=stream["url"],
)
radio.sort_name = f'{folder}-{details["preset_number"]}'
elif preset_number:
radio.sort_name = details["preset_number"]
- radio.sort_name += create_sort_name(name)
+ radio.sort_name += create_clean_string(name)
if "text" in details:
radio.metadata.description = details["text"]
# images
if img := details.get("image"):
- radio.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
+ radio.metadata.images = [MediaItemImage(ImageType.THUMB, img)]
if img := details.get("logo"):
- radio.metadata.images = {MediaItemImage(ImageType.LOGO, img)}
+ radio.metadata.images = [MediaItemImage(ImageType.LOGO, img)]
return radio
async def get_stream_details(self, item_id: str) -> StreamDetails:
return StreamDetails(
type=StreamType.URL,
item_id=item_id,
- provider=self.id,
+ provider=self.type,
path=stream["url"],
content_type=ContentType(stream["media_type"]),
sample_rate=44100,
else:
url = f"https://opml.radiotime.com/{endpoint}"
kwargs["formats"] = "ogg,aac,wma,mp3"
- kwargs["username"] = self.mass.config.tunein_username
+ kwargs["username"] = self.config.username
kwargs["partnerId"] = "1"
kwargs["render"] = "json"
async with self._throttler:
from music_assistant.helpers.database import TABLE_RADIOS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name
from music_assistant.models.enums import EventType, MediaType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
async def add_db_item(self, radio: Radio) -> Radio:
"""Add a new radio record to the database."""
- if not radio.sort_name:
- radio.sort_name = create_sort_name(radio.name)
assert radio.provider_ids
async with self.mass.database.get_db() as _db:
match = {"name": radio.name}
else:
metadata = cur_item.metadata.update(radio.metadata)
provider_ids = {*cur_item.provider_ids, *radio.provider_ids}
- if not radio.sort_name:
- radio.sort_name = create_sort_name(radio.name)
match = {"item_id": item_id}
await self.mass.database.update(
)
from music_assistant.helpers.database import TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name
-from music_assistant.models.enums import EventType, MediaType
+from music_assistant.models.enums import EventType, MediaType, ProviderType
from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import ItemMapping, Track
)
return db_item
- async def versions(self, item_id: str, provider_id: str) -> List[Track]:
+ async def versions(
+ self,
+ item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> List[Track]:
"""Return all versions of a track we can find on all providers."""
- track = await self.get(item_id, provider_id)
- provider_ids = {item.id for item in self.mass.music.providers}
+ track = await self.get(item_id, provider, provider_id)
+ prov_types = {item.types for item in self.mass.music.providers}
first_artist = next(iter(track.artists))
search_query = f"{first_artist.name} {track.name}"
return [
prov_item
for prov_items in await asyncio.gather(
- *[self.search(search_query, prov_id) for prov_id in provider_ids]
+ *[self.search(search_query, prov_type) for prov_type in prov_types]
)
for prov_item in prov_items
if compare_artists(prov_item.artists, track.artists)
This is used to link objects of different providers/qualities together.
"""
- if db_track.provider != "database":
+ if db_track.provider != ProviderType.DATABASE:
return # Matching only supported for database items
if isinstance(db_track.album, ItemMapping):
# matching only works if we have a full track object
for provider in self.mass.music.providers:
if MediaType.TRACK not in provider.supported_mediatypes:
continue
- if provider.id == "filesystem":
+ if provider.type.is_file():
continue
self.logger.debug(
"Trying to match track %s on provider %s", db_track.name, provider.name
searchstr = f"{db_track_artist.name} {db_track.name}"
if db_track.version:
searchstr += " " + db_track.version
- search_result = await self.search(searchstr, provider.id)
+ search_result = await self.search(searchstr, provider.type)
for search_result_item in search_result:
if not search_result_item.available:
continue
async def add_db_item(self, track: Track) -> Track:
"""Add a new track record to the database."""
assert track.artists, "Track is missing artist(s)"
- assert track.provider_ids
- if not track.sort_name:
- track.sort_name = create_sort_name(track.name)
+ assert track.provider_ids, "Track is missing provider id(s)"
cur_item = None
async with self.mass.database.get_db() as _db:
track_album = await self._get_track_album(track)
)
# return created object
+ self.logger.debug("added %s to database: %s", track.name, item_id)
return await self.get_db_item(item_id, db=_db)
async def update_db_item(
"""Update Track record in the database, merging data."""
async with self.mass.database.get_db() as _db:
cur_item = await self.get_db_item(item_id, db=_db)
- track_album = await self._get_track_album(track)
+ track_album = await self._get_track_album(cur_item, track)
if overwrite:
provider_ids = track.provider_ids
track_artists = track.artists
- track_album = track.album or cur_item.album
else:
provider_ids = {*cur_item.provider_ids, *track.provider_ids}
track_artists = cur_item.artists + track.artists
- track_album = cur_item.album or track.album
metadata = cur_item.metadata.update(track.metadata, overwrite)
- if track_album and not isinstance(track_album, ItemMapping):
- track_album = ItemMapping.from_item(
- await self.get_db_item_by_prov_id(
- track_album.provider, track_album.item_id, db=_db
- )
- or await self.mass.music.albums.add_db_item(track_album)
- )
# we store a mapping to artists on the track for easier access/listings
track_artists = await self._get_track_artists(track, track_artists)
cur_ids = {x.item_id for x in track_artists}
track_artist = (
await self.mass.music.artists.get_db_item_by_prov_id(
- item.provider, item.item_id
+ provider_item_id=item.item_id,
+ provider=item.provider,
)
or item
)
if isinstance(track.album, ItemMapping):
return track.album
- if track.album.provider == "database":
+ if track.album.provider == ProviderType.DATABASE:
return ItemMapping.from_item(track.album)
if track.album.musicbrainz_id:
return ItemMapping.from_item(track_album)
if track_album := await self.mass.music.albums.get_db_item_by_prov_id(
- track.album.provider, track.album.item_id
+ provider_item_id=track.album.item_id,
+ provider=track.album.provider,
):
return ItemMapping.from_item(track_album)
return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{ext}"
return f"http://{self._ip}:{self._port}/{queue_id}.{ext}"
- async def get_preview_url(self, provider: str, track_id: str) -> str:
+ async def get_preview_url(self, provider_id: str, track_id: str) -> str:
"""Return url to short preview sample."""
- track = await self.mass.music.tracks.get_provider_item(track_id, provider)
+ track = await self.mass.music.tracks.get_provider_item(track_id, provider_id)
if preview := track.metadata.preview:
return preview
enc_track_id = urllib.parse.quote(track_id)
- return f"http://{self._ip}:{self._port}/preview?provider={provider}&item_id={enc_track_id}"
+ return f"http://{self._ip}:{self._port}/preview?provider_id={provider_id}&item_id={enc_track_id}"
async def setup(self) -> None:
"""Async initialize of module."""
async def serve_preview(self, request: web.Request):
"""Serve short preview sample."""
- provider = request.query["provider"]
+ provider_id = request.query["provider_id"]
item_id = urllib.parse.unquote(request.query["item_id"])
resp = web.StreamResponse(
status=200, reason="OK", headers={"Content-Type": "audio/mp3"}
)
await resp.prepare(request)
- async for _, chunk in get_preview_stream(self.mass, provider, item_id):
+ async for _, chunk in get_preview_stream(self.mass, provider_id, item_id):
await resp.write(chunk)
return resp
from music_assistant.helpers.process import AsyncProcess, check_output
from music_assistant.helpers.util import create_tempfile
-from music_assistant.models.enums import EventType
+from music_assistant.models.enums import EventType, ProviderType
from music_assistant.models.errors import AudioError, MediaNotFoundError
from music_assistant.models.event import MassEvent
from music_assistant.models.media_items import (
# special case: a plain url was added to the queue
streamdetails = StreamDetails(
type=StreamType.URL,
- provider="url",
+ provider=ProviderType.URL,
item_id=queue_item.item_id,
path=queue_item.uri,
content_type=ContentType.try_parse(queue_item.uri),
full_item = await mass.music.get_item_by_uri(queue_item.uri)
# sort by quality and check track availability
for prov_media in sorted(
- full_item.provider_ids, key=lambda x: x.quality, reverse=True
+ full_item.provider_ids, key=lambda x: x.quality or 0, reverse=True
):
if not prov_media.available:
continue
# get streamdetails from provider
- music_prov = mass.music.get_provider(prov_media.provider)
+ music_prov = mass.music.get_provider(prov_media.prov_id)
if not music_prov or not music_prov.available:
continue # provider temporary unavailable ?
async def get_gain_correct(
- mass: MusicAssistant, queue_id: str, item_id: str, provider_id: str
+ mass: MusicAssistant, queue_id: str, item_id: str, provider: ProviderType
) -> Tuple[float, float]:
"""Get gain correction for given queue / track combination."""
queue = mass.players.get_player_queue(queue_id)
if not queue or not queue.settings.volume_normalization_enabled:
return 0
target_gain = queue.settings.volume_normalization_target
- track_loudness = await mass.music.get_track_loudness(item_id, provider_id)
+ track_loudness = await mass.music.get_track_loudness(item_id, provider)
if track_loudness is None:
# fallback to provider average
- fallback_track_loudness = await mass.music.get_provider_loudness(provider_id)
+ fallback_track_loudness = await mass.music.get_provider_loudness(provider)
if fallback_track_loudness is None:
# fallback to some (hopefully sane) average value for now
fallback_track_loudness = -8.5
mass.signal_event(
MassEvent(
EventType.STREAM_STARTED,
- object_id=streamdetails.provider,
+ object_id=streamdetails.provider.value,
data=streamdetails,
)
)
mass.signal_event(
MassEvent(
EventType.STREAM_ENDED,
- object_id=streamdetails.provider,
+ object_id=streamdetails.provider.value,
data=streamdetails,
)
)
async def get_preview_stream(
mass: MusicAssistant,
- provider: str,
+ provider_id: str,
track_id: str,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the audio stream for the given streamdetails."""
- music_prov = mass.music.get_provider(provider)
+ music_prov = mass.music.get_provider(provider_id)
streamdetails = await music_prov.get_stream_details(track_id)
- mass.signal_event(
- MassEvent(
- EventType.STREAM_STARTED,
- object_id=streamdetails.provider,
- data=streamdetails,
- )
- )
if streamdetails.type == StreamType.EXECUTABLE:
# stream from executable
input_args = [
cacheobject matches the checkum provided
"""
cur_time = int(time.time())
- checksum = self._get_checksum(checksum)
# try memory cache first
cache_data = self._mem_cache.get(cache_key)
async def set(self, cache_key, data, checksum="", expiration=(86400 * 30)):
"""Set data in cache."""
- checksum = self._get_checksum(checksum)
expires = int(time.time() + expiration)
self._mem_cache[cache_key] = (data, checksum, expires)
if (expires - time.time()) < 3600 * 4:
# reschedule self
self.mass.loop.call_later(3600, self.__schedule_cleanup_task)
- @staticmethod
- def _get_checksum(stringinput):
- """Get int checksum from string."""
- if not stringinput:
- return 0
- stringinput = str(stringinput)
- return functools.reduce(lambda x, y: x + y, map(ord, stringinput))
-
def use_cache(expiration=86400 * 30):
"""Return decorator that can be used to cache a method's result."""
for key in sorted(kwargs.keys()):
cache_key_parts.append(f"{key}{kwargs[key]}")
cache_key = ".".join(cache_key_parts)
+
cachedata = await method_class.cache.get(cache_key, checksum=cache_checksum)
if not skip_cache and cachedata is not None:
"""Several helper/utils to compare objects."""
from __future__ import annotations
-import re
from typing import List, Union
-import unidecode
-
+from music_assistant.helpers.util import create_clean_string
from music_assistant.models.media_items import (
Album,
Artist,
)
-def get_compare_string(input_str):
- """Return clean lowered string for compare actions."""
- unaccented_string = unidecode.unidecode(input_str)
- return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string).lower()
-
-
def compare_strings(str1, str2, strict=False):
"""Compare strings and return True if we have an (almost) perfect match."""
- match = str1.lower() == str2.lower()
- if not match and not strict:
- match = get_compare_string(str1) == get_compare_string(str2)
- return match
+ if not strict:
+ return create_clean_string(str1) == create_clean_string(str2)
+ return str1.lower().strip() == str2.lower().strip()
def compare_version(left_version: str, right_version: str):
"""Compare two lists of artist and return True if both lists match."""
matches = 0
for left_artist in left_artists:
+ if not left_artist.sort_name:
+ left_artist.sort_name = create_clean_string(left_artist.name)
for right_artist in right_artists:
- if compare_strings(left_artist.name, right_artist.name):
+ if not right_artist.sort_name:
+ right_artist.sort_name = create_clean_string(right_artist.name)
+ if left_artist.sort_name == right_artist.sort_name:
matches += 1
return len(left_artists) == matches
):
return True
- if isinstance(left_album, Album) and isinstance(right_album, Album):
- # prefer match on UPC
- if left_album.upc and right_album.upc:
- if (left_album.upc in right_album.upc) or (
- right_album.upc in left_album.upc
- ):
- return True
- # prefer match on musicbrainz_id
- if left_album.musicbrainz_id and right_album.musicbrainz_id:
- if left_album.musicbrainz_id == right_album.musicbrainz_id:
+ # prefer match on UPC
+ if getattr(left_album, "upc", None) and getattr(right_album, "upc", None):
+ if (left_album.upc in right_album.upc) or (right_album.upc in left_album.upc):
+ return True
+ # prefer match on musicbrainz_id
+ if getattr(left_album, "musicbrainz_id", None) and getattr(
+ right_album, "musicbrainz_id", None
+ ):
+ return left_album.musicbrainz_id == right_album.musicbrainz_id
- return True
# fallback to comparing
+ if not left_album.sort_name:
+ left_album.sort_name = create_clean_string(left_album.name)
+ if not right_album.sort_name:
+ right_album.sort_name = create_clean_string(right_album.name)
if not compare_strings(left_album.name, right_album.name):
return False
if not compare_version(left_album.version, right_album.version):
return False
- if not left_album.artist or not right_album.artist:
+ # album artist must be either set on both or not at all
+ if left_album.artist and not right_album.artist:
return False
- if not compare_strings(left_album.artist.name, right_album.artist.name):
+ if right_album.artist and not left_album.artist:
return False
- # 100% match, all criteria passed
- return True
+ if left_album.artist and right_album.artist:
+ if not left_album.artist.sort_name:
+ left_album.artist.sort_name = create_clean_string(left_album.artist.name)
+ if not right_album.artist.sort_name:
+ right_album.artist.sort_name = create_clean_string(right_album.artist.name)
+ if left_album.artist.sort_name != right_album.artist.sort_name:
+ return False
+ return left_album.sort_name == right_album.sort_name
def compare_track(left_track: Track, right_track: Track):
# musicbrainz_id is always 100% accurate match
return True
# track name and version must match
- if not compare_strings(left_track.name, right_track.name):
+ if not left_track.sort_name:
+ left_track.sort_name = create_clean_string(left_track.name)
+ if not right_track.sort_name:
+ right_track.sort_name = create_clean_string(right_track.name)
+ if not left_track.sort_name != right_track.sort_name:
return False
if not compare_version(left_track.version, right_track.version):
return False
if isinstance(left_track.album, ItemMapping) and isinstance(
right_track.album, ItemMapping
):
- if compare_strings(left_track.album.name, right_track.album.name):
+ if left_track.album.sort_name == right_track.album.sort_name:
return True
if abs(left_track.duration - right_track.duration) <= 2:
# 100% match, all criteria passed
from music_assistant.mass import MusicAssistant
-SCHEMA_VERSION = 7
+SCHEMA_VERSION = 9
TABLE_PROV_MAPPINGS = "provider_mappings"
TABLE_TRACK_LOUDNESS = "track_loudness"
TABLE_RADIOS = "radios"
TABLE_CACHE = "cache"
TABLE_SETTINGS = "settings"
+TABLE_THUMBS = "thumbnails"
class Database:
# always create db tables if they don't exist to prevent errors trying to access them later
await self.__create_database_tables(db)
- if prev_version < 7:
+ if prev_version < 9:
# refactored file provider, start clean just in case.
await db.execute("DROP TABLE IF EXISTS filesystem_mappings")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_ARTISTS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_PLAYLISTS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_RADIOS}")
await db.execute(f"DROP TABLE IF EXISTS {TABLE_PROV_MAPPINGS}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_CACHE}")
+ await db.execute(f"DROP TABLE IF EXISTS {TABLE_THUMBS}")
# recreate missing tables
await self.__create_database_tables(db)
item_id INTEGER NOT NULL,
media_type TEXT NOT NULL,
prov_item_id TEXT NOT NULL,
- provider TEXT NOT NULL,
+ prov_type TEXT NOT NULL,
+ prov_id TEXT NOT NULL,
quality INTEGER NULL,
details TEXT NULL,
url TEXT NULL,
- UNIQUE(item_id, media_type, prov_item_id, provider)
+ UNIQUE(item_id, media_type, prov_item_id, prov_id)
);"""
)
await db.execute(
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
sort_name TEXT NOT NULL,
- musicbrainz_id TEXT NOT NULL UNIQUE,
+ musicbrainz_id TEXT,
in_library BOOLEAN DEFAULT 0,
metadata json,
provider_ids json
)
await db.execute(
f"""CREATE TABLE IF NOT EXISTS {TABLE_CACHE}(
- key TEXT UNIQUE, expires INTEGER, data TEXT, checksum INTEGER)"""
+ key TEXT UNIQUE NOT NULL, expires INTEGER NOT NULL, data TEXT, checksum TEXT NULL)"""
+ )
+ await db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_THUMBS}(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ path TEXT NOT NULL,
+ size INTEGER NULL,
+ data BLOB,
+ UNIQUE(path, size));"""
)
from __future__ import annotations
from io import BytesIO
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
from PIL import Image
+from tinytag import TinyTag
from music_assistant.models.enums import ImageType, MediaType
from music_assistant.models.media_items import ItemMapping, MediaItemType
from music_assistant.mass import MusicAssistant
-async def create_thumbnail(mass: MusicAssistant, url, size: int = 150) -> bytes:
+async def create_thumbnail(
+ mass: MusicAssistant, path: str, size: Optional[int]
+) -> bytes:
"""Create thumbnail from image url."""
- async with mass.http_session.get(url, verify_ssl=False) as response:
- assert response.status == 200
- img_data = BytesIO(await response.read())
- img = Image.open(img_data)
+ if not size:
+ size = 200
+ img_data = None
+ if path.startswith("http"):
+ async with mass.http_session.get(path, verify_ssl=False) as response:
+ assert response.status == 200
+ img_data = await response.read()
+ else:
+ # assume file from file provider, we need to fetch it here...
+ for prov in mass.music.providers:
+ if not prov.type.is_file():
+ continue
+ if not prov.exists(path):
+ continue
+ if TinyTag.is_supported(path):
+ # embedded image in music file
+ def get_embedded_image():
+ tags = TinyTag.get(path, image=True)
+ return tags.get_image()
+
+ img_data = await mass.loop.run_in_executor(None, get_embedded_image)
+ else:
+ # regular image file on disk
+ async with prov.open_file(path) as _file:
+ img_data = BytesIO(await _file.read())
+ break
+ if not img_data:
+ raise FileNotFoundError(f"Image not found: {path}")
+
+ def _create_image():
+ data = BytesIO(img_data)
+ img = Image.open(data)
img.thumbnail((size, size), Image.ANTIALIAS)
- img.save(format="png")
- return img_data.getvalue()
+ img.save(data, format="png")
+ return data.getvalue()
+
+ return await mass.loop.run_in_executor(None, _create_image)
async def get_image_url(
mass: MusicAssistant,
media_item: MediaItemType,
img_type: ImageType = ImageType.THUMB,
-):
+) -> str:
"""Get url to image for given media media_item."""
if not media_item:
return None
for img in media_item.metadata.images:
if img.type == img_type:
return img.url
- if img_type == ImageType.THUMB and img.type == ImageType.EMBEDDED_THUMB:
- if file_prov := mass.music.get_provider("filesystem"):
- return await file_prov.get_embedded_image(img.url)
# retry with track's album
if media_item.media_type == MediaType.TRACK and media_item.album:
from typing import Tuple
-from music_assistant.models.enums import MediaType
+from music_assistant.models.enums import MediaType, ProviderType
from music_assistant.models.errors import MusicAssistantError
-def parse_uri(uri: str) -> Tuple[MediaType, str, str]:
+def parse_uri(uri: str) -> Tuple[MediaType, ProviderType, str]:
"""
Try to parse URI to Mass identifiers.
if uri.startswith("https://open."):
# public share URL (e.g. Spotify or Qobuz, not sure about others)
# https://open.spotify.com/playlist/5lH9NjOeJvctAO92ZrKQNB?si=04a63c8234ac413e
- provider = uri.split(".")[1]
+ provider = ProviderType.parse(uri.split(".")[1])
media_type_str = uri.split("/")[3]
media_type = MediaType(media_type_str)
item_id = uri.split("/")[4].split("?")[0]
elif "://" in uri:
# music assistant-style uri
# provider://media_type/item_id
- provider = uri.split("://")[0]
+ provider = ProviderType.parse(uri.split("://")[0])
media_type_str = uri.split("/")[2]
media_type = MediaType(media_type_str)
item_id = uri.split(f"{media_type_str}/")[1]
elif ":" in uri:
# spotify new-style uri
provider, media_type_str, item_id = uri.split(":")
+ provider = ProviderType.parse(provider)
media_type = MediaType(media_type_str)
else:
raise KeyError
return (media_type, provider, item_id)
-def create_uri(media_type: MediaType, provider: str, item_id: str) -> str:
+def create_uri(media_type: MediaType, provider: ProviderType, item_id: str) -> str:
"""Create Music Assistant URI from MediaItem values."""
- return f"{provider}://{media_type.value}/{item_id}"
+ return f"{provider.value}://{media_type.value}/{item_id}"
import os
import platform
+import re
import socket
import tempfile
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar
import memory_tempfile
+import unidecode
# pylint: disable=invalid-name
T = TypeVar("T")
return possible_bool in ["true", "True", "1", "on", "ON", 1]
-def create_sort_name(name):
- """Return sort name."""
- sort_name = name
- for item in ["The ", "De ", "de ", "Les "]:
- if name.startswith(item):
- sort_name = "".join(name.split(item)[1:])
- return sort_name.lower()
+def create_clean_string(input_str: str) -> str:
+ """Return clean lowered string for compare actions."""
+ input_str = input_str.lower().strip()
+ for item in ["the ", "de ", "les "]:
+ if input_str.startswith(item):
+ input_str = input_str.replace(item, "")
+ unaccented_string = unidecode.unidecode(input_str)
+ return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
def parse_title_and_version(title: str, track_version: str = None):
"""Model for the Music Assisant runtime config."""
-from dataclasses import dataclass
-from typing import Optional
+from dataclasses import dataclass, field
+from typing import List, Optional
from databases import DatabaseURL
from music_assistant.helpers.util import get_ip, select_stream_port
+from music_assistant.models.enums import ProviderType
+
+
+@dataclass(frozen=True)
+class MusicProviderConfig:
+ """Base Model for a MusicProvider config."""
+
+ type: ProviderType
+ enabled: bool = True
+ username: Optional[str] = None
+ password: Optional[str] = None
+ path: Optional[str] = None
+ # no need to override the id unless you really know what you're doing ;-)
+ id: Optional[str] = None
+
+ def __post_init__(self):
+ """Call after init."""
+ # create a default (hopefully unique enough) id from type + username/path
+ if not self.id:
+ prov_id = f"{self.type.value}_"
+ base_str = (self.path or self.username).lower()
+ prov_id += (
+ base_str.replace(".", "").replace("_", "").split("@")[0][1::2]
+ ) + base_str[-1]
+ super().__setattr__("id", prov_id)
+ elif not self.id:
+ self.id = self.type.value
@dataclass(frozen=True)
database_url: DatabaseURL
- spotify_enabled: bool = False
- spotify_username: Optional[str] = None
- spotify_password: Optional[str] = None
-
- qobuz_enabled: bool = False
- qobuz_username: Optional[str] = None
- qobuz_password: Optional[str] = None
-
- tunein_enabled: bool = False
- tunein_username: Optional[str] = None
-
- filesystem_enabled: bool = False
- filesystem_music_dir: Optional[str] = None
- filesystem_playlists_dir: Optional[str] = None
+ providers: List[MusicProviderConfig] = field(default_factory=list)
# advanced settings
- max_simultaneous_jobs: int = 10
+ max_simultaneous_jobs: int = 5
stream_port: int = select_stream_port()
stream_ip: str = get_ip()
"""Enum wth image types."""
THUMB = "thumb"
- WIDE_THUMB = "wide_thumb"
+ LANDSCAPE = "landscape"
FANART = "fanart"
LOGO = "logo"
CLEARART = "clearart"
BANNER = "banner"
CUTOUT = "cutout"
BACK = "back"
- CDART = "cdart"
- EMBEDDED_THUMB = "embedded_thumb"
+ DISCART = "discart"
OTHER = "other"
CANCELLED = "cancelled"
FINISHED = "success"
ERROR = "error"
+
+
+class ProviderType(Enum):
+ """Enum with supported music providers."""
+
+ FILESYSTEM_LOCAL = "file"
+ FILESYSTEM_SMB = "smb"
+ FILESYSTEM_GOOGLE_DRIVE = "gdrive"
+ FILESYSTEM_ONEDRIVE = "onedrive"
+ SPOTIFY = "spotify"
+ QOBUZ = "qobuz"
+ TUNEIN = "tunein"
+ DATABASE = "database" # internal only
+ URL = "url" # internal only
+
+ def is_file(self) -> bool:
+ """Return if type is one of the filesystem providers."""
+ return self in (
+ self.FILESYSTEM_LOCAL,
+ self.FILESYSTEM_SMB,
+ self.FILESYSTEM_GOOGLE_DRIVE,
+ self.FILESYSTEM_ONEDRIVE,
+ )
+
+ @classmethod
+ def parse(cls: "ProviderType", val: str) -> "ProviderType":
+ """Try to parse ContentType from provider id."""
+ if isinstance(val, ProviderType):
+ return val
+ for mem in ProviderType:
+ if val.startswith(mem.value):
+ return mem
+ raise ValueError(f"Unable to parse ProviderType from {val}")
from music_assistant.models.errors import MediaNotFoundError, ProviderUnavailableError
-from .enums import MediaType
+from .enums import MediaType, ProviderType
from .media_items import MediaItemType
if TYPE_CHECKING:
async def get(
self,
provider_item_id: str,
- provider_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
force_refresh: bool = False,
lazy: bool = True,
details: ItemCls = None,
) -> ItemCls:
"""Return (full) details for a single media item."""
- db_item = await self.get_db_item_by_prov_id(provider_id, provider_item_id)
+ assert provider or provider_id, "provider or provider_id must be supplied"
+ db_item = await self.get_db_item_by_prov_id(
+ provider_item_id=provider_item_id,
+ provider=provider,
+ provider_id=provider_id,
+ )
if db_item and (time() - db_item.last_refresh) > REFRESH_INTERVAL:
force_refresh = True
if db_item and force_refresh:
provider_id, provider_item_id = await self.get_provider_id(db_item)
elif db_item:
return db_item
- if not details:
+ if not details and provider_id:
details = await self.get_provider_item(provider_item_id, provider_id)
+ if not details and provider:
+ # check providers for given provider type one by one
+ for prov in self.mass.music.providers:
+ if not prov.available:
+ continue
+ if prov.type == provider:
+ try:
+ details = await self.get_provider_item(
+ provider_item_id, prov.id
+ )
+ except MediaNotFoundError:
+ pass
+ else:
+ break
if not lazy:
return await self.add(details)
self.mass.add_job(self.add(details), f"Add {details.uri} to database")
return db_item if db_item else details
async def search(
- self, search_query: str, provider_id: str, limit: int = 25
+ self,
+ search_query: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ limit: int = 25,
) -> List[ItemCls]:
"""Search database or provider with given query."""
- if provider_id == "database":
+ if provider == ProviderType.DATABASE or provider_id == "database":
return [
self.item_cls.from_db_row(db_row)
for db_row in await self.mass.database.search(
)
]
- provider = self.mass.music.get_provider(provider_id)
+ provider = self.mass.music.get_provider(provider_id or provider)
if not provider:
return {}
return await provider.search(
limit,
)
- async def add_to_library(self, provider_item_id: str, provider_id: str) -> None:
+ async def add_to_library(
+ self,
+ provider_item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ ) -> None:
"""Add an item to the library."""
# make sure we have a valid full item
- db_item = await self.get(provider_item_id, provider_id, lazy=False)
+ db_item = await self.get(
+ provider_item_id, provider=provider, provider_id=provider_id, lazy=False
+ )
# add to provider libraries
for prov_id in db_item.provider_ids:
- if prov := self.mass.music.get_provider(prov_id.provider):
+ if prov := self.mass.music.get_provider(prov_id.prov_id):
await prov.library_add(prov_id.item_id, self.media_type)
# mark as library item in internal db
if not db_item.in_library:
await self.set_db_library(db_item.item_id, True)
async def remove_from_library(
- self, provider_item_id: str, provider_id: str
+ self,
+ provider_item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
) -> None:
"""Remove item from the library."""
# make sure we have a valid full item
- db_item = await self.get(provider_item_id, provider_id, lazy=False)
+ db_item = await self.get(
+ provider_item_id, provider=provider, provider_id=provider_id, lazy=False
+ )
# add to provider's libraries
for prov_id in db_item.provider_ids:
- if prov := self.mass.music.get_provider(prov_id.provider):
+ if prov := self.mass.music.get_provider(prov_id.prov_id):
await prov.library_remove(prov_id.item_id, self.media_type)
# unmark as library item in internal db
if db_item.in_library:
async def get_provider_id(self, item: ItemCls) -> Tuple[str, str]:
"""Return provider and item id."""
- if item.provider == "database":
+ if item.provider == ProviderType.DATABASE:
# make sure we have a full object
item = await self.get_db_item(item.item_id)
for prov in item.provider_ids:
# returns the first provider that is available
if not prov.available:
continue
- if self.mass.music.get_provider(prov.provider):
- return (prov.provider, prov.item_id)
+ if self.mass.music.get_provider(prov.prov_id):
+ return (prov.prov_id, prov.item_id)
return None, None
async def get_db_items(
async def get_db_item_by_prov_id(
self,
- provider_id: str,
provider_item_id: str,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
db: Optional[Db] = None,
) -> ItemCls | None:
"""Get the database album for the given prov_id."""
- if provider_id == "database":
+ assert provider or provider_id, "provider or provider_id must be supplied"
+ if provider == ProviderType.DATABASE or provider_id == "database":
return await self.get_db_item(provider_item_id, db=db)
if item_id := await self.mass.music.get_provider_mapping(
- self.media_type, provider_id, provider_item_id
+ self.media_type, provider_item_id, provider, provider_id=provider_id, db=db
):
return await self.get_db_item(item_id, db=db)
return None
async def get_db_items_by_prov_id(
- self, provider_id: str, db: Optional[Db] = None
+ self,
+ provider: Optional[ProviderType] = None,
+ provider_id: Optional[str] = None,
+ db: Optional[Db] = None,
) -> List[ItemCls]:
"""Fetch all records from database for given provider."""
+ assert provider or provider_id, "provider or provider_id must be supplied"
db_ids = await self.mass.music.get_provider_mappings(
- self.media_type, provider_id, db=db
+ self.media_type, provider=provider, provider_id=provider_id, db=db
)
query = f"SELECT * FROM tracks WHERE item_id in {str(tuple(db_ids))}"
return await self.get_db_items(query, db=db)
{"in_library": in_library},
)
- async def get_provider_item(self, item_id: str, provider_id: str) -> ItemCls:
+ async def get_provider_item(
+ self,
+ item_id: str,
+ provider_id: str,
+ ) -> ItemCls:
"""Return item details for the given provider item id."""
if provider_id == "database":
item = await self.get_db_item(item_id)
item = await provider.get_item(self.media_type, item_id)
if not item:
raise MediaNotFoundError(
- f"{self.media_type.value} {item_id} not found on provider {provider_id}"
+ f"{self.media_type.value} {item_id} not found on provider {provider.name}"
)
return item
from music_assistant.helpers.json import json
from music_assistant.helpers.uri import create_uri
-from music_assistant.helpers.util import create_sort_name
+from music_assistant.helpers.util import create_clean_string, merge_lists
from music_assistant.models.enums import (
AlbumType,
ContentType,
LinkType,
MediaQuality,
MediaType,
+ ProviderType,
StreamType,
)
class MediaItemProviderId(DataClassDictMixin):
"""Model for a MediaItem's provider id."""
- provider: str
item_id: str
+ prov_type: ProviderType
+ prov_id: str
available: bool = True
quality: Optional[MediaQuality] = None
details: Optional[str] = None
def __hash__(self):
"""Return custom hash."""
- return hash((self.provider, self.item_id, self.quality))
+ return hash((self.prov_id, self.item_id, self.quality))
@dataclass(frozen=True)
type: ImageType
url: str
+ is_file: bool = False # indicator that image is local filepath instead of url
def __hash__(self):
"""Return custom hash."""
description: Optional[str] = None
review: Optional[str] = None
explicit: Optional[bool] = None
- images: Optional[Set[MediaItemImage]] = None
+ images: Optional[List[MediaItemImage]] = None
genres: Optional[Set[str]] = None
mood: Optional[str] = None
style: Optional[str] = None
def update(
self,
new_values: "MediaItemMetadata",
- allow_overwrite: bool = False,
+ allow_overwrite: bool = True,
) -> "MediaItemMetadata":
"""Update metadata (in-place) with new values."""
for fld in fields(self):
if new_val is None:
continue
cur_val = getattr(self, fld.name)
- if isinstance(cur_val, set):
+ if isinstance(cur_val, list):
+ merge_lists(cur_val, new_val)
+ elif isinstance(cur_val, set):
cur_val.update(new_val)
elif cur_val is None or allow_overwrite:
setattr(self, fld.name, new_val)
"""Base representation of a media item."""
item_id: str
- provider: str
+ provider: ProviderType
name: str
# optional fields below
provider_ids: Set[MediaItemProviderId] = field(default_factory=set)
- sort_name: Optional[str] = None
+
metadata: MediaItemMetadata = field(default_factory=MediaItemMetadata)
in_library: bool = False
media_type: MediaType = MediaType.UNKNOWN
- uri: str = ""
+ # sort_name and uri are auto generated, do not override unless needed
+ sort_name: Optional[str] = None
+ uri: Optional[str] = None
def __post_init__(self):
"""Call after init."""
if not self.uri:
self.uri = create_uri(self.media_type, self.provider, self.item_id)
if not self.sort_name:
- self.sort_name = create_sort_name(self.name)
- if not self.provider_ids:
- self.add_provider_id(MediaItemProviderId(self.provider, self.item_id))
+ self.sort_name = create_clean_string(self.name)
@classmethod
def from_db_row(cls, db_row: Mapping):
self.provider_ids = {
x
for x in self.provider_ids
- if not (x.item_id == prov_id.item_id and x.provider == prov_id.provider)
+ if not (x.item_id == prov_id.item_id and x.prov_id == prov_id.prov_id)
}
self.provider_ids.add(prov_id)
class ItemMapping(DataClassDictMixin):
"""Representation of a minimized item object."""
+ media_type: MediaType
item_id: str
- provider: str
- name: str = ""
+ provider: ProviderType
+ name: str
+ sort_name: str
+ uri: str
version: str = ""
- media_type: MediaType = MediaType.ARTIST
- uri: str = ""
-
- def __post_init__(self):
- """Call after init."""
- if not self.uri:
- self.uri = create_uri(self.media_type, self.provider, self.item_id)
@classmethod
def from_item(cls, item: "MediaItem"):
"""Model for a radio station."""
media_type: MediaType = MediaType.RADIO
- duration: int = 86400
+ duration: int = 0
def to_db_row(self) -> dict:
"""Create dict from item suitable for db."""
"""Model for streamdetails."""
type: StreamType
- provider: str
+ provider: ProviderType
item_id: str
path: str
content_type: ContentType
def __str__(self):
"""Return pretty printable string of object."""
- return f"{self.type.value}/{self.content_type.value} - {self.provider}/{self.item_id}"
+ return f"{self.type.value}/{self.content_type.value} - {self.provider.value}/{self.item_id}"
# collect tracks to play
if media_item.media_type == MediaType.ARTIST:
tracks = await self.mass.music.artists.toptracks(
- media_item.item_id, provider_id=media_item.provider
+ media_item.item_id, provider=media_item.provider
)
elif media_item.media_type == MediaType.ALBUM:
tracks = await self.mass.music.albums.tracks(
- media_item.item_id, provider_id=media_item.provider
+ media_item.item_id, provider=media_item.provider
)
elif media_item.media_type == MediaType.PLAYLIST:
tracks = await self.mass.music.playlists.tracks(
- media_item.item_id, provider_id=media_item.provider
+ media_item.item_id, provider=media_item.provider
)
elif media_item.media_type == MediaType.RADIO:
# single radio
tracks = [
await self.mass.music.radio.get(
- media_item.item_id, provider_id=media_item.provider
+ media_item.item_id, provider=media_item.provider
)
]
else:
# single track
tracks = [
await self.mass.music.tracks.get(
- media_item.item_id, provider_id=media_item.provider
+ media_item.item_id, provider=media_item.provider
)
]
for track in tracks:
from __future__ import annotations
from abc import abstractmethod
-from logging import Logger
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, AsyncGenerator, List, Optional
-from music_assistant.models.enums import MediaType
+from music_assistant.models.config import MusicProviderConfig
+from music_assistant.models.enums import MediaType, ProviderType
from music_assistant.models.media_items import (
Album,
Artist,
class MusicProvider:
"""Model for a Music Provider."""
- _attr_id: str = None
_attr_name: str = None
+ _attr_type: ProviderType = None
_attr_available: bool = True
_attr_supported_mediatypes: List[MediaType] = []
- mass: MusicAssistant = None # set by setup
- cache: MusicAssistant = None # set by setup
- logger: Logger = None # set by setup
+
+ def __init__(self, mass: MusicAssistant, config: MusicProviderConfig) -> None:
+ """Initialize MusicProvider."""
+ self.mass = mass
+ self.config = config
+ self.logger = mass.logger
+ self.cache = mass.cache
@abstractmethod
async def setup(self) -> bool:
"""
@property
- def id(self) -> str:
- """Return provider ID for this provider."""
- return self._attr_id
+ def type(self) -> ProviderType:
+ """Return provider type for this provider."""
+ return self._attr_type
@property
def name(self) -> str:
"""Return provider Name for this provider."""
+ if sum(1 for x in self.mass.music.providers if x.type == self.type) > 1:
+ append_str = self.config.path or self.config.username
+ return f"{self._attr_name} ({append_str})"
return self._attr_name
@property
"""
raise NotImplementedError
- async def get_library_artists(self) -> List[Artist]:
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve library artists from the provider."""
if MediaType.ARTIST in self.supported_mediatypes:
raise NotImplementedError
- async def get_library_albums(self) -> List[Album]:
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve library albums from the provider."""
if MediaType.ALBUM in self.supported_mediatypes:
raise NotImplementedError
- async def get_library_tracks(self) -> List[Track]:
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
if MediaType.TRACK in self.supported_mediatypes:
raise NotImplementedError
- async def get_library_playlists(self) -> List[Playlist]:
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve library/subscribed playlists from the provider."""
if MediaType.PLAYLIST in self.supported_mediatypes:
raise NotImplementedError
- async def get_library_radios(self) -> List[Radio]:
+ async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
"""Retrieve library/subscribed radio stations from the provider."""
if MediaType.RADIO in self.supported_mediatypes:
raise NotImplementedError
"""Get streamdetails for a track/radio."""
raise NotImplementedError
- # some helper methods below
- async def get_library_items(self, media_type: MediaType) -> List[MediaItemType]:
- """Return library items for given media_type."""
- if media_type == MediaType.ARTIST:
- return await self.get_library_artists()
- if media_type == MediaType.ALBUM:
- return await self.get_library_albums()
- if media_type == MediaType.TRACK:
- return await self.get_library_tracks()
- if media_type == MediaType.PLAYLIST:
- return await self.get_library_playlists()
- if media_type == MediaType.RADIO:
- return await self.get_library_radios()
-
async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
"""Get single MediaItem from provider."""
if media_type == MediaType.ARTIST:
if media_type == MediaType.RADIO:
return await self.get_radio(prov_item_id)
- async def sync(self) -> None:
- """Run/schedule sync for this provider."""
- await self.mass.music.run_provider_sync(self.id)
+ async def sync_library(self) -> None:
+ """Run library sync for this provider."""
+ # this reference implementation can be overridden with provider specific approach
+ # this logic is aimed at streaming/online providers,
+ # which all have more or less the same structure.
+ # filesystem implementation(s) just override this.
+ for media_type in self.supported_mediatypes:
+
+ self.logger.debug("Start sync of %s items.", media_type.value)
+ controller = self.mass.music.get_controller(media_type)
+
+ # create a set of all previous and current db id's
+ prev_ids = set()
+ for db_item in await controller.library():
+ for prov_id in db_item.provider_ids:
+ if prov_id.prov_id == self.id:
+ prev_ids.add(db_item.item_id)
+ cur_ids = set()
+ async for prov_item in self._get_library_gen(media_type)():
+ prov_item: MediaItemType = prov_item
+
+ db_item: MediaItemType = await controller.get_db_item_by_prov_id(
+ provider_item_id=prov_item.item_id,
+ provider=prov_item.provider,
+ )
+ if not db_item:
+ # dump the item in the db, rich metadata is lazy loaded later
+ db_item = await controller.add_db_item(prov_item)
+ elif (
+ db_item.metadata.checksum and prov_item.metadata.checksum
+ ) and db_item.metadata.checksum != prov_item.metadata.checksum:
+ # item checksum changed
+ db_item = await controller.update_db_item(
+ db_item.item_id, prov_item
+ )
+ cur_ids.add(db_item.item_id)
+ if not db_item.in_library:
+ await controller.set_db_library(db_item.item_id, True)
+
+ # process deletions
+ for item_id in prev_ids:
+ if item_id not in cur_ids:
+ # only mark the item as not in library and leave the metadata in db
+ await controller.set_db_library(item_id, False)
+
+ # DO NOT OVERRIDE BELOW
+
+ @property
+ def id(self) -> str:
+ """Return unique provider id to distinguish multiple instances of the same provider."""
+ return self.config.id
+
+ def _get_library_gen(self, media_type: MediaType) -> AsyncGenerator[MediaItemType]:
+ """Return library generator for given media_type."""
+ if media_type == MediaType.ARTIST:
+ return self.get_library_artists
+ if media_type == MediaType.ALBUM:
+ return self.get_library_albums
+ if media_type == MediaType.TRACK:
+ return self.get_library_tracks
+ if media_type == MediaType.PLAYLIST:
+ return self.get_library_playlists
+ if media_type == MediaType.RADIO:
+ return self.get_library_radios
unidecode>=1.0,<=1.3.4
mashumaro>=3.0,<=3.1
tinytag>=1.6,<=1.8.1
+xmltodict>=0.12.0,<=0.13.0
from music_assistant.helpers import uri, util
from music_assistant.models import media_items
+from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import MusicAssistantError
test_uri = "spotify://track/123456789"
media_type, provider, item_id = uri.parse_uri(test_uri)
assert media_type == media_items.MediaType.TRACK
- assert provider == "spotify"
+ assert provider == ProviderType.SPOTIFY
assert item_id == "123456789"
# test spotify uri
test_uri = "spotify:track:123456789"
media_type, provider, item_id = uri.parse_uri(test_uri)
assert media_type == media_items.MediaType.TRACK
- assert provider == "spotify"
+ assert provider == ProviderType.SPOTIFY
assert item_id == "123456789"
# test public play/open url
test_uri = (
)
media_type, provider, item_id = uri.parse_uri(test_uri)
assert media_type == media_items.MediaType.PLAYLIST
- assert provider == "spotify"
+ assert provider == ProviderType.SPOTIFY
assert item_id == "5lH9NjOeJvctAO92ZrKQNB"
# test filename with slashes as item_id
test_uri = "filesystem://track/Artist/Album/Track.flac"
media_type, provider, item_id = uri.parse_uri(test_uri)
assert media_type == media_items.MediaType.TRACK
- assert provider == "filesystem"
+ assert provider == ProviderType.FILESYSTEM_LOCAL
assert item_id == "Artist/Album/Track.flac"
# test invalid uri
with raises(MusicAssistantError):