From: Marcel van der Veldt Date: Thu, 12 May 2022 00:07:14 +0000 (+0200) Subject: Refactor config and setup (#309) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=eae7edc37588616014f50da912b18a70e3cce893;p=music-assistant-server.git Refactor config and setup (#309) * Allow sync to be started manually --- diff --git a/examples/full.py b/examples/full.py index 72a6f5ff..a01b739c 100644 --- a/examples/full.py +++ b/examples/full.py @@ -5,12 +5,10 @@ import logging import os from music_assistant.mass import MusicAssistant +from music_assistant.models.config import MassConfig from music_assistant.models.player import Player, PlayerState from music_assistant.models.player_queue import RepeatMode -from music_assistant.providers.filesystem import FileSystemProvider -from music_assistant.providers.qobuz import QobuzProvider -from music_assistant.providers.spotify import SpotifyProvider -from music_assistant.providers.tunein import TuneInProvider + parser = argparse.ArgumentParser(description="MusicAssistant") parser.add_argument( @@ -76,15 +74,20 @@ if not os.path.isdir(data_dir): db_file = os.path.join(data_dir, "music_assistant.db") -providers = [] -if args.spotify_username and args.spotify_password: - providers.append(SpotifyProvider(args.spotify_username, args.spotify_password)) -if args.qobuz_username and args.qobuz_password: - providers.append(QobuzProvider(args.qobuz_username, args.qobuz_password)) -if args.tunein_username: - providers.append(TuneInProvider(args.tunein_username)) -if args.musicdir: - providers.append(FileSystemProvider(args.musicdir, args.playlistdir)) +mass_conf = MassConfig( + database_url=f"sqlite:///{db_file}", + spotify_enabled=args.spotify_username and args.spotify_password, + spotify_username=args.spotify_username, + spotify_password=args.spotify_password, + qobuz_enabled=args.qobuz_username and args.qobuz_password, + qobuz_username=args.qobuz_username, + qobuz_password=args.qobuz_password, + tunein_enabled=args.tunein_username is not None, + tunein_username=args.tunein_username, + filesystem_enabled=args.musicdir is not None, + filesystem_music_dir=args.musicdir, + filesystem_playlists_dir=args.playlistdir, +) class TestPlayer(Player): @@ -147,11 +150,11 @@ async def main(): asyncio.get_event_loop().set_debug(args.debug) - async with MusicAssistant(f"sqlite:///{db_file}") as mass: + async with MusicAssistant(mass_conf) as mass: + + # start sync + await mass.music.start_sync() - # register music provider(s) - for prov in providers: - await mass.music.register_provider(prov) # get some data artists = await mass.music.artists.count() print(f"Got {artists} artists in library") diff --git a/music_assistant/controllers/music/__init__.py b/music_assistant/controllers/music/__init__.py index 1546b61b..02a36328 100755 --- a/music_assistant/controllers/music/__init__.py +++ b/music_assistant/controllers/music/__init__.py @@ -19,14 +19,8 @@ from music_assistant.helpers.database import ( ) from music_assistant.helpers.datetime import utc_timestamp from music_assistant.helpers.uri import parse_uri -from music_assistant.helpers.util import run_periodic -from music_assistant.models.enums import EventType, MediaType -from music_assistant.models.errors import ( - AlreadyRegisteredError, - MusicAssistantError, - SetupFailedError, -) -from music_assistant.models.event import MassEvent +from music_assistant.models.enums import MediaType +from music_assistant.models.errors import MusicAssistantError, SetupFailedError from music_assistant.models.media_items import ( MediaItem, MediaItemProviderId, @@ -34,9 +28,16 @@ from music_assistant.models.media_items import ( ) from music_assistant.models.provider import MusicProvider +from .providers.filesystem import FileSystemProvider +from .providers.qobuz import QobuzProvider +from .providers.spotify import SpotifyProvider +from .providers.tunein import TuneInProvider + if TYPE_CHECKING: from music_assistant.mass import MusicAssistant +PROVIDERS = (FileSystemProvider, QobuzProvider, SpotifyProvider, TuneInProvider) + class MusicController: """Several helpers around the musicproviders.""" @@ -54,7 +55,26 @@ class MusicController: async def setup(self): """Async initialize of module.""" - self.mass.create_task(self.__periodic_sync) + # register providers + for prov in PROVIDERS: + await self._register_provider(prov()) + + async def start_sync(self, schedule: Optional[float] = 3) -> None: + """ + Start running the sync of all registred providers. + + schedule: schedule syncjob every X hours, set to None for just a manual sync run. + """ + + async def do_sync(): + while True: + for prov in self.providers: + await self.run_provider_sync(prov.id) + if schedule is None: + return + await asyncio.sleep(3600 * schedule) + + self.mass.create_task(do_sync()) @property def provider_count(self) -> int: @@ -73,32 +93,6 @@ class MusicController: self.logger.warning("Provider %s is not available", provider_id) return prov - async def register_provider(self, provider: MusicProvider) -> None: - """Register a music provider.""" - if provider.id in self._providers: - raise AlreadyRegisteredError( - f"Provider {provider.id} is already registered" - ) - try: - provider.mass = self.mass - provider.cache = self.mass.cache - provider.logger = self.logger.getChild(provider.id) - await provider.setup() - except Exception as err: # pylint: disable=broad-except - raise SetupFailedError( - f"Setup failed of provider {provider.id}: {str(err)}" - ) from err - else: - self._providers[provider.id] = provider - self.mass.signal_event( - MassEvent( - EventType.PROVIDER_REGISTERED, - object_id=provider.id, - data=provider.id, - ) - ) - self.mass.create_task(self.run_provider_sync(provider.id)) - async def search( self, search_query, media_types: List[MediaType], limit: int = 10 ) -> List[MediaItemType]: @@ -359,13 +353,8 @@ class MusicController: job_desc, ) - async def trigger_sync(self) -> None: - """Trigger sync of all providers.""" - for prov in self.providers: - await self.run_provider_sync(prov.id) - async def run_provider_sync(self, provider_id: str) -> None: - """Run library sync for a provider.""" + """Run/schedule library sync for a provider.""" provider = self.get_provider(provider_id) if not provider: return @@ -379,6 +368,21 @@ class MusicController: allow_duplicate=False, ) + def get_controller( + self, media_type: MediaType + ) -> ArtistsController | AlbumsController | TracksController | RadioController | PlaylistController: + """Return controller for MediaType.""" + if media_type == MediaType.ARTIST: + return self.artists + if media_type == MediaType.ALBUM: + return self.albums + if media_type == MediaType.TRACK: + return self.tracks + if media_type == MediaType.RADIO: + return self.radio + if media_type == MediaType.PLAYLIST: + return self.playlists + async def _library_items_sync( self, media_type: MediaType, provider_id: str ) -> None: @@ -431,29 +435,19 @@ class MusicController: if provider_id == "filesystem": if db_item := await controller.get_db_item(item_id): db_item.provider_ids = { - x - for x in db_item.provider_ids - if not (x.provider == provider_id) + x for x in db_item.provider_ids if x.provider != provider_id } await controller.update_db_item(item_id, db_item, True) - def get_controller( - self, media_type: MediaType - ) -> ArtistsController | AlbumsController | TracksController | RadioController | PlaylistController: - """Return controller for MediaType.""" - if media_type == MediaType.ARTIST: - return self.artists - if media_type == MediaType.ALBUM: - return self.albums - if media_type == MediaType.TRACK: - return self.tracks - if media_type == MediaType.RADIO: - return self.radio - if media_type == MediaType.PLAYLIST: - return self.playlists - - @run_periodic(3 * 3600, True) - async def __periodic_sync(self): - """Periodically sync all providers.""" - for prov in self.providers: - await self.run_provider_sync(prov.id) + async def _register_provider(self, provider: MusicProvider) -> None: + """Register a music provider.""" + try: + provider.mass = self.mass + provider.cache = self.mass.cache + provider.logger = self.logger.getChild(provider.id) + if await provider.setup(): + self._providers[provider.id] = provider + except Exception as err: # pylint: disable=broad-except + raise SetupFailedError( + f"Setup failed of provider {provider.id}: {str(err)}" + ) from err diff --git a/music_assistant/controllers/music/providers/__init__.py b/music_assistant/controllers/music/providers/__init__.py new file mode 100644 index 00000000..01895ef6 --- /dev/null +++ b/music_assistant/controllers/music/providers/__init__.py @@ -0,0 +1 @@ +"""Package with Music Providers.""" diff --git a/music_assistant/controllers/music/providers/filesystem.py b/music_assistant/controllers/music/providers/filesystem.py new file mode 100644 index 00000000..a9a1707a --- /dev/null +++ b/music_assistant/controllers/music/providers/filesystem.py @@ -0,0 +1,531 @@ +"""Filesystem musicprovider support for MusicAssistant.""" +from __future__ import annotations + +import base64 +import os +from typing import Dict, List, Optional, Tuple + +import aiofiles +from tinytag.tinytag import TinyTag, TinyTagException + +from music_assistant.helpers.compare import compare_strings +from music_assistant.helpers.util import parse_title_and_version, try_parse_int +from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError +from music_assistant.models.media_items import ( + Album, + AlbumType, + Artist, + ContentType, + ImageType, + MediaItemImage, + MediaItemProviderId, + MediaItemType, + MediaQuality, + MediaType, + Playlist, + StreamDetails, + StreamType, + Track, +) +from music_assistant.models.provider import MusicProvider + + +def split_items(org_str: str, splitters: Tuple[str] = None) -> Tuple[str]: + """Split up a tags string by common splitter.""" + if splitters is None: + splitters = ("/", ";", ",") + if org_str is None: + return tuple() + for splitter in splitters: + if splitter in org_str: + return tuple((x.strip() for x in org_str.split(splitter))) + return (org_str,) + + +FALLBACK_ARTIST = "Various Artists" +ARTIST_SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ") + + +class FileSystemProvider(MusicProvider): + """ + Very basic implementation of a musicprovider for local files. + + Assumes files are stored on disk in format // + Reads ID3 tags from file and falls back to parsing filename + Supports m3u files only for playlists + Supports having URI's from streaming providers within m3u playlist + Should be compatible with LMS + """ + + _attr_id = "filesystem" + _attr_name = "Filesystem" + _playlists_dir = "" + _music_dir = "" + _attr_supported_mediatypes = [ + MediaType.ARTIST, + MediaType.ALBUM, + MediaType.TRACK, + MediaType.PLAYLIST, + ] + + async def setup(self) -> bool: + """Handle async initialization of the provider.""" + if not self.mass.config.filesystem_enabled: + return False + + self._music_dir = self.mass.config.filesystem_music_dir + self._playlists_dir = ( + self.mass.config.filesystem_playlists_dir or self._music_dir + ) + + if not os.path.isdir(self._music_dir): + raise MediaNotFoundError( + f"Music Directory {self._music_dir} does not exist" + ) + + if not os.path.isdir(self._playlists_dir): + raise MediaNotFoundError( + f"Playlist Directory {self._playlists_dir} does not exist" + ) + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """ + Perform search on musicprovider. + + :param search_query: Search query. + :param media_types: A list of media_types to include. All types if None. + :param limit: Number of items to return in the search (per type). + """ + result = [] + for track in await self.get_library_tracks(True): + for search_part in search_query.split(" - "): + if media_types is None or MediaType.TRACK in media_types: + if compare_strings(track.name, search_part): + result.append(track) + if media_types is None or MediaType.ALBUM in media_types: + if track.album: + if compare_strings(track.album.name, search_part): + result.append(track.album) + if media_types is None or MediaType.ARTIST in media_types: + if track.album and track.album.artist: + if compare_strings(track.album.artist, search_part): + result.append(track.album.artist) + return result + + async def get_library_artists(self) -> List[Artist]: + """Retrieve all library artists.""" + result = [] + cur_ids = set() + # for the sake of simplicity we only iterate over the files in one location only, + # which is the library tracks where we recursively enumerate the directory structure + # library artists = unique album artists across all tracks + # the track listing is cached so this should be (pretty) fast + for track in await self.get_library_tracks(True): + if track.album is None or track.album is None: + continue + if track.album.artist.item_id in cur_ids: + continue + result.append(track.album.artist) + cur_ids.add(track.album.artist.item_id) + return result + + async def get_library_albums(self) -> List[Album]: + """Get album folders recursively.""" + result = [] + cur_ids = set() + # for the sake of simplicity we only iterate over the files in one location only, + # which is the library tracks where we recurisvely enumerate the directory structure + # library albums = unique albums across all tracks + # the track listing is cached so this should be (pretty) fast + for track in await self.get_library_tracks(True): + if track.album is None: + continue + if track.album.item_id in cur_ids: + continue + result.append(track.album) + cur_ids.add(track.album.item_id) + return result + + async def get_library_tracks(self, use_cache=False) -> List[Track]: + """Get all tracks recursively.""" + # pylint: disable=arguments-differ + # we cache the entire tracks listing for performance and convenience reasons + # so we can easy retrieve the library artists and albums from the tracks listing + cache_key = f"{self.id}.tracks" + cache_result: Dict[str, dict] = await self.mass.cache.get( + cache_key, checksum=self._music_dir + ) + if cache_result is not None and use_cache: + return [Track.from_dict(x) for x in cache_result.values()] + if cache_result is None: + cache_result = {} + + # find all music files in the music directory and all subfolders + result = [] + for _root, _dirs, _files in os.walk(self._music_dir): + for file in _files: + filename = os.path.join(_root, file) + checksum = self._get_checksum(filename) + prov_item_id = self._get_item_id(filename) + cache_track = cache_result.get(prov_item_id) + # we do not want to parse tags if there are no changes to the file + # so we speedup the sync by comparing a checksum + if cache_track and cache_track["metadata"].get("checksum") == checksum: + # checksum did not change, use cached track + result.append(Track.from_dict(cache_track)) + elif track := await self._parse_track(filename): + cache_result[prov_item_id] = track.to_dict() + result.append(track) + # store cache listing in cache + await self.mass.cache.set(cache_key, cache_result, self._music_dir) + return result + + async def get_library_playlists(self) -> List[Playlist]: + """Retrieve playlists from disk.""" + if not self._playlists_dir: + return [] + result = [] + cur_ids = set() + for filename in os.listdir(self._playlists_dir): + filepath = os.path.join(self._playlists_dir, filename) + if ( + os.path.isfile(filepath) + and not filename.startswith(".") + and filename.lower().endswith(".m3u") + ): + playlist = await self._parse_playlist(filepath) + if playlist: + result.append(playlist) + cur_ids.add(playlist.item_id) + return result + + async def get_artist(self, prov_artist_id: str) -> Artist: + """Get full artist details by id.""" + if album_artist := next( + ( + track.album.artist + for track in await self.get_library_tracks(True) + if track.album is not None + and track.album.artist is not None + and track.album.artist.item_id == prov_artist_id + ), + None, + ): + return album_artist + # fallback to track_artist + for track in await self.get_library_tracks(True): + for artist in track.artists: + if artist.item_id == prov_artist_id: + return artist + return None + + async def get_album(self, prov_album_id: str) -> Album: + """Get full album details by id.""" + return next( + ( + track.album + for track in await self.get_library_tracks(True) + if track.album is not None and track.album.item_id == prov_album_id + ), + None, + ) + + async def get_track(self, prov_track_id: str) -> Track: + """Get full track details by id.""" + itempath = self._get_filename(prov_track_id) + if not os.path.isfile(itempath): + raise MediaNotFoundError(f"Track path does not exist: {itempath}") + return await self._parse_track(itempath) + + async def get_playlist(self, prov_playlist_id: str) -> Playlist: + """Get full playlist details by id.""" + itempath = self._get_filename(prov_playlist_id) + if not os.path.isfile(itempath): + raise MediaNotFoundError(f"playlist path does not exist: {itempath}") + return await self._parse_playlist(itempath) + + async def get_album_tracks(self, prov_album_id) -> List[Track]: + """Get album tracks for given album id.""" + return [ + track + for track in await self.get_library_tracks(True) + if track.album is not None and track.album.item_id == prov_album_id + ] + + async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]: + """Get playlist tracks for given playlist id.""" + result = [] + itempath = self._get_filename(prov_playlist_id) + if not os.path.isfile(itempath): + raise MediaNotFoundError(f"playlist path does not exist: {itempath}") + index = 0 + async with aiofiles.open(itempath, "r") as _file: + for line in await _file.readlines(): + line = line.strip() + if line and not line.startswith("#"): + if track := await self._parse_track_from_uri(line): + result.append(track) + index += 1 + return result + + async def get_artist_albums(self, prov_artist_id: str) -> List[Album]: + """Get a list of albums for the given artist.""" + result = [] + cur_ids = set() + for track in await self.get_library_tracks(True): + if track.album is None: + continue + if track.album.item_id in cur_ids: + continue + if track.album.artist is None: + continue + if track.album.artist.item_id != prov_artist_id: + continue + result.append(track.album) + cur_ids.add(track.album.item_id) + return result + + async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]: + """Get a list of all tracks as we have no clue about preference.""" + return [ + track + for track in await self.get_library_tracks(True) + if track.artists is not None + and ( + (prov_artist_id in (x.item_id for x in track.artists)) + or ( + track.album is not None + and track.album.artist is not None + and track.album.artist.item_id == prov_artist_id + ) + ) + ] + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Return the content details for the given track when it will be streamed.""" + itempath = self._get_filename(item_id) + if not os.path.isfile(itempath): + raise MediaNotFoundError(f"Track path does not exist: {itempath}") + + def parse_tag(): + return TinyTag.get(itempath) + + tags = await self.mass.loop.run_in_executor(None, parse_tag) + + return StreamDetails( + type=StreamType.FILE, + provider=self.id, + item_id=item_id, + content_type=ContentType(itempath.split(".")[-1]), + path=itempath, + sample_rate=tags.samplerate or 44100, + bit_depth=16, # TODO: parse bitdepth + ) + + async def get_embedded_image(self, filename: str) -> str | None: + """Return the embedded image of an audio file as base64 string.""" + if not TinyTag.is_supported(filename): + return None + + def parse_tags(): + return TinyTag.get(filename, tags=True, image=True, ignore_errors=True) + + tags = await self.mass.loop.run_in_executor(None, parse_tags) + if image_data := tags.get_image(): + enc_image = base64.b64encode(image_data).decode() + enc_image = f"data:image/png;base64,{enc_image}" + return enc_image + + async def _parse_track(self, filename: str) -> Track | None: + """Try to parse a track from a filename by reading its tags.""" + if not TinyTag.is_supported(filename): + return None + + def parse_tags(): + return TinyTag.get(filename, image=True, ignore_errors=True) + + # parse ID3 tags with TinyTag + try: + tags = await self.mass.loop.run_in_executor(None, parse_tags) + except TinyTagException as err: + self.logger.error("Error processing %s: %s", filename, str(err)) + + prov_item_id = self._get_item_id(filename) + + # work out if we have an artist/album/track.ext structure + filename_base = filename.replace(self._music_dir, "") + if filename_base.startswith(os.sep): + filename_base = filename_base[1:] + track_parts = filename_base.rsplit(os.sep) + if len(track_parts) == 3: + album_artist_name = track_parts[0] + album_name = track_parts[1] + else: + album_artist_name = tags.albumartist + album_name = tags.album + + # prefer title from tag, fallback to filename + if tags.title: + track_title = tags.title + else: + ext = filename_base.split(".")[-1] + track_title = filename_base.replace(f".{ext}", "").replace("_", " ") + self.logger.warning( + "%s is missing ID3 tags, use filename as fallback", filename_base + ) + + name, version = parse_title_and_version(track_title) + track = Track( + item_id=prov_item_id, provider=self.id, name=name, version=version + ) + + # Parse track artist(s) from artist string using common splitters used in ID3 tags + # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up + track_artists_str = tags.artist or album_artist_name or FALLBACK_ARTIST + track.artists = [ + Artist( + item_id=item, + provider=self._attr_id, + name=item, + ) + for item in split_items(track_artists_str, ARTIST_SPLITTERS) + ] + + # Check if track has embedded metadata + if tags.get_image(): + # we do not actually embed the image in the metadata because that would consume too + # much space and bandwidth. Instead we set the filename as value so the image can + # be retrieved later in realtime. + track.metadata.images = {MediaItemImage(ImageType.EMBEDDED_THUMB, filename)} + + # Parse album (only if we have album + album artist tags) + if album_name and album_artist_name: + album_id = album_name + album_name, album_version = parse_title_and_version(album_name) + track.album = Album( + item_id=album_id, + provider=self._attr_id, + name=album_name, + version=album_version, + year=try_parse_int(tags.year) if tags.year else None, + artist=Artist( + item_id=album_artist_name, + provider=self._attr_id, + name=album_artist_name, + ), + ) + track.album.metadata.images = track.metadata.images + + # try to guess the album type + if name.lower() == album_name.lower(): + track.album.album_type = AlbumType.SINGLE + elif album_artist_name not in (x.name for x in track.artists): + track.album.album_type = AlbumType.COMPILATION + else: + track.album.album_type = AlbumType.ALBUM + + # parse other info + track.duration = tags.duration + track.metadata.genres = set(split_items(tags.genre)) + track.disc_number = try_parse_int(tags.disc) + track.track_number = try_parse_int(tags.track) + track.isrc = tags.extra.get("isrc", "") + if "copyright" in tags.extra: + track.metadata.copyright = tags.extra["copyright"] + if "lyrics" in tags.extra: + track.metadata.lyrics = tags.extra["lyrics"] + # store last modified time as checksum + track.metadata.checksum = self._get_checksum(filename) + + quality_details = "" + if filename.endswith(".flac"): + # TODO: get bit depth + quality = MediaQuality.FLAC_LOSSLESS + if tags.samplerate > 192000: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4 + elif tags.samplerate > 96000: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3 + elif tags.samplerate > 48000: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2 + quality_details = f"{tags.samplerate / 1000} Khz" + elif filename.endswith(".ogg"): + quality = MediaQuality.LOSSY_OGG + quality_details = f"{tags.bitrate} kbps" + elif filename.endswith(".m4a"): + quality = MediaQuality.LOSSY_AAC + quality_details = f"{tags.bitrate} kbps" + else: + quality = MediaQuality.LOSSY_MP3 + quality_details = f"{tags.bitrate} kbps" + track.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=prov_item_id, + quality=quality, + details=quality_details, + url=filename, + ) + ) + return track + + async def _parse_playlist(self, filename: str) -> Playlist | None: + """Parse playlist from file.""" + # use the relative filename as item_id + filename_base = filename.replace(self._music_dir, "") + if filename_base.startswith(os.sep): + filename_base = filename_base[1:] + prov_item_id = filename_base + + name = filename.split(os.sep)[-1].replace(".m3u", "") + + playlist = Playlist(prov_item_id, provider=self.id, name=name) + playlist.is_editable = True + playlist.add_provider_id( + MediaItemProviderId(provider=self.id, item_id=prov_item_id, url=filename) + ) + playlist.owner = self._attr_name + playlist.metadata.checksum = self._get_checksum(filename) + return playlist + + async def _parse_track_from_uri(self, uri): + """Try to parse a track from an uri found in playlist.""" + if "://" in uri: + # track is uri from external provider? + try: + return await self.mass.music.get_item_by_uri(uri) + except MusicAssistantError as err: + self.logger.warning( + "Could not parse uri %s to track: %s", uri, str(err) + ) + return None + # try to treat uri as filename + try: + return await self.get_track(uri) + except MediaNotFoundError: + return None + + def _get_filename(self, item_id: str, playlist: bool = False) -> str: + """Get filename for item_id.""" + if self._music_dir in item_id: + return item_id + if playlist: + return os.path.join(self._playlists_dir, item_id) + return os.path.join(self._music_dir, item_id) + + def _get_item_id(self, filename: str, playlist: bool = False) -> str: + """Return item_id for given filename.""" + # we simply use the base filename as item_id + base_path = self._playlists_dir if playlist else self._music_dir + filename_base = filename.replace(base_path, "") + if filename_base.startswith(os.sep): + filename_base = filename_base[1:] + return filename_base + + @staticmethod + def _get_checksum(filename: str) -> str: + """Get checksum for file.""" + # use last modified time as checksum + return str(os.path.getmtime(filename)) diff --git a/music_assistant/controllers/music/providers/librespot/linux/librespot-aarch64 b/music_assistant/controllers/music/providers/librespot/linux/librespot-aarch64 new file mode 100755 index 00000000..5359098f Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/linux/librespot-aarch64 differ diff --git a/music_assistant/controllers/music/providers/librespot/linux/librespot-arm b/music_assistant/controllers/music/providers/librespot/linux/librespot-arm new file mode 100755 index 00000000..5cd38c75 Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/linux/librespot-arm differ diff --git a/music_assistant/controllers/music/providers/librespot/linux/librespot-armhf b/music_assistant/controllers/music/providers/librespot/linux/librespot-armhf new file mode 100755 index 00000000..18c2e05b Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/linux/librespot-armhf differ diff --git a/music_assistant/controllers/music/providers/librespot/linux/librespot-armv7 b/music_assistant/controllers/music/providers/librespot/linux/librespot-armv7 new file mode 100755 index 00000000..0a792b2e Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/linux/librespot-armv7 differ diff --git a/music_assistant/controllers/music/providers/librespot/linux/librespot-x86_64 b/music_assistant/controllers/music/providers/librespot/linux/librespot-x86_64 new file mode 100755 index 00000000..e025abdb Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/linux/librespot-x86_64 differ diff --git a/music_assistant/controllers/music/providers/librespot/osx/librespot b/music_assistant/controllers/music/providers/librespot/osx/librespot new file mode 100755 index 00000000..c1b37543 Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/osx/librespot differ diff --git a/music_assistant/controllers/music/providers/librespot/windows/librespot.exe b/music_assistant/controllers/music/providers/librespot/windows/librespot.exe new file mode 100755 index 00000000..a973f4e1 Binary files /dev/null and b/music_assistant/controllers/music/providers/librespot/windows/librespot.exe differ diff --git a/music_assistant/controllers/music/providers/qobuz.py b/music_assistant/controllers/music/providers/qobuz.py new file mode 100644 index 00000000..0f8f10a9 --- /dev/null +++ b/music_assistant/controllers/music/providers/qobuz.py @@ -0,0 +1,748 @@ +"""Qobuz musicprovider support for MusicAssistant.""" +from __future__ import annotations + +import datetime +import hashlib +import time +from json import JSONDecodeError +from typing import List, Optional + +import aiohttp +from asyncio_throttle import Throttler + +from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module + app_var, +) +from music_assistant.helpers.cache import use_cache +from music_assistant.helpers.util import parse_title_and_version, try_parse_int +from music_assistant.models.enums import EventType +from music_assistant.models.errors import LoginFailed +from music_assistant.models.event import MassEvent +from music_assistant.models.media_items import ( + Album, + AlbumType, + Artist, + ContentType, + ImageType, + MediaItemImage, + MediaItemProviderId, + MediaItemType, + MediaQuality, + MediaType, + Playlist, + StreamDetails, + StreamType, + Track, +) +from music_assistant.models.provider import MusicProvider + + +class QobuzProvider(MusicProvider): + """Provider for the Qobux music service.""" + + _attr_id = "qobuz" + _attr_name = "Qobuz" + _attr_supported_mediatypes = [ + MediaType.ARTIST, + MediaType.ALBUM, + MediaType.TRACK, + MediaType.PLAYLIST, + ] + _user_auth_info = None + _throttler = Throttler(rate_limit=4, period=1) + + async def setup(self) -> bool: + """Handle async initialization of the provider.""" + if not self.mass.config.qobuz_enabled: + return False + if not self.mass.config.qobuz_username or not self.mass.config.qobuz_password: + raise LoginFailed("Invalid login credentials") + # try to get a token, raise if that fails + token = await self._auth_token() + if not token: + raise LoginFailed( + f"Login failed for user {self.mass.config.qobuz_username}" + ) + # subscribe to stream events so we can report playback to Qobuz + self.mass.subscribe( + self.on_stream_event, + (EventType.STREAM_STARTED, EventType.STREAM_ENDED), + id_filter=self.id, + ) + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """ + Perform search on musicprovider. + + :param search_query: Search query. + :param media_types: A list of media_types to include. All types if None. + :param limit: Number of items to return in the search (per type). + """ + result = [] + params = {"query": search_query, "limit": limit} + if len(media_types) == 1: + # qobuz does not support multiple searchtypes, falls back to all if no type given + if media_types[0] == MediaType.ARTIST: + params["type"] = "artists" + if media_types[0] == MediaType.ALBUM: + params["type"] = "albums" + if media_types[0] == MediaType.TRACK: + params["type"] = "tracks" + if media_types[0] == MediaType.PLAYLIST: + params["type"] = "playlists" + if searchresult := await self._get_data("catalog/search", **params): + if "artists" in searchresult: + result += [ + await self._parse_artist(item) + for item in searchresult["artists"]["items"] + if (item and item["id"]) + ] + if "albums" in searchresult: + result += [ + await self._parse_album(item) + for item in searchresult["albums"]["items"] + if (item and item["id"]) + ] + if "tracks" in searchresult: + result += [ + await self._parse_track(item) + for item in searchresult["tracks"]["items"] + if (item and item["id"]) + ] + if "playlists" in searchresult: + result += [ + await self._parse_playlist(item) + for item in searchresult["playlists"]["items"] + if (item and item["id"]) + ] + return result + + async def get_library_artists(self) -> List[Artist]: + """Retrieve all library artists from Qobuz.""" + endpoint = "favorite/getUserFavorites" + return [ + await self._parse_artist(item) + for item in await self._get_all_items( + endpoint, key="artists", type="artists" + ) + if (item and item["id"]) + ] + + async def get_library_albums(self) -> List[Album]: + """Retrieve all library albums from Qobuz.""" + endpoint = "favorite/getUserFavorites" + return [ + await self._parse_album(item) + for item in await self._get_all_items(endpoint, key="albums", type="albums") + if (item and item["id"]) + ] + + async def get_library_tracks(self) -> List[Track]: + """Retrieve library tracks from Qobuz.""" + endpoint = "favorite/getUserFavorites" + return [ + await self._parse_track(item) + for item in await self._get_all_items(endpoint, key="tracks", type="tracks") + if (item and item["id"]) + ] + + async def get_library_playlists(self) -> List[Playlist]: + """Retrieve all library playlists from the provider.""" + endpoint = "playlist/getUserPlaylists" + return [ + await self._parse_playlist(item) + for item in await self._get_all_items(endpoint, key="playlists") + if (item and item["id"]) + ] + + async def get_artist(self, prov_artist_id) -> Artist: + """Get full artist details by id.""" + params = {"artist_id": prov_artist_id} + artist_obj = await self._get_data("artist/get", **params) + return ( + await self._parse_artist(artist_obj) + if artist_obj and artist_obj["id"] + else None + ) + + async def get_album(self, prov_album_id) -> Album: + """Get full album details by id.""" + params = {"album_id": prov_album_id} + album_obj = await self._get_data("album/get", **params) + return ( + await self._parse_album(album_obj) + if album_obj and album_obj["id"] + else None + ) + + async def get_track(self, prov_track_id) -> Track: + """Get full track details by id.""" + params = {"track_id": prov_track_id} + track_obj = await self._get_data("track/get", **params) + return ( + await self._parse_track(track_obj) + if track_obj and track_obj["id"] + else None + ) + + async def get_playlist(self, prov_playlist_id) -> Playlist: + """Get full playlist details by id.""" + params = {"playlist_id": prov_playlist_id} + playlist_obj = await self._get_data("playlist/get", **params) + return ( + await self._parse_playlist(playlist_obj) + if playlist_obj and playlist_obj["id"] + else None + ) + + async def get_album_tracks(self, prov_album_id) -> List[Track]: + """Get all album tracks for given album id.""" + params = {"album_id": prov_album_id} + return [ + await self._parse_track(item) + for item in await self._get_all_items("album/get", **params, key="tracks") + if (item and item["id"]) + ] + + async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: + """Get all playlist tracks for given playlist id.""" + playlist = await self.get_playlist(prov_playlist_id) + endpoint = "playlist/get" + return [ + await self._parse_track(item) + for item in await self._get_all_items( + endpoint, + key="tracks", + playlist_id=prov_playlist_id, + extra="tracks", + cache_checksum=playlist.metadata.checksum, + ) + if (item and item["id"]) + ] + + async def get_artist_albums(self, prov_artist_id) -> List[Album]: + """Get a list of albums for the given artist.""" + endpoint = "artist/get" + return [ + await self._parse_album(item) + for item in await self._get_all_items( + endpoint, key="albums", artist_id=prov_artist_id, extra="albums" + ) + if (item and item["id"] and str(item["artist"]["id"]) == prov_artist_id) + ] + + async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: + """Get a list of most popular tracks for the given artist.""" + result = await self._get_data( + "artist/get", + artist_id=prov_artist_id, + extra="playlists", + offset=0, + limit=25, + ) + if result and result["playlists"]: + return [ + await self._parse_track(item) + for item in result["playlists"][0]["tracks"]["items"] + if (item and item["id"]) + ] + # fallback to search + artist = await self.get_artist(prov_artist_id) + searchresult = await self._get_data( + "catalog/search", query=artist.name, limit=25, type="tracks" + ) + return [ + await self._parse_track(item) + for item in searchresult["tracks"]["items"] + if ( + item + and item["id"] + and "performer" in item + and str(item["performer"]["id"]) == str(prov_artist_id) + ) + ] + + async def get_similar_artists(self, prov_artist_id): + """Get similar artists for given artist.""" + # https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3 + + async def library_add(self, prov_item_id, media_type: MediaType): + """Add item to library.""" + result = None + if media_type == MediaType.ARTIST: + result = await self._get_data( + "favorite/create", {"artist_ids": prov_item_id} + ) + elif media_type == MediaType.ALBUM: + result = await self._get_data( + "favorite/create", {"album_ids": prov_item_id} + ) + elif media_type == MediaType.TRACK: + result = await self._get_data( + "favorite/create", {"track_ids": prov_item_id} + ) + elif media_type == MediaType.PLAYLIST: + result = await self._get_data( + "playlist/subscribe", {"playlist_id": prov_item_id} + ) + return result + + async def library_remove(self, prov_item_id, media_type: MediaType): + """Remove item from library.""" + result = None + if media_type == MediaType.ARTIST: + result = await self._get_data( + "favorite/delete", {"artist_ids": prov_item_id} + ) + elif media_type == MediaType.ALBUM: + result = await self._get_data( + "favorite/delete", {"album_ids": prov_item_id} + ) + elif media_type == MediaType.TRACK: + result = await self._get_data( + "favorite/delete", {"track_ids": prov_item_id} + ) + elif media_type == MediaType.PLAYLIST: + playlist = await self.get_playlist(prov_item_id) + if playlist.is_editable: + result = await self._get_data( + "playlist/delete", {"playlist_id": prov_item_id} + ) + else: + result = await self._get_data( + "playlist/unsubscribe", {"playlist_id": prov_item_id} + ) + return result + + async def add_playlist_tracks( + self, prov_playlist_id: str, prov_track_ids: List[str] + ) -> None: + """Add track(s) to playlist.""" + return await self._get_data( + "playlist/addTracks", + playlist_id=prov_playlist_id, + track_ids=",".join(prov_track_ids), + playlist_track_ids=",".join(prov_track_ids), + ) + + async def remove_playlist_tracks( + self, prov_playlist_id: str, prov_track_ids: List[str] + ) -> None: + """Remove track(s) from playlist.""" + playlist_track_ids = set() + for track in await self._get_all_items( + "playlist/get", key="tracks", playlist_id=prov_playlist_id, extra="tracks" + ): + if str(track["id"]) in prov_track_ids: + playlist_track_ids.add(str(track["playlist_track_id"])) + return await self._get_data( + "playlist/deleteTracks", + playlist_id=prov_playlist_id, + playlist_track_ids=",".join(playlist_track_ids), + ) + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Return the content details for the given track when it will be streamed.""" + streamdata = None + for format_id in [27, 7, 6, 5]: + # it seems that simply requesting for highest available quality does not work + # from time to time the api response is empty for this request ?! + result = await self._get_data( + "track/getFileUrl", + sign_request=True, + format_id=format_id, + track_id=item_id, + intent="stream", + skip_cache=True, + ) + if result and result.get("url"): + streamdata = result + break + if not streamdata: + self.logger.error("Unable to retrieve stream details for track %s", item_id) + return None + if streamdata["mime_type"] == "audio/mpeg": + content_type = ContentType.MPEG + elif streamdata["mime_type"] == "audio/flac": + content_type = ContentType.FLAC + else: + self.logger.error("Unsupported mime type for track %s", item_id) + return None + return StreamDetails( + type=StreamType.URL, + item_id=str(item_id), + provider=self.id, + path=streamdata["url"], + content_type=content_type, + sample_rate=int(streamdata["sampling_rate"] * 1000), + bit_depth=streamdata["bit_depth"], + details=streamdata, # we need these details for reporting playback + ) + + async def on_stream_event(self, event: MassEvent): + """ + Received event from mass. + + We use this to report playback start/stop to qobuz. + """ + if not self._user_auth_info: + return + # TODO: need to figure out if the streamed track is purchased by user + # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx + # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}} + if event.type == EventType.STREAM_STARTED: + # report streaming started to qobuz + device_id = self._user_auth_info["user"]["device"]["id"] + credential_id = self._user_auth_info["user"]["credential"]["id"] + user_id = self._user_auth_info["user"]["id"] + format_id = event.data.details["format_id"] + timestamp = int(time.time()) + events = [ + { + "online": True, + "sample": False, + "intent": "stream", + "device_id": device_id, + "track_id": str(event.data.item_id), + "purchase": False, + "date": timestamp, + "credential_id": credential_id, + "user_id": user_id, + "local": False, + "format_id": format_id, + } + ] + await self._post_data("track/reportStreamingStart", data=events) + elif event.type == EventType.STREAM_ENDED: + # report streaming ended to qobuz + user_id = self._user_auth_info["user"]["id"] + await self._get_data( + "/track/reportStreamingEnd", + user_id=user_id, + track_id=str(event.data.item_id), + duration=try_parse_int(event.data.seconds_played), + ) + + async def _parse_artist(self, artist_obj: dict): + """Parse qobuz artist object to generic layout.""" + artist = Artist( + item_id=str(artist_obj["id"]), provider=self.id, name=artist_obj["name"] + ) + artist.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=str(artist_obj["id"]), + url=artist_obj.get( + "url", f'https://open.qobuz.com/artist/{artist_obj["id"]}' + ), + ) + ) + if img := self.__get_image(artist_obj): + artist.metadata.images = {MediaItemImage(ImageType.THUMB, img)} + if artist_obj.get("biography"): + artist.metadata.description = artist_obj["biography"].get("content") + return artist + + async def _parse_album(self, album_obj: dict, artist_obj: dict = None): + """Parse qobuz album object to generic layout.""" + if not artist_obj and "artist" not in album_obj: + # artist missing in album info, return full abum instead + return await self.get_album(album_obj["id"]) + name, version = parse_title_and_version( + album_obj["title"], album_obj.get("version") + ) + album = Album( + item_id=str(album_obj["id"]), provider=self.id, name=name, version=version + ) + if album_obj["maximum_sampling_rate"] > 192: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4 + elif album_obj["maximum_sampling_rate"] > 96: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3 + elif album_obj["maximum_sampling_rate"] > 48: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2 + elif album_obj["maximum_bit_depth"] > 16: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_1 + elif album_obj.get("format_id", 0) == 5: + quality = MediaQuality.LOSSY_AAC + else: + quality = MediaQuality.FLAC_LOSSLESS + album.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=str(album_obj["id"]), + quality=quality, + url=album_obj.get( + "url", f'https://open.qobuz.com/album/{album_obj["id"]}' + ), + details=f'{album_obj["maximum_sampling_rate"]}kHz {album_obj["maximum_bit_depth"]}bit', + available=album_obj["streamable"] and album_obj["displayable"], + ) + ) + + if artist_obj: + album.artist = artist_obj + else: + album.artist = await self._parse_artist(album_obj["artist"]) + if ( + album_obj.get("product_type", "") == "single" + or album_obj.get("release_type", "") == "single" + ): + album.album_type = AlbumType.SINGLE + elif ( + album_obj.get("product_type", "") == "compilation" + or "Various" in album.artist.name + ): + album.album_type = AlbumType.COMPILATION + elif ( + album_obj.get("product_type", "") == "album" + or album_obj.get("release_type", "") == "album" + ): + album.album_type = AlbumType.ALBUM + if "genre" in album_obj: + album.metadata.genres = {album_obj["genre"]["name"]} + if img := self.__get_image(album_obj): + album.metadata.images = {MediaItemImage(ImageType.THUMB, img)} + if len(album_obj["upc"]) == 13: + # qobuz writes ean as upc ?! + album.upc = album_obj["upc"][1:] + else: + album.upc = album_obj["upc"] + if "label" in album_obj: + album.metadata.label = album_obj["label"]["name"] + if album_obj.get("released_at"): + album.year = datetime.datetime.fromtimestamp(album_obj["released_at"]).year + if album_obj.get("copyright"): + album.metadata.copyright = album_obj["copyright"] + if album_obj.get("description"): + album.metadata.description = album_obj["description"] + return album + + async def _parse_track(self, track_obj: dict): + """Parse qobuz track object to generic layout.""" + name, version = parse_title_and_version( + track_obj["title"], track_obj.get("version") + ) + track = Track( + item_id=str(track_obj["id"]), + provider=self.id, + name=name, + version=version, + disc_number=track_obj["media_number"], + track_number=track_obj["track_number"], + duration=track_obj["duration"], + ) + if track_obj.get("performer") and "Various " not in track_obj["performer"]: + artist = await self._parse_artist(track_obj["performer"]) + if artist: + track.artists.append(artist) + if not track.artists: + # try to grab artist from album + if ( + track_obj.get("album") + and track_obj["album"].get("artist") + and "Various " not in track_obj["album"]["artist"] + ): + artist = await self._parse_artist(track_obj["album"]["artist"]) + if artist: + track.artists.append(artist) + if not track.artists: + # last resort: parse from performers string + for performer_str in track_obj["performers"].split(" - "): + role = performer_str.split(", ")[1] + name = performer_str.split(", ")[0] + if "artist" in role.lower(): + artist = Artist(name, self.id, name) + track.artists.append(artist) + # TODO: fix grabbing composer from details + + if "album" in track_obj: + album = await self._parse_album(track_obj["album"]) + if album: + track.album = album + if track_obj.get("isrc"): + track.isrc = track_obj["isrc"] + if track_obj.get("performers"): + track.metadata.performers = { + x.strip() for x in track_obj["performers"].split("-") + } + if track_obj.get("copyright"): + track.metadata.copyright = track_obj["copyright"] + if track_obj.get("audio_info"): + track.metadata.replaygain = track_obj["audio_info"]["replaygain_track_gain"] + if track_obj.get("parental_warning"): + track.metadata.explicit = True + if img := self.__get_image(track_obj): + track.metadata.images = {MediaItemImage(ImageType.THUMB, img)} + # get track quality + if track_obj["maximum_sampling_rate"] > 192: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4 + elif track_obj["maximum_sampling_rate"] > 96: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3 + elif track_obj["maximum_sampling_rate"] > 48: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2 + elif track_obj["maximum_bit_depth"] > 16: + quality = MediaQuality.FLAC_LOSSLESS_HI_RES_1 + elif track_obj.get("format_id", 0) == 5: + quality = MediaQuality.LOSSY_AAC + else: + quality = MediaQuality.FLAC_LOSSLESS + track.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=str(track_obj["id"]), + quality=quality, + url=track_obj.get( + "url", f'https://open.qobuz.com/track/{track_obj["id"]}' + ), + details=f'{track_obj["maximum_sampling_rate"]}kHz {track_obj["maximum_bit_depth"]}bit', + available=track_obj["streamable"] and track_obj["displayable"], + ) + ) + return track + + async def _parse_playlist(self, playlist_obj): + """Parse qobuz playlist object to generic layout.""" + playlist = Playlist( + item_id=str(playlist_obj["id"]), + provider=self.id, + name=playlist_obj["name"], + owner=playlist_obj["owner"]["name"], + ) + playlist.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=str(playlist_obj["id"]), + url=playlist_obj.get( + "url", f'https://open.qobuz.com/playlist/{playlist_obj["id"]}' + ), + ) + ) + playlist.is_editable = ( + playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"] + or playlist_obj["is_collaborative"] + ) + if img := self.__get_image(playlist_obj): + playlist.metadata.images = {MediaItemImage(ImageType.THUMB, img)} + playlist.metadata.checksum = str(playlist_obj["updated_at"]) + return playlist + + async def _auth_token(self): + """Login to qobuz and store the token.""" + if self._user_auth_info: + return self._user_auth_info["user_auth_token"] + params = { + "username": self.mass.config.qobuz_username, + "password": self.mass.config.qobuz_password, + "device_manufacturer_id": "music_assistant", + } + details = await self._get_data("user/login", **params) + if details and "user" in details: + self._user_auth_info = details + self.logger.info( + "Succesfully logged in to Qobuz as %s", details["user"]["display_name"] + ) + self.mass.metadata.preferred_language = details["user"]["country_code"] + return details["user_auth_token"] + + @use_cache(3600 * 24) + async def _get_all_items(self, endpoint, key="tracks", **kwargs): + """Get all items from a paged list.""" + limit = 50 + offset = 0 + all_items = [] + while True: + kwargs["limit"] = limit + kwargs["offset"] = offset + result = await self._get_data(endpoint, skip_cache=True, **kwargs) + offset += limit + if not result: + break + if not result.get(key) or not result[key].get("items"): + break + all_items += result[key]["items"] + if len(result[key]["items"]) < limit: + break + return all_items + + @use_cache(3600 * 2) + async def _get_data(self, endpoint, sign_request=False, **kwargs): + """Get data from api.""" + url = f"http://www.qobuz.com/api.json/0.2/{endpoint}" + headers = {"X-App-Id": app_var(0)} + if endpoint != "user/login": + auth_token = await self._auth_token() + if not auth_token: + self.logger.debug("Not logged in") + return None + headers["X-User-Auth-Token"] = auth_token + if sign_request: + signing_data = "".join(endpoint.split("/")) + keys = list(kwargs.keys()) + keys.sort() + for key in keys: + signing_data += f"{key}{kwargs[key]}" + request_ts = str(time.time()) + request_sig = signing_data + request_ts + app_var(1) + request_sig = str(hashlib.md5(request_sig.encode()).hexdigest()) + kwargs["request_ts"] = request_ts + kwargs["request_sig"] = request_sig + kwargs["app_id"] = app_var(0) + kwargs["user_auth_token"] = await self._auth_token() + async with self._throttler: + async with self.mass.http_session.get( + url, headers=headers, params=kwargs, verify_ssl=False + ) as response: + try: + result = await response.json() + if "error" in result or ( + "status" in result and "error" in result["status"] + ): + self.logger.error("%s - %s", endpoint, result) + return None + except ( + aiohttp.ContentTypeError, + JSONDecodeError, + ) as err: + self.logger.error("%s - %s", endpoint, str(err)) + return None + return result + + async def _post_data(self, endpoint, params=None, data=None): + """Post data to api.""" + if not params: + params = {} + if not data: + data = {} + url = f"http://www.qobuz.com/api.json/0.2/{endpoint}" + params["app_id"] = app_var(0) + params["user_auth_token"] = await self._auth_token() + async with self.mass.http_session.post( + url, params=params, json=data, verify_ssl=False + ) as response: + result = await response.json() + if "error" in result or ( + "status" in result and "error" in result["status"] + ): + self.logger.error("%s - %s", endpoint, result) + return None + return result + + def __get_image(self, obj: dict) -> Optional[str]: + """Try to parse image from Qobuz media object.""" + if obj.get("image"): + for key in ["extralarge", "large", "medium", "small"]: + if obj["image"].get(key): + if "2a96cbd8b46e442fc41c2b86b821562f" in obj["image"][key]: + continue + return obj["image"][key] + if obj.get("images300"): + # playlists seem to use this strange format + return obj["images300"][0] + if obj.get("album"): + return self.__get_image(obj["album"]) + if obj.get("artist"): + return self.__get_image(obj["artist"]) + return None diff --git a/music_assistant/controllers/music/providers/spotify.py b/music_assistant/controllers/music/providers/spotify.py new file mode 100644 index 00000000..7d51066c --- /dev/null +++ b/music_assistant/controllers/music/providers/spotify.py @@ -0,0 +1,670 @@ +"""Spotify musicprovider support for MusicAssistant.""" +from __future__ import annotations + +import asyncio +import json +import os +import platform +import time +from json.decoder import JSONDecodeError +from tempfile import gettempdir +from typing import List, Optional + +import aiohttp +from asyncio_throttle import Throttler + +from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name-in-module + app_var, +) +from music_assistant.helpers.cache import use_cache +from music_assistant.helpers.util import parse_title_and_version +from music_assistant.models.errors import LoginFailed +from music_assistant.models.media_items import ( + Album, + AlbumType, + Artist, + ContentType, + ImageType, + MediaItemImage, + MediaItemProviderId, + MediaItemType, + MediaQuality, + MediaType, + Playlist, + StreamDetails, + StreamType, + Track, +) +from music_assistant.models.provider import MusicProvider + +CACHE_DIR = gettempdir() + + +class SpotifyProvider(MusicProvider): + """Implementation of a Spotify MusicProvider.""" + + _attr_id = "spotify" + _attr_name = "Spotify" + _attr_supported_mediatypes = [ + MediaType.ARTIST, + MediaType.ALBUM, + MediaType.TRACK, + MediaType.PLAYLIST + # TODO: Return spotify radio + ] + _auth_token = None + _sp_user = None + _librespot_bin = None + _throttler = Throttler(rate_limit=4, period=1) + + async def setup(self) -> bool: + """Handle async initialization of the provider.""" + if not self.mass.config.spotify_enabled: + return False + if ( + not self.mass.config.spotify_username + or not self.mass.config.spotify_password + ): + raise LoginFailed("Invalid login credentials") + # try to get a token, raise if that fails + token = await self.get_token() + if not token: + raise LoginFailed( + f"Login failed for user {self.mass.config.spotify_username}" + ) + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """ + Perform search on musicprovider. + + :param search_query: Search query. + :param media_types: A list of media_types to include. All types if None. + :param limit: Number of items to return in the search (per type). + """ + result = [] + searchtypes = [] + if MediaType.ARTIST in media_types: + searchtypes.append("artist") + if MediaType.ALBUM in media_types: + searchtypes.append("album") + if MediaType.TRACK in media_types: + searchtypes.append("track") + if MediaType.PLAYLIST in media_types: + searchtypes.append("playlist") + searchtype = ",".join(searchtypes) + if searchresult := await self._get_data( + "search", q=search_query, type=searchtype, limit=limit + ): + if "artists" in searchresult: + result += [ + await self._parse_artist(item) + for item in searchresult["artists"]["items"] + if (item and item["id"]) + ] + if "albums" in searchresult: + result += [ + await self._parse_album(item) + for item in searchresult["albums"]["items"] + if (item and item["id"]) + ] + if "tracks" in searchresult: + result += [ + await self._parse_track(item) + for item in searchresult["tracks"]["items"] + if (item and item["id"]) + ] + if "playlists" in searchresult: + result += [ + await self._parse_playlist(item) + for item in searchresult["playlists"]["items"] + if (item and item["id"]) + ] + return result + + async def get_library_artists(self) -> List[Artist]: + """Retrieve library artists from spotify.""" + spotify_artists = await self._get_data( + "me/following", type="artist", limit=50, skip_cache=True + ) + return [ + await self._parse_artist(item) + for item in spotify_artists["artists"]["items"] + if (item and item["id"]) + ] + + async def get_library_albums(self) -> List[Album]: + """Retrieve library albums from the provider.""" + return [ + await self._parse_album(item["album"]) + for item in await self._get_all_items("me/albums", skip_cache=True) + if (item["album"] and item["album"]["id"]) + ] + + async def get_library_tracks(self) -> List[Track]: + """Retrieve library tracks from the provider.""" + return [ + await self._parse_track(item["track"]) + for item in await self._get_all_items("me/tracks", skip_cache=True) + if (item and item["track"]["id"]) + ] + + async def get_library_playlists(self) -> List[Playlist]: + """Retrieve playlists from the provider.""" + return [ + await self._parse_playlist(item) + for item in await self._get_all_items("me/playlists", skip_cache=True) + if (item and item["id"]) + ] + + async def get_artist(self, prov_artist_id) -> Artist: + """Get full artist details by id.""" + artist_obj = await self._get_data(f"artists/{prov_artist_id}") + return await self._parse_artist(artist_obj) if artist_obj else None + + async def get_album(self, prov_album_id) -> Album: + """Get full album details by id.""" + album_obj = await self._get_data(f"albums/{prov_album_id}") + return await self._parse_album(album_obj) if album_obj else None + + async def get_track(self, prov_track_id) -> Track: + """Get full track details by id.""" + track_obj = await self._get_data(f"tracks/{prov_track_id}") + return await self._parse_track(track_obj) if track_obj else None + + async def get_playlist(self, prov_playlist_id) -> Playlist: + """Get full playlist details by id.""" + playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}") + return await self._parse_playlist(playlist_obj) if playlist_obj else None + + async def get_album_tracks(self, prov_album_id) -> List[Track]: + """Get all album tracks for given album id.""" + return [ + await self._parse_track(item) + for item in await self._get_all_items(f"albums/{prov_album_id}/tracks") + if (item and item["id"]) + ] + + async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: + """Get all playlist tracks for given playlist id.""" + playlist = await self.get_playlist(prov_playlist_id) + return [ + await self._parse_track(item["track"]) + for item in await self._get_all_items( + f"playlists/{prov_playlist_id}/tracks", + cache_checksum=playlist.metadata.checksum, + ) + if (item and item["track"] and item["track"]["id"]) + ] + + async def get_artist_albums(self, prov_artist_id) -> List[Album]: + """Get a list of all albums for the given artist.""" + return [ + await self._parse_album(item) + for item in await self._get_all_items( + f"artists/{prov_artist_id}/albums?include_groups=album,single,compilation" + ) + if (item and item["id"]) + ] + + async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: + """Get a list of 10 most popular tracks for the given artist.""" + artist = await self.get_artist(prov_artist_id) + endpoint = f"artists/{prov_artist_id}/top-tracks" + items = await self._get_data(endpoint) + return [ + await self._parse_track(item, artist=artist) + for item in items["tracks"] + if (item and item["id"]) + ] + + async def library_add(self, prov_item_id, media_type: MediaType): + """Add item to library.""" + result = False + if media_type == MediaType.ARTIST: + result = await self._put_data( + "me/following", {"ids": prov_item_id, "type": "artist"} + ) + elif media_type == MediaType.ALBUM: + result = await self._put_data("me/albums", {"ids": prov_item_id}) + elif media_type == MediaType.TRACK: + result = await self._put_data("me/tracks", {"ids": prov_item_id}) + elif media_type == MediaType.PLAYLIST: + result = await self._put_data( + f"playlists/{prov_item_id}/followers", data={"public": False} + ) + return result + + async def library_remove(self, prov_item_id, media_type: MediaType): + """Remove item from library.""" + result = False + if media_type == MediaType.ARTIST: + result = await self._delete_data( + "me/following", {"ids": prov_item_id, "type": "artist"} + ) + elif media_type == MediaType.ALBUM: + result = await self._delete_data("me/albums", {"ids": prov_item_id}) + elif media_type == MediaType.TRACK: + result = await self._delete_data("me/tracks", {"ids": prov_item_id}) + elif media_type == MediaType.PLAYLIST: + result = await self._delete_data(f"playlists/{prov_item_id}/followers") + return result + + async def add_playlist_tracks( + self, prov_playlist_id: str, prov_track_ids: List[str] + ): + """Add track(s) to playlist.""" + track_uris = [] + for track_id in prov_track_ids: + track_uris.append(f"spotify:track:{track_id}") + data = {"uris": track_uris} + return await self._post_data(f"playlists/{prov_playlist_id}/tracks", data=data) + + async def remove_playlist_tracks( + self, prov_playlist_id: str, prov_track_ids: List[str] + ) -> None: + """Remove track(s) from playlist.""" + track_uris = [] + for track_id in prov_track_ids: + track_uris.append({"uri": f"spotify:track:{track_id}"}) + data = {"tracks": track_uris} + return await self._delete_data( + f"playlists/{prov_playlist_id}/tracks", data=data + ) + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Return the content details for the given track when it will be streamed.""" + # make sure a valid track is requested. + track = await self.get_track(item_id) + if not track: + return None + # make sure that the token is still valid by just requesting it + await self.get_token() + librespot = await self.get_librespot_binary() + librespot_exec = f'{librespot} -c "{CACHE_DIR}" --pass-through -b 320 --single-track spotify://track:{track.item_id}' + return StreamDetails( + type=StreamType.EXECUTABLE, + item_id=track.item_id, + provider=self.id, + path=librespot_exec, + content_type=ContentType.OGG, + sample_rate=44100, + bit_depth=16, + ) + + async def _parse_artist(self, artist_obj): + """Parse spotify artist object to generic layout.""" + artist = Artist( + item_id=artist_obj["id"], provider=self.id, name=artist_obj["name"] + ) + artist.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=artist_obj["id"], + url=artist_obj["external_urls"]["spotify"], + ) + ) + if "genres" in artist_obj: + artist.metadata.genres = set(artist_obj["genres"]) + if artist_obj.get("images"): + for img in artist_obj["images"]: + img_url = img["url"] + if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url: + artist.metadata.images = {MediaItemImage(ImageType.THUMB, img_url)} + break + return artist + + async def _parse_album(self, album_obj: dict): + """Parse spotify album object to generic layout.""" + name, version = parse_title_and_version(album_obj["name"]) + album = Album( + item_id=album_obj["id"], provider=self.id, name=name, version=version + ) + for artist in album_obj["artists"]: + album.artist = await self._parse_artist(artist) + if album.artist: + break + if album_obj["album_type"] == "single": + album.album_type = AlbumType.SINGLE + elif album_obj["album_type"] == "compilation": + album.album_type = AlbumType.COMPILATION + elif album_obj["album_type"] == "album": + album.album_type = AlbumType.ALBUM + if "genres" in album_obj: + album.metadata.genre = set(album_obj["genres"]) + if album_obj.get("images"): + album.metadata.images = { + MediaItemImage(ImageType.THUMB, album_obj["images"][0]["url"]) + } + if "external_ids" in album_obj and album_obj["external_ids"].get("upc"): + album.upc = album_obj["external_ids"]["upc"] + if "label" in album_obj: + album.metadata.label = album_obj["label"] + if album_obj.get("release_date"): + album.year = int(album_obj["release_date"].split("-")[0]) + if album_obj.get("copyrights"): + album.metadata.copyright = album_obj["copyrights"][0]["text"] + if album_obj.get("explicit"): + album.metadata.explicit = album_obj["explicit"] + album.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=album_obj["id"], + quality=MediaQuality.LOSSY_OGG, + url=album_obj["external_urls"]["spotify"], + ) + ) + return album + + async def _parse_track(self, track_obj, artist=None): + """Parse spotify track object to generic layout.""" + name, version = parse_title_and_version(track_obj["name"]) + track = Track( + item_id=track_obj["id"], + provider=self.id, + name=name, + version=version, + duration=track_obj["duration_ms"] / 1000, + disc_number=track_obj["disc_number"], + track_number=track_obj["track_number"], + ) + if artist: + track.artists.append(artist) + for track_artist in track_obj.get("artists", []): + artist = await self._parse_artist(track_artist) + if artist and artist.item_id not in {x.item_id for x in track.artists}: + track.artists.append(artist) + + track.metadata.explicit = track_obj["explicit"] + if "preview_url" in track_obj: + track.metadata.preview = track_obj["preview_url"] + if "external_ids" in track_obj and "isrc" in track_obj["external_ids"]: + track.isrc = track_obj["external_ids"]["isrc"] + if "album" in track_obj: + track.album = await self._parse_album(track_obj["album"]) + if track_obj["album"].get("images"): + track.metadata.images = { + MediaItemImage( + ImageType.THUMB, track_obj["album"]["images"][0]["url"] + ) + } + if track_obj.get("copyright"): + track.metadata.copyright = track_obj["copyright"] + if track_obj.get("explicit"): + track.metadata.explicit = True + if track_obj.get("popularity"): + track.metadata.popularity = track_obj["popularity"] + track.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=track_obj["id"], + quality=MediaQuality.LOSSY_OGG, + url=track_obj["external_urls"]["spotify"], + available=not track_obj["is_local"] and track_obj["is_playable"], + ) + ) + return track + + async def _parse_playlist(self, playlist_obj): + """Parse spotify playlist object to generic layout.""" + playlist = Playlist( + item_id=playlist_obj["id"], + provider=self.id, + name=playlist_obj["name"], + owner=playlist_obj["owner"]["display_name"], + ) + playlist.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=playlist_obj["id"], + url=playlist_obj["external_urls"]["spotify"], + ) + ) + playlist.is_editable = ( + playlist_obj["owner"]["id"] == self._sp_user["id"] + or playlist_obj["collaborative"] + ) + if playlist_obj.get("images"): + playlist.metadata.images = { + MediaItemImage(ImageType.THUMB, playlist_obj["images"][0]["url"]) + } + playlist.metadata.checksum = str(playlist_obj["snapshot_id"]) + return playlist + + async def get_token(self): + """Get auth token on spotify.""" + # return existing token if we have one in memory + if ( + self._auth_token + and os.path.isdir(CACHE_DIR) + and (self._auth_token["expiresAt"] > int(time.time()) + 20) + ): + return self._auth_token + tokeninfo = {} + if ( + not self.mass.config.spotify_username + or not self.mass.config.spotify_password + ): + return tokeninfo + # retrieve token with librespot + tokeninfo = await self._get_token() + if tokeninfo: + self._auth_token = tokeninfo + self._sp_user = await self._get_data("me") + self.mass.metadata.preferred_language = self._sp_user["country"] + self.logger.info( + "Succesfully logged in to Spotify as %s", self._sp_user["id"] + ) + self._auth_token = tokeninfo + else: + self.logger.error( + "Login failed for user %s", self.mass.config.spotify_username + ) + return tokeninfo + + async def _get_token(self): + """Get spotify auth token with librespot bin.""" + # authorize with username and password (NOTE: this can also be Spotify Connect) + args = [ + await self.get_librespot_binary(), + "-O", + "-c", + CACHE_DIR, + "-a", + "-u", + self.mass.config.spotify_username, + "-p", + self.mass.config.spotify_password, + ] + librespot = await asyncio.create_subprocess_exec(*args) + await librespot.wait() + # get token with (authorized) librespot + scopes = [ + "user-read-playback-state", + "user-read-currently-playing", + "user-modify-playback-state", + "playlist-read-private", + "playlist-read-collaborative", + "playlist-modify-public", + "playlist-modify-private", + "user-follow-modify", + "user-follow-read", + "user-library-read", + "user-library-modify", + "user-read-private", + "user-read-email", + "user-read-birthdate", + "user-top-read", + ] + scope = ",".join(scopes) + args = [ + await self.get_librespot_binary(), + "-O", + "-t", + "--client-id", + app_var(2), + "--scope", + scope, + "-c", + CACHE_DIR, + ] + librespot = await asyncio.create_subprocess_exec( + *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT + ) + stdout, _ = await librespot.communicate() + try: + result = json.loads(stdout) + except JSONDecodeError: + self.logger.warning( + "Error while retrieving Spotify token, details: %s", stdout + ) + return None + # transform token info to spotipy compatible format + if result and "accessToken" in result: + tokeninfo = result + tokeninfo["expiresAt"] = tokeninfo["expiresIn"] + int(time.time()) + return tokeninfo + return None + + @use_cache(3600 * 24) + async def _get_all_items(self, endpoint, key="items", **kwargs): + """Get all items from a paged list.""" + limit = 50 + offset = 0 + all_items = [] + while True: + kwargs["limit"] = limit + kwargs["offset"] = offset + result = await self._get_data(endpoint, skip_cache=True, **kwargs) + offset += limit + if not result or key not in result or not result[key]: + break + all_items += result[key] + if len(result[key]) < limit: + break + return all_items + + @use_cache(3600 * 2) + async def _get_data(self, endpoint, **kwargs): + """Get data from api.""" + url = f"https://api.spotify.com/v1/{endpoint}" + kwargs["market"] = "from_token" + kwargs["country"] = "from_token" + token = await self.get_token() + if not token: + return None + headers = {"Authorization": f'Bearer {token["accessToken"]}'} + async with self._throttler: + async with self.mass.http_session.get( + url, headers=headers, params=kwargs, verify_ssl=False + ) as response: + try: + result = await response.json() + if "error" in result or ( + "status" in result and "error" in result["status"] + ): + self.logger.error("%s - %s", endpoint, result) + return None + except ( + aiohttp.ContentTypeError, + JSONDecodeError, + ) as err: + self.logger.error("%s - %s", endpoint, str(err)) + return None + return result + + async def _delete_data(self, endpoint, data=None, **kwargs): + """Delete data from api.""" + url = f"https://api.spotify.com/v1/{endpoint}" + token = await self.get_token() + if not token: + return None + headers = {"Authorization": f'Bearer {token["accessToken"]}'} + async with self.mass.http_session.delete( + url, headers=headers, params=kwargs, json=data, verify_ssl=False + ) as response: + return await response.text() + + async def _put_data(self, endpoint, data=None, **kwargs): + """Put data on api.""" + url = f"https://api.spotify.com/v1/{endpoint}" + token = await self.get_token() + if not token: + return None + headers = {"Authorization": f'Bearer {token["accessToken"]}'} + async with self.mass.http_session.put( + url, headers=headers, params=kwargs, json=data, verify_ssl=False + ) as response: + return await response.text() + + async def _post_data(self, endpoint, data=None, **kwargs): + """Post data on api.""" + url = f"https://api.spotify.com/v1/{endpoint}" + token = await self.get_token() + if not token: + return None + headers = {"Authorization": f'Bearer {token["accessToken"]}'} + async with self.mass.http_session.post( + url, headers=headers, params=kwargs, json=data, verify_ssl=False + ) as response: + return await response.text() + + async def get_librespot_binary(self): + """Find the correct librespot binary belonging to the platform.""" + if self._librespot_bin is not None: + return self._librespot_bin + + async def check_librespot(librespot_path: str) -> str | None: + try: + librespot = await asyncio.create_subprocess_exec( + *[librespot_path, "-V"], stdout=asyncio.subprocess.PIPE + ) + stdout, _ = await librespot.communicate() + if librespot.returncode == 0 and b"librespot" in stdout: + self._librespot_bin = librespot_path + return librespot_path + except OSError: + return None + + base_path = os.path.join(os.path.dirname(__file__), "librespot") + if platform.system() == "Windows": + if librespot := await check_librespot( + os.path.join(base_path, "windows", "librespot.exe") + ): + return librespot + if platform.system() == "Darwin": + # macos binary is x86_64 intel + if librespot := await check_librespot( + os.path.join(base_path, "osx", "librespot") + ): + return librespot + + if platform.system() == "Linux": + architecture = platform.machine() + if architecture in ["AMD64", "x86_64"]: + # generic linux x86_64 binary + if librespot := await check_librespot( + os.path.join( + base_path, + "linux", + "librespot-x86_64", + ) + ): + return librespot + + # arm architecture... try all options one by one... + for arch in ["aarch64", "armv7", "armhf", "arm"]: + if librespot := await check_librespot( + os.path.join( + base_path, + "linux", + f"librespot-{arch}", + ) + ): + return librespot + + raise RuntimeError( + f"Unable to locate Libespot for platform {platform.system()}" + ) diff --git a/music_assistant/controllers/music/providers/tunein.py b/music_assistant/controllers/music/providers/tunein.py new file mode 100644 index 00000000..2700e1ac --- /dev/null +++ b/music_assistant/controllers/music/providers/tunein.py @@ -0,0 +1,185 @@ +"""Tune-In musicprovider support for MusicAssistant.""" +from __future__ import annotations + +from typing import List, Optional + +from asyncio_throttle import Throttler + +from music_assistant.helpers.cache import use_cache +from music_assistant.helpers.util import create_sort_name +from music_assistant.models.errors import LoginFailed +from music_assistant.models.media_items import ( + ContentType, + ImageType, + MediaItemImage, + MediaItemProviderId, + MediaItemType, + MediaQuality, + MediaType, + Radio, + StreamDetails, + StreamType, +) +from music_assistant.models.provider import MusicProvider + + +class TuneInProvider(MusicProvider): + """Provider implementation for Tune In.""" + + _attr_id = "tunein" + _attr_name = "Tune-in Radio" + _attr_supported_mediatypes = [MediaType.RADIO] + _throttler = Throttler(rate_limit=1, period=1) + + async def setup(self) -> bool: + """Handle async initialization of the provider.""" + if not self.mass.config.tunein_enabled: + return False + if not self.mass.config.tunein_username: + raise LoginFailed("Username is invalid") + if "@" in self.mass.config.tunein_username: + raise LoginFailed("You must provide the TuneIn username, not email") + return True + + async def search( + self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 + ) -> List[MediaItemType]: + """ + Perform search on musicprovider. + + :param search_query: Search query. + :param media_types: A list of media_types to include. All types if None. + :param limit: Number of items to return in the search (per type). + """ + result = [] + # TODO: search for radio stations + return result + + async def get_library_radios(self) -> List[Radio]: + """Retrieve library/subscribed radio stations from the provider.""" + + async def parse_items(items: List[dict], folder: str = None) -> List[Radio]: + result = [] + for item in items: + item_type = item.get("type", "") + if item_type == "audio": + if "preset_id" not in item: + continue + # each radio station can have multiple streams add each one as different quality + stream_info = await self.__get_data( + "Tune.ashx", id=item["preset_id"] + ) + for stream in stream_info["body"]: + result.append(await self._parse_radio(item, stream, folder)) + elif item_type == "link": + # stations are in sublevel (new style) + if sublevel := await self.__get_data(item["URL"], render="json"): + result += await parse_items(sublevel["body"], item["text"]) + elif item.get("children"): + # stations are in sublevel (old style ?) + result += await parse_items(item["children"], item["text"]) + return result + + data = await self.__get_data("Browse.ashx", c="presets") + if data and "body" in data: + return await parse_items(data["body"]) + return [] + + async def get_radio(self, prov_radio_id: str) -> Radio: + """Get radio station details.""" + prov_radio_id, media_type = prov_radio_id.split("--", 1) + params = {"c": "composite", "detail": "listing", "id": prov_radio_id} + result = await self.__get_data("Describe.ashx", **params) + if result and result.get("body") and result["body"][0].get("children"): + item = result["body"][0]["children"][0] + stream_info = await self.__get_data("Tune.ashx", id=prov_radio_id) + for stream in stream_info["body"]: + if stream["media_type"] != media_type: + continue + return await self._parse_radio(item, stream) + return None + + async def _parse_radio( + self, details: dict, stream: dict, folder: Optional[str] = None + ) -> Radio: + """Parse Radio object from json obj returned from api.""" + if "name" in details: + name = details["name"] + else: + # parse name from text attr + name = details["text"] + if " | " in name: + name = name.split(" | ")[1] + name = name.split(" (")[0] + item_id = f'{details["preset_id"]}--{stream["media_type"]}' + radio = Radio(item_id=item_id, provider=self.id, name=name) + if stream["media_type"] == "aac": + quality = MediaQuality.LOSSY_AAC + elif stream["media_type"] == "ogg": + quality = MediaQuality.LOSSY_OGG + else: + quality = MediaQuality.LOSSY_MP3 + radio.add_provider_id( + MediaItemProviderId( + provider=self.id, + item_id=item_id, + quality=quality, + details=stream["url"], + ) + ) + # preset number is used for sorting (not present at stream time) + preset_number = details.get("preset_number") + if preset_number and folder: + radio.sort_name = f'{folder}-{details["preset_number"]}' + elif preset_number: + radio.sort_name = details["preset_number"] + radio.sort_name += create_sort_name(name) + if "text" in details: + radio.metadata.description = details["text"] + # images + if img := details.get("image"): + radio.metadata.images = {MediaItemImage(ImageType.THUMB, img)} + if img := details.get("logo"): + radio.metadata.images = {MediaItemImage(ImageType.LOGO, img)} + return radio + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Get streamdetails for a radio station.""" + item_id, media_type = item_id.split("--", 1) + stream_info = await self.__get_data("Tune.ashx", id=item_id) + for stream in stream_info["body"]: + if stream["media_type"] == media_type: + return StreamDetails( + type=StreamType.URL, + item_id=item_id, + provider=self.id, + path=stream["url"], + content_type=ContentType(stream["media_type"]), + sample_rate=44100, + bit_depth=16, + media_type=MediaType.RADIO, + details=stream, + ) + return None + + @use_cache(3600 * 2) + async def __get_data(self, endpoint: str, **kwargs): + """Get data from api.""" + if endpoint.startswith("http"): + url = endpoint + else: + url = f"https://opml.radiotime.com/{endpoint}" + kwargs["formats"] = "ogg,aac,wma,mp3" + kwargs["username"] = self.mass.config.tunein_username + kwargs["partnerId"] = "1" + kwargs["render"] = "json" + async with self._throttler: + async with self.mass.http_session.get( + url, params=kwargs, verify_ssl=False + ) as response: + result = await response.json() + if not result or "error" in result: + self.logger.error(url) + self.logger.error(kwargs) + result = None + return result diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index a963a15a..ae4b2de7 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -19,7 +19,6 @@ from music_assistant.helpers.audio import ( strip_silence, ) from music_assistant.helpers.process import AsyncProcess -from music_assistant.helpers.util import get_ip, select_stream_port from music_assistant.models.enums import ( ContentType, CrossFadeMode, @@ -37,12 +36,12 @@ if TYPE_CHECKING: class StreamController: """Controller to stream audio to players.""" - def __init__(self, mass: MusicAssistant, port: Optional[int] = None): + def __init__(self, mass: MusicAssistant): """Initialize instance.""" self.mass = mass self.logger = mass.logger.getChild("stream") - self._port = port or select_stream_port() - self._ip: str = get_ip() + self._port = mass.config.stream_port + self._ip = mass.config.stream_ip self._subscribers: Dict[str, Set[str]] = {} self._client_queues: Dict[str, Dict[str, asyncio.Queue]] = {} self._stream_tasks: Dict[str, Task] = {} diff --git a/music_assistant/helpers/database.py b/music_assistant/helpers/database.py index 3f807a0c..6a8801a1 100755 --- a/music_assistant/helpers/database.py +++ b/music_assistant/helpers/database.py @@ -5,7 +5,6 @@ from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional from databases import Database as Db -from databases import DatabaseURL if TYPE_CHECKING: from music_assistant.mass import MusicAssistant @@ -28,9 +27,9 @@ TABLE_SETTINGS = "settings" class Database: """Class that holds the (logic to the) database.""" - def __init__(self, mass: MusicAssistant, url: DatabaseURL): + def __init__(self, mass: MusicAssistant): """Initialize class.""" - self.url = url + self.url = mass.config.database_url self.mass = mass self.logger = mass.logger.getChild("db") diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index 2060de83..8ef1b29a 100755 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -1,7 +1,6 @@ """Helper and utility functions.""" from __future__ import annotations -import asyncio import os import platform import socket @@ -18,24 +17,6 @@ CALLBACK_TYPE = Callable[[], None] # pylint: enable=invalid-name -def run_periodic(delay: float, later: bool = False): - """Run a coroutine at interval.""" - - def scheduler(fcn): - async def wrapper(*args, **kwargs): - while True: - if later: - await asyncio.sleep(delay) - await fcn(*args, **kwargs) - else: - await fcn(*args, **kwargs) - await asyncio.sleep(delay) - - return wrapper - - return scheduler - - def filename_from_string(string): """Create filename from unsafe string.""" keepcharacters = (" ", ".", "_") diff --git a/music_assistant/mass.py b/music_assistant/mass.py index 6455c8d1..2a7b0ce2 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -13,7 +13,6 @@ from typing import Any, Callable, Coroutine, Deque, List, Optional, Tuple, Type, from uuid import uuid4 import aiohttp -from databases import DatabaseURL from music_assistant.controllers.metadata import MetaDataController from music_assistant.controllers.music import MusicController @@ -22,6 +21,7 @@ from music_assistant.controllers.stream import StreamController from music_assistant.helpers.cache import Cache from music_assistant.helpers.database import Database from music_assistant.models.background_job import BackgroundJob +from music_assistant.models.config import MassConfig from music_assistant.models.enums import EventType, JobStatus from music_assistant.models.event import MassEvent @@ -30,25 +30,24 @@ EventSubscriptionType = Tuple[ EventCallBackType, Optional[Tuple[EventType]], Optional[Tuple[str]] ] -MAX_SIMULTANEOUS_JOBS = 5 - class MusicAssistant: """Main MusicAssistant object.""" def __init__( self, - db_url: DatabaseURL, + config: MassConfig, session: Optional[aiohttp.ClientSession] = None, ) -> None: """ Create an instance of MusicAssistant. - db_url: Database connection string/url. + conf: Music Assistant runtimestartup Config stream_port: TCP port used for streaming audio. session: Optionally provide an aiohttp clientsession """ + self.config = config self.loop: asyncio.AbstractEventLoop = None self.http_session: aiohttp.ClientSession = session self.http_session_provided = session is not None @@ -59,7 +58,7 @@ class MusicAssistant: self._jobs_event = asyncio.Event() # init core controllers - self.database = Database(self, db_url) + self.database = Database(self) self.cache = Cache(self) self.metadata = MetaDataController(self) self.music = MusicController(self) @@ -212,7 +211,7 @@ class MusicAssistant: self._jobs_event.clear() # make sure we're not running more jobs than allowed running_jobs = tuple(x for x in self._jobs if x.status == JobStatus.RUNNING) - slots_available = MAX_SIMULTANEOUS_JOBS - len(running_jobs) + slots_available = self.config.max_simultaneous_jobs - len(running_jobs) count = 0 while count <= slots_available: count += 1 diff --git a/music_assistant/models/config.py b/music_assistant/models/config.py new file mode 100644 index 00000000..7cb3df16 --- /dev/null +++ b/music_assistant/models/config.py @@ -0,0 +1,35 @@ +"""Model for the Music Assisant runtime config.""" + +from dataclasses import dataclass +from typing import Optional + +from databases import DatabaseURL + +from music_assistant.helpers.util import get_ip, select_stream_port + + +@dataclass(frozen=True) +class MassConfig: + """Model for the Music Assisant runtime config.""" + + database_url: DatabaseURL + + spotify_enabled: bool = False + spotify_username: Optional[str] = None + spotify_password: Optional[str] = None + + qobuz_enabled: bool = False + qobuz_username: Optional[str] = None + qobuz_password: Optional[str] = None + + tunein_enabled: bool = False + tunein_username: Optional[str] = None + + filesystem_enabled: bool = False + filesystem_music_dir: Optional[str] = None + filesystem_playlists_dir: Optional[str] = None + + # advanced settings + max_simultaneous_jobs: int = 10 + stream_port: int = select_stream_port() + stream_ip: str = get_ip() diff --git a/music_assistant/models/enums.py b/music_assistant/models/enums.py index 52b8a259..288e655f 100644 --- a/music_assistant/models/enums.py +++ b/music_assistant/models/enums.py @@ -187,11 +187,9 @@ class EventType(Enum): """Enum with possible Events.""" PLAYER_ADDED = "player added" - PLAYER_REMOVED = "player removed" PLAYER_UPDATED = "player updated" STREAM_STARTED = "streaming started" STREAM_ENDED = "streaming ended" - MUSIC_SYNC_STATUS = "music sync status" QUEUE_ADDED = "queue_added" QUEUE_UPDATED = "queue updated" QUEUE_ITEMS_UPDATED = "queue items updated" @@ -203,8 +201,6 @@ class EventType(Enum): PLAYLIST_ADDED = "playlist added" PLAYLIST_UPDATED = "playlist updated" RADIO_ADDED = "radio added" - TASK_UPDATED = "task updated" - PROVIDER_REGISTERED = "provider registered" BACKGROUND_JOB_UPDATED = "background_job_updated" diff --git a/music_assistant/models/provider.py b/music_assistant/models/provider.py index e69bd921..f1dabbc2 100644 --- a/music_assistant/models/provider.py +++ b/music_assistant/models/provider.py @@ -32,7 +32,7 @@ class MusicProvider: logger: Logger = None # set by setup @abstractmethod - async def setup(self) -> None: + async def setup(self) -> bool: """ Handle async initialization of the provider. @@ -193,3 +193,7 @@ class MusicProvider: return await self.get_playlist(prov_item_id) if media_type == MediaType.RADIO: return await self.get_radio(prov_item_id) + + async def sync(self) -> None: + """Run/schedule sync for this provider.""" + await self.mass.music.run_provider_sync(self.id) diff --git a/music_assistant/providers/__init__.py b/music_assistant/providers/__init__.py deleted file mode 100644 index 01895ef6..00000000 --- a/music_assistant/providers/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Package with Music Providers.""" diff --git a/music_assistant/providers/filesystem.py b/music_assistant/providers/filesystem.py deleted file mode 100644 index 0fea27f3..00000000 --- a/music_assistant/providers/filesystem.py +++ /dev/null @@ -1,529 +0,0 @@ -"""Filesystem musicprovider support for MusicAssistant.""" -from __future__ import annotations - -import base64 -import os -from typing import Dict, List, Optional, Tuple - -import aiofiles -from tinytag.tinytag import TinyTag, TinyTagException - -from music_assistant.helpers.compare import compare_strings -from music_assistant.helpers.util import parse_title_and_version, try_parse_int -from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError -from music_assistant.models.media_items import ( - Album, - AlbumType, - Artist, - ContentType, - ImageType, - MediaItemImage, - MediaItemProviderId, - MediaItemType, - MediaQuality, - MediaType, - Playlist, - StreamDetails, - StreamType, - Track, -) -from music_assistant.models.provider import MusicProvider - - -def split_items(org_str: str, splitters: Tuple[str] = None) -> Tuple[str]: - """Split up a tags string by common splitter.""" - if splitters is None: - splitters = ("/", ";", ",") - if org_str is None: - return tuple() - for splitter in splitters: - if splitter in org_str: - return tuple((x.strip() for x in org_str.split(splitter))) - return (org_str,) - - -FALLBACK_ARTIST = "Various Artists" -ARTIST_SPLITTERS = (";", ",", "Featuring", " Feat. ", " Feat ", "feat.", " & ") - - -class FileSystemProvider(MusicProvider): - """ - Very basic implementation of a musicprovider for local files. - - Assumes files are stored on disk in format // - Reads ID3 tags from file and falls back to parsing filename - Supports m3u files only for playlists - Supports having URI's from streaming providers within m3u playlist - Should be compatible with LMS - """ - - def __init__(self, music_dir: str, playlist_dir: Optional[str] = None) -> None: - """ - Initialize the Filesystem provider. - - music_dir: Directory on disk containing music files - playlist_dir: Directory on disk containing playlist files (optional) - - """ - self._attr_id = "filesystem" - self._attr_name = "Filesystem" - self._playlists_dir = playlist_dir - self._music_dir = music_dir - self._attr_supported_mediatypes = [ - MediaType.ARTIST, - MediaType.ALBUM, - MediaType.TRACK, - ] - if playlist_dir is not None: - self._attr_supported_mediatypes.append(MediaType.PLAYLIST) - self._cached_tracks: List[Track] = [] - - async def setup(self) -> None: - """Handle async initialization of the provider.""" - if not os.path.isdir(self._music_dir): - raise FileNotFoundError(f"Music Directory {self._music_dir} does not exist") - if self._playlists_dir is not None and not os.path.isdir(self._playlists_dir): - raise FileNotFoundError( - f"Playlist Directory {self._playlists_dir} does not exist" - ) - - async def search( - self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 - ) -> List[MediaItemType]: - """ - Perform search on musicprovider. - - :param search_query: Search query. - :param media_types: A list of media_types to include. All types if None. - :param limit: Number of items to return in the search (per type). - """ - result = [] - for track in await self.get_library_tracks(True): - for search_part in search_query.split(" - "): - if media_types is None or MediaType.TRACK in media_types: - if compare_strings(track.name, search_part): - result.append(track) - if media_types is None or MediaType.ALBUM in media_types: - if track.album: - if compare_strings(track.album.name, search_part): - result.append(track.album) - if media_types is None or MediaType.ARTIST in media_types: - if track.album and track.album.artist: - if compare_strings(track.album.artist, search_part): - result.append(track.album.artist) - return result - - async def get_library_artists(self) -> List[Artist]: - """Retrieve all library artists.""" - result = [] - cur_ids = set() - # for the sake of simplicity we only iterate over the files in one location only, - # which is the library tracks where we recursively enumerate the directory structure - # library artists = unique album artists across all tracks - # the track listing is cached so this should be (pretty) fast - for track in await self.get_library_tracks(True): - if track.album is None or track.album is None: - continue - if track.album.artist.item_id in cur_ids: - continue - result.append(track.album.artist) - cur_ids.add(track.album.artist.item_id) - return result - - async def get_library_albums(self) -> List[Album]: - """Get album folders recursively.""" - result = [] - cur_ids = set() - # for the sake of simplicity we only iterate over the files in one location only, - # which is the library tracks where we recurisvely enumerate the directory structure - # library albums = unique albums across all tracks - # the track listing is cached so this should be (pretty) fast - for track in await self.get_library_tracks(True): - if track.album is None: - continue - if track.album.item_id in cur_ids: - continue - result.append(track.album) - cur_ids.add(track.album.item_id) - return result - - async def get_library_tracks(self, use_cache=False) -> List[Track]: - """Get all tracks recursively.""" - # pylint: disable=arguments-differ - # we cache the entire tracks listing for performance and convenience reasons - # so we can easy retrieve the library artists and albums from the tracks listing - cache_key = f"{self.id}.tracks" - cache_result: Dict[str, dict] = await self.mass.cache.get( - cache_key, checksum=self._music_dir - ) - if cache_result is not None and use_cache: - return [Track.from_dict(x) for x in cache_result.values()] - if cache_result is None: - cache_result = {} - - # find all music files in the music directory and all subfolders - result = [] - for _root, _dirs, _files in os.walk(self._music_dir): - for file in _files: - filename = os.path.join(_root, file) - checksum = self._get_checksum(filename) - prov_item_id = self._get_item_id(filename) - cache_track = cache_result.get(prov_item_id) - # we do not want to parse tags if there are no changes to the file - # so we speedup the sync by comparing a checksum - if cache_track and cache_track["metadata"].get("checksum") == checksum: - # checksum did not change, use cached track - result.append(Track.from_dict(cache_track)) - elif track := await self._parse_track(filename): - cache_result[prov_item_id] = track.to_dict() - result.append(track) - # store cache listing in cache - await self.mass.cache.set(cache_key, cache_result, self._music_dir) - return result - - async def get_library_playlists(self) -> List[Playlist]: - """Retrieve playlists from disk.""" - if not self._playlists_dir: - return [] - result = [] - cur_ids = set() - for filename in os.listdir(self._playlists_dir): - filepath = os.path.join(self._playlists_dir, filename) - if ( - os.path.isfile(filepath) - and not filename.startswith(".") - and filename.lower().endswith(".m3u") - ): - playlist = await self._parse_playlist(filepath) - if playlist: - result.append(playlist) - cur_ids.add(playlist.item_id) - return result - - async def get_artist(self, prov_artist_id: str) -> Artist: - """Get full artist details by id.""" - if album_artist := next( - ( - track.album.artist - for track in await self.get_library_tracks(True) - if track.album is not None - and track.album.artist is not None - and track.album.artist.item_id == prov_artist_id - ), - None, - ): - return album_artist - # fallback to track_artist - for track in await self.get_library_tracks(True): - for artist in track.artists: - if artist.item_id == prov_artist_id: - return artist - return None - - async def get_album(self, prov_album_id: str) -> Album: - """Get full album details by id.""" - return next( - ( - track.album - for track in await self.get_library_tracks(True) - if track.album is not None and track.album.item_id == prov_album_id - ), - None, - ) - - async def get_track(self, prov_track_id: str) -> Track: - """Get full track details by id.""" - itempath = self._get_filename(prov_track_id) - if not os.path.isfile(itempath): - raise MediaNotFoundError(f"Track path does not exist: {itempath}") - return await self._parse_track(itempath) - - async def get_playlist(self, prov_playlist_id: str) -> Playlist: - """Get full playlist details by id.""" - itempath = self._get_filename(prov_playlist_id) - if not os.path.isfile(itempath): - raise MediaNotFoundError(f"playlist path does not exist: {itempath}") - return await self._parse_playlist(itempath) - - async def get_album_tracks(self, prov_album_id) -> List[Track]: - """Get album tracks for given album id.""" - return [ - track - for track in await self.get_library_tracks(True) - if track.album is not None and track.album.item_id == prov_album_id - ] - - async def get_playlist_tracks(self, prov_playlist_id: str) -> List[Track]: - """Get playlist tracks for given playlist id.""" - result = [] - itempath = self._get_filename(prov_playlist_id) - if not os.path.isfile(itempath): - raise MediaNotFoundError(f"playlist path does not exist: {itempath}") - index = 0 - async with aiofiles.open(itempath, "r") as _file: - for line in await _file.readlines(): - line = line.strip() - if line and not line.startswith("#"): - if track := await self._parse_track_from_uri(line): - result.append(track) - index += 1 - return result - - async def get_artist_albums(self, prov_artist_id: str) -> List[Album]: - """Get a list of albums for the given artist.""" - result = [] - cur_ids = set() - for track in await self.get_library_tracks(True): - if track.album is None: - continue - if track.album.item_id in cur_ids: - continue - if track.album.artist is None: - continue - if track.album.artist.item_id != prov_artist_id: - continue - result.append(track.album) - cur_ids.add(track.album.item_id) - return result - - async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]: - """Get a list of all tracks as we have no clue about preference.""" - return [ - track - for track in await self.get_library_tracks(True) - if track.artists is not None - and ( - (prov_artist_id in (x.item_id for x in track.artists)) - or ( - track.album is not None - and track.album.artist is not None - and track.album.artist.item_id == prov_artist_id - ) - ) - ] - - async def get_stream_details(self, item_id: str) -> StreamDetails: - """Return the content details for the given track when it will be streamed.""" - itempath = self._get_filename(item_id) - if not os.path.isfile(itempath): - raise MediaNotFoundError(f"Track path does not exist: {itempath}") - - def parse_tag(): - return TinyTag.get(itempath) - - tags = await self.mass.loop.run_in_executor(None, parse_tag) - - return StreamDetails( - type=StreamType.FILE, - provider=self.id, - item_id=item_id, - content_type=ContentType(itempath.split(".")[-1]), - path=itempath, - sample_rate=tags.samplerate or 44100, - bit_depth=16, # TODO: parse bitdepth - ) - - async def get_embedded_image(self, filename: str) -> str | None: - """Return the embedded image of an audio file as base64 string.""" - if not TinyTag.is_supported(filename): - return None - - def parse_tags(): - return TinyTag.get(filename, tags=True, image=True, ignore_errors=True) - - tags = await self.mass.loop.run_in_executor(None, parse_tags) - if image_data := tags.get_image(): - enc_image = base64.b64encode(image_data).decode() - enc_image = f"data:image/png;base64,{enc_image}" - return enc_image - - async def _parse_track(self, filename: str) -> Track | None: - """Try to parse a track from a filename by reading its tags.""" - if not TinyTag.is_supported(filename): - return None - - def parse_tags(): - return TinyTag.get(filename, image=True, ignore_errors=True) - - # parse ID3 tags with TinyTag - try: - tags = await self.mass.loop.run_in_executor(None, parse_tags) - except TinyTagException as err: - self.logger.error("Error processing %s: %s", filename, str(err)) - - prov_item_id = self._get_item_id(filename) - - # work out if we have an artist/album/track.ext structure - filename_base = filename.replace(self._music_dir, "") - if filename_base.startswith(os.sep): - filename_base = filename_base[1:] - track_parts = filename_base.rsplit(os.sep) - if len(track_parts) == 3: - album_artist_name = track_parts[0] - album_name = track_parts[1] - else: - album_artist_name = tags.albumartist - album_name = tags.album - - # prefer title from tag, fallback to filename - if tags.title: - track_title = tags.title - else: - ext = filename_base.split(".")[-1] - track_title = filename_base.replace(f".{ext}", "").replace("_", " ") - self.logger.warning( - "%s is missing ID3 tags, use filename as fallback", filename_base - ) - - name, version = parse_title_and_version(track_title) - track = Track( - item_id=prov_item_id, provider=self.id, name=name, version=version - ) - - # Parse track artist(s) from artist string using common splitters used in ID3 tags - # NOTE: do not use a '/' or '&' to prevent artists like AC/DC become messed up - track_artists_str = tags.artist or album_artist_name or FALLBACK_ARTIST - track.artists = [ - Artist( - item_id=item, - provider=self._attr_id, - name=item, - ) - for item in split_items(track_artists_str, ARTIST_SPLITTERS) - ] - - # Check if track has embedded metadata - if tags.get_image(): - # we do not actually embed the image in the metadata because that would consume too - # much space and bandwidth. Instead we set the filename as value so the image can - # be retrieved later in realtime. - track.metadata.images = {MediaItemImage(ImageType.EMBEDDED_THUMB, filename)} - - # Parse album (only if we have album + album artist tags) - if album_name and album_artist_name: - album_id = album_name - album_name, album_version = parse_title_and_version(album_name) - track.album = Album( - item_id=album_id, - provider=self._attr_id, - name=album_name, - version=album_version, - year=try_parse_int(tags.year) if tags.year else None, - artist=Artist( - item_id=album_artist_name, - provider=self._attr_id, - name=album_artist_name, - ), - ) - track.album.metadata.images = track.metadata.images - - # try to guess the album type - if name.lower() == album_name.lower(): - track.album.album_type = AlbumType.SINGLE - elif album_artist_name not in (x.name for x in track.artists): - track.album.album_type = AlbumType.COMPILATION - else: - track.album.album_type = AlbumType.ALBUM - - # parse other info - track.duration = tags.duration - track.metadata.genres = set(split_items(tags.genre)) - track.disc_number = try_parse_int(tags.disc) - track.track_number = try_parse_int(tags.track) - track.isrc = tags.extra.get("isrc", "") - if "copyright" in tags.extra: - track.metadata.copyright = tags.extra["copyright"] - if "lyrics" in tags.extra: - track.metadata.lyrics = tags.extra["lyrics"] - # store last modified time as checksum - track.metadata.checksum = self._get_checksum(filename) - - quality_details = "" - if filename.endswith(".flac"): - # TODO: get bit depth - quality = MediaQuality.FLAC_LOSSLESS - if tags.samplerate > 192000: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4 - elif tags.samplerate > 96000: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3 - elif tags.samplerate > 48000: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2 - quality_details = f"{tags.samplerate / 1000} Khz" - elif filename.endswith(".ogg"): - quality = MediaQuality.LOSSY_OGG - quality_details = f"{tags.bitrate} kbps" - elif filename.endswith(".m4a"): - quality = MediaQuality.LOSSY_AAC - quality_details = f"{tags.bitrate} kbps" - else: - quality = MediaQuality.LOSSY_MP3 - quality_details = f"{tags.bitrate} kbps" - track.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=prov_item_id, - quality=quality, - details=quality_details, - url=filename, - ) - ) - return track - - async def _parse_playlist(self, filename: str) -> Playlist | None: - """Parse playlist from file.""" - # use the relative filename as item_id - filename_base = filename.replace(self._music_dir, "") - if filename_base.startswith(os.sep): - filename_base = filename_base[1:] - prov_item_id = filename_base - - name = filename.split(os.sep)[-1].replace(".m3u", "") - - playlist = Playlist(prov_item_id, provider=self.id, name=name) - playlist.is_editable = True - playlist.add_provider_id( - MediaItemProviderId(provider=self.id, item_id=prov_item_id, url=filename) - ) - playlist.owner = self._attr_name - playlist.metadata.checksum = self._get_checksum(filename) - return playlist - - async def _parse_track_from_uri(self, uri): - """Try to parse a track from an uri found in playlist.""" - if "://" in uri: - # track is uri from external provider? - try: - return await self.mass.music.get_item_by_uri(uri) - except MusicAssistantError as err: - self.logger.warning( - "Could not parse uri %s to track: %s", uri, str(err) - ) - return None - # try to treat uri as filename - try: - return await self.get_track(uri) - except MediaNotFoundError: - return None - - def _get_filename(self, item_id: str, playlist: bool = False) -> str: - """Get filename for item_id.""" - if self._music_dir in item_id: - return item_id - if playlist: - return os.path.join(self._playlists_dir, item_id) - return os.path.join(self._music_dir, item_id) - - def _get_item_id(self, filename: str, playlist: bool = False) -> str: - """Return item_id for given filename.""" - # we simply use the base filename as item_id - base_path = self._playlists_dir if playlist else self._music_dir - filename_base = filename.replace(base_path, "") - if filename_base.startswith(os.sep): - filename_base = filename_base[1:] - return filename_base - - @staticmethod - def _get_checksum(filename: str) -> str: - """Get checksum for file.""" - # use last modified time as checksum - return str(os.path.getmtime(filename)) diff --git a/music_assistant/providers/qobuz.py b/music_assistant/providers/qobuz.py deleted file mode 100644 index 29179d5a..00000000 --- a/music_assistant/providers/qobuz.py +++ /dev/null @@ -1,745 +0,0 @@ -"""Qobuz musicprovider support for MusicAssistant.""" -from __future__ import annotations - -import datetime -import hashlib -import time -from json import JSONDecodeError -from typing import List, Optional - -import aiohttp -from asyncio_throttle import Throttler - -from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module - app_var, -) -from music_assistant.helpers.cache import use_cache -from music_assistant.helpers.util import parse_title_and_version, try_parse_int -from music_assistant.models.enums import EventType -from music_assistant.models.errors import LoginFailed -from music_assistant.models.event import MassEvent -from music_assistant.models.media_items import ( - Album, - AlbumType, - Artist, - ContentType, - ImageType, - MediaItemImage, - MediaItemProviderId, - MediaItemType, - MediaQuality, - MediaType, - Playlist, - StreamDetails, - StreamType, - Track, -) -from music_assistant.models.provider import MusicProvider - - -class QobuzProvider(MusicProvider): - """Provider for the Qobux music service.""" - - def __init__(self, username: str, password: str) -> None: - """Initialize the Spotify provider.""" - self._attr_id = "qobuz" - self._attr_name = "Qobuz" - self._attr_supported_mediatypes = [ - MediaType.ARTIST, - MediaType.ALBUM, - MediaType.TRACK, - MediaType.PLAYLIST, - ] - self._username = username - self._password = password - self.__user_auth_info = None - self._throttler = Throttler(rate_limit=4, period=1) - - async def setup(self) -> None: - """Handle async initialization of the provider.""" - # try to get a token, raise if that fails - token = await self._auth_token() - if not token: - raise LoginFailed(f"Login failed for user {self._username}") - # subscribe to stream events so we can report playback to Qobuz - self.mass.subscribe( - self.on_stream_event, - (EventType.STREAM_STARTED, EventType.STREAM_ENDED), - id_filter=self.id, - ) - - async def search( - self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 - ) -> List[MediaItemType]: - """ - Perform search on musicprovider. - - :param search_query: Search query. - :param media_types: A list of media_types to include. All types if None. - :param limit: Number of items to return in the search (per type). - """ - result = [] - params = {"query": search_query, "limit": limit} - if len(media_types) == 1: - # qobuz does not support multiple searchtypes, falls back to all if no type given - if media_types[0] == MediaType.ARTIST: - params["type"] = "artists" - if media_types[0] == MediaType.ALBUM: - params["type"] = "albums" - if media_types[0] == MediaType.TRACK: - params["type"] = "tracks" - if media_types[0] == MediaType.PLAYLIST: - params["type"] = "playlists" - if searchresult := await self._get_data("catalog/search", **params): - if "artists" in searchresult: - result += [ - await self._parse_artist(item) - for item in searchresult["artists"]["items"] - if (item and item["id"]) - ] - if "albums" in searchresult: - result += [ - await self._parse_album(item) - for item in searchresult["albums"]["items"] - if (item and item["id"]) - ] - if "tracks" in searchresult: - result += [ - await self._parse_track(item) - for item in searchresult["tracks"]["items"] - if (item and item["id"]) - ] - if "playlists" in searchresult: - result += [ - await self._parse_playlist(item) - for item in searchresult["playlists"]["items"] - if (item and item["id"]) - ] - return result - - async def get_library_artists(self) -> List[Artist]: - """Retrieve all library artists from Qobuz.""" - endpoint = "favorite/getUserFavorites" - return [ - await self._parse_artist(item) - for item in await self._get_all_items( - endpoint, key="artists", type="artists" - ) - if (item and item["id"]) - ] - - async def get_library_albums(self) -> List[Album]: - """Retrieve all library albums from Qobuz.""" - endpoint = "favorite/getUserFavorites" - return [ - await self._parse_album(item) - for item in await self._get_all_items(endpoint, key="albums", type="albums") - if (item and item["id"]) - ] - - async def get_library_tracks(self) -> List[Track]: - """Retrieve library tracks from Qobuz.""" - endpoint = "favorite/getUserFavorites" - return [ - await self._parse_track(item) - for item in await self._get_all_items(endpoint, key="tracks", type="tracks") - if (item and item["id"]) - ] - - async def get_library_playlists(self) -> List[Playlist]: - """Retrieve all library playlists from the provider.""" - endpoint = "playlist/getUserPlaylists" - return [ - await self._parse_playlist(item) - for item in await self._get_all_items(endpoint, key="playlists") - if (item and item["id"]) - ] - - async def get_artist(self, prov_artist_id) -> Artist: - """Get full artist details by id.""" - params = {"artist_id": prov_artist_id} - artist_obj = await self._get_data("artist/get", **params) - return ( - await self._parse_artist(artist_obj) - if artist_obj and artist_obj["id"] - else None - ) - - async def get_album(self, prov_album_id) -> Album: - """Get full album details by id.""" - params = {"album_id": prov_album_id} - album_obj = await self._get_data("album/get", **params) - return ( - await self._parse_album(album_obj) - if album_obj and album_obj["id"] - else None - ) - - async def get_track(self, prov_track_id) -> Track: - """Get full track details by id.""" - params = {"track_id": prov_track_id} - track_obj = await self._get_data("track/get", **params) - return ( - await self._parse_track(track_obj) - if track_obj and track_obj["id"] - else None - ) - - async def get_playlist(self, prov_playlist_id) -> Playlist: - """Get full playlist details by id.""" - params = {"playlist_id": prov_playlist_id} - playlist_obj = await self._get_data("playlist/get", **params) - return ( - await self._parse_playlist(playlist_obj) - if playlist_obj and playlist_obj["id"] - else None - ) - - async def get_album_tracks(self, prov_album_id) -> List[Track]: - """Get all album tracks for given album id.""" - params = {"album_id": prov_album_id} - return [ - await self._parse_track(item) - for item in await self._get_all_items("album/get", **params, key="tracks") - if (item and item["id"]) - ] - - async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: - """Get all playlist tracks for given playlist id.""" - playlist = await self.get_playlist(prov_playlist_id) - endpoint = "playlist/get" - return [ - await self._parse_track(item) - for item in await self._get_all_items( - endpoint, - key="tracks", - playlist_id=prov_playlist_id, - extra="tracks", - cache_checksum=playlist.metadata.checksum, - ) - if (item and item["id"]) - ] - - async def get_artist_albums(self, prov_artist_id) -> List[Album]: - """Get a list of albums for the given artist.""" - endpoint = "artist/get" - return [ - await self._parse_album(item) - for item in await self._get_all_items( - endpoint, key="albums", artist_id=prov_artist_id, extra="albums" - ) - if (item and item["id"] and str(item["artist"]["id"]) == prov_artist_id) - ] - - async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: - """Get a list of most popular tracks for the given artist.""" - result = await self._get_data( - "artist/get", - artist_id=prov_artist_id, - extra="playlists", - offset=0, - limit=25, - ) - if result and result["playlists"]: - return [ - await self._parse_track(item) - for item in result["playlists"][0]["tracks"]["items"] - if (item and item["id"]) - ] - # fallback to search - artist = await self.get_artist(prov_artist_id) - searchresult = await self._get_data( - "catalog/search", query=artist.name, limit=25, type="tracks" - ) - return [ - await self._parse_track(item) - for item in searchresult["tracks"]["items"] - if ( - item - and item["id"] - and "performer" in item - and str(item["performer"]["id"]) == str(prov_artist_id) - ) - ] - - async def get_similar_artists(self, prov_artist_id): - """Get similar artists for given artist.""" - # https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3 - - async def library_add(self, prov_item_id, media_type: MediaType): - """Add item to library.""" - result = None - if media_type == MediaType.ARTIST: - result = await self._get_data( - "favorite/create", {"artist_ids": prov_item_id} - ) - elif media_type == MediaType.ALBUM: - result = await self._get_data( - "favorite/create", {"album_ids": prov_item_id} - ) - elif media_type == MediaType.TRACK: - result = await self._get_data( - "favorite/create", {"track_ids": prov_item_id} - ) - elif media_type == MediaType.PLAYLIST: - result = await self._get_data( - "playlist/subscribe", {"playlist_id": prov_item_id} - ) - return result - - async def library_remove(self, prov_item_id, media_type: MediaType): - """Remove item from library.""" - result = None - if media_type == MediaType.ARTIST: - result = await self._get_data( - "favorite/delete", {"artist_ids": prov_item_id} - ) - elif media_type == MediaType.ALBUM: - result = await self._get_data( - "favorite/delete", {"album_ids": prov_item_id} - ) - elif media_type == MediaType.TRACK: - result = await self._get_data( - "favorite/delete", {"track_ids": prov_item_id} - ) - elif media_type == MediaType.PLAYLIST: - playlist = await self.get_playlist(prov_item_id) - if playlist.is_editable: - result = await self._get_data( - "playlist/delete", {"playlist_id": prov_item_id} - ) - else: - result = await self._get_data( - "playlist/unsubscribe", {"playlist_id": prov_item_id} - ) - return result - - async def add_playlist_tracks( - self, prov_playlist_id: str, prov_track_ids: List[str] - ) -> None: - """Add track(s) to playlist.""" - return await self._get_data( - "playlist/addTracks", - playlist_id=prov_playlist_id, - track_ids=",".join(prov_track_ids), - playlist_track_ids=",".join(prov_track_ids), - ) - - async def remove_playlist_tracks( - self, prov_playlist_id: str, prov_track_ids: List[str] - ) -> None: - """Remove track(s) from playlist.""" - playlist_track_ids = set() - for track in await self._get_all_items( - "playlist/get", key="tracks", playlist_id=prov_playlist_id, extra="tracks" - ): - if str(track["id"]) in prov_track_ids: - playlist_track_ids.add(str(track["playlist_track_id"])) - return await self._get_data( - "playlist/deleteTracks", - playlist_id=prov_playlist_id, - playlist_track_ids=",".join(playlist_track_ids), - ) - - async def get_stream_details(self, item_id: str) -> StreamDetails: - """Return the content details for the given track when it will be streamed.""" - streamdata = None - for format_id in [27, 7, 6, 5]: - # it seems that simply requesting for highest available quality does not work - # from time to time the api response is empty for this request ?! - result = await self._get_data( - "track/getFileUrl", - sign_request=True, - format_id=format_id, - track_id=item_id, - intent="stream", - skip_cache=True, - ) - if result and result.get("url"): - streamdata = result - break - if not streamdata: - self.logger.error("Unable to retrieve stream details for track %s", item_id) - return None - if streamdata["mime_type"] == "audio/mpeg": - content_type = ContentType.MPEG - elif streamdata["mime_type"] == "audio/flac": - content_type = ContentType.FLAC - else: - self.logger.error("Unsupported mime type for track %s", item_id) - return None - return StreamDetails( - type=StreamType.URL, - item_id=str(item_id), - provider=self.id, - path=streamdata["url"], - content_type=content_type, - sample_rate=int(streamdata["sampling_rate"] * 1000), - bit_depth=streamdata["bit_depth"], - details=streamdata, # we need these details for reporting playback - ) - - async def on_stream_event(self, event: MassEvent): - """ - Received event from mass. - - We use this to report playback start/stop to qobuz. - """ - if not self.__user_auth_info: - return - # TODO: need to figure out if the streamed track is purchased by user - # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx - # {"albums":{"total":0,"items":[]},"tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}} - if event.type == EventType.STREAM_STARTED: - # report streaming started to qobuz - device_id = self.__user_auth_info["user"]["device"]["id"] - credential_id = self.__user_auth_info["user"]["credential"]["id"] - user_id = self.__user_auth_info["user"]["id"] - format_id = event.data.details["format_id"] - timestamp = int(time.time()) - events = [ - { - "online": True, - "sample": False, - "intent": "stream", - "device_id": device_id, - "track_id": str(event.data.item_id), - "purchase": False, - "date": timestamp, - "credential_id": credential_id, - "user_id": user_id, - "local": False, - "format_id": format_id, - } - ] - await self._post_data("track/reportStreamingStart", data=events) - elif event.type == EventType.STREAM_ENDED: - # report streaming ended to qobuz - user_id = self.__user_auth_info["user"]["id"] - await self._get_data( - "/track/reportStreamingEnd", - user_id=user_id, - track_id=str(event.data.item_id), - duration=try_parse_int(event.data.seconds_played), - ) - - async def _parse_artist(self, artist_obj: dict): - """Parse qobuz artist object to generic layout.""" - artist = Artist( - item_id=str(artist_obj["id"]), provider=self.id, name=artist_obj["name"] - ) - artist.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=str(artist_obj["id"]), - url=artist_obj.get( - "url", f'https://open.qobuz.com/artist/{artist_obj["id"]}' - ), - ) - ) - if img := self.__get_image(artist_obj): - artist.metadata.images = {MediaItemImage(ImageType.THUMB, img)} - if artist_obj.get("biography"): - artist.metadata.description = artist_obj["biography"].get("content") - return artist - - async def _parse_album(self, album_obj: dict, artist_obj: dict = None): - """Parse qobuz album object to generic layout.""" - if not artist_obj and "artist" not in album_obj: - # artist missing in album info, return full abum instead - return await self.get_album(album_obj["id"]) - name, version = parse_title_and_version( - album_obj["title"], album_obj.get("version") - ) - album = Album( - item_id=str(album_obj["id"]), provider=self.id, name=name, version=version - ) - if album_obj["maximum_sampling_rate"] > 192: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4 - elif album_obj["maximum_sampling_rate"] > 96: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3 - elif album_obj["maximum_sampling_rate"] > 48: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2 - elif album_obj["maximum_bit_depth"] > 16: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_1 - elif album_obj.get("format_id", 0) == 5: - quality = MediaQuality.LOSSY_AAC - else: - quality = MediaQuality.FLAC_LOSSLESS - album.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=str(album_obj["id"]), - quality=quality, - url=album_obj.get( - "url", f'https://open.qobuz.com/album/{album_obj["id"]}' - ), - details=f'{album_obj["maximum_sampling_rate"]}kHz {album_obj["maximum_bit_depth"]}bit', - available=album_obj["streamable"] and album_obj["displayable"], - ) - ) - - if artist_obj: - album.artist = artist_obj - else: - album.artist = await self._parse_artist(album_obj["artist"]) - if ( - album_obj.get("product_type", "") == "single" - or album_obj.get("release_type", "") == "single" - ): - album.album_type = AlbumType.SINGLE - elif ( - album_obj.get("product_type", "") == "compilation" - or "Various" in album.artist.name - ): - album.album_type = AlbumType.COMPILATION - elif ( - album_obj.get("product_type", "") == "album" - or album_obj.get("release_type", "") == "album" - ): - album.album_type = AlbumType.ALBUM - if "genre" in album_obj: - album.metadata.genres = {album_obj["genre"]["name"]} - if img := self.__get_image(album_obj): - album.metadata.images = {MediaItemImage(ImageType.THUMB, img)} - if len(album_obj["upc"]) == 13: - # qobuz writes ean as upc ?! - album.upc = album_obj["upc"][1:] - else: - album.upc = album_obj["upc"] - if "label" in album_obj: - album.metadata.label = album_obj["label"]["name"] - if album_obj.get("released_at"): - album.year = datetime.datetime.fromtimestamp(album_obj["released_at"]).year - if album_obj.get("copyright"): - album.metadata.copyright = album_obj["copyright"] - if album_obj.get("description"): - album.metadata.description = album_obj["description"] - return album - - async def _parse_track(self, track_obj: dict): - """Parse qobuz track object to generic layout.""" - name, version = parse_title_and_version( - track_obj["title"], track_obj.get("version") - ) - track = Track( - item_id=str(track_obj["id"]), - provider=self.id, - name=name, - version=version, - disc_number=track_obj["media_number"], - track_number=track_obj["track_number"], - duration=track_obj["duration"], - ) - if track_obj.get("performer") and "Various " not in track_obj["performer"]: - artist = await self._parse_artist(track_obj["performer"]) - if artist: - track.artists.append(artist) - if not track.artists: - # try to grab artist from album - if ( - track_obj.get("album") - and track_obj["album"].get("artist") - and "Various " not in track_obj["album"]["artist"] - ): - artist = await self._parse_artist(track_obj["album"]["artist"]) - if artist: - track.artists.append(artist) - if not track.artists: - # last resort: parse from performers string - for performer_str in track_obj["performers"].split(" - "): - role = performer_str.split(", ")[1] - name = performer_str.split(", ")[0] - if "artist" in role.lower(): - artist = Artist(name, self.id, name) - track.artists.append(artist) - # TODO: fix grabbing composer from details - - if "album" in track_obj: - album = await self._parse_album(track_obj["album"]) - if album: - track.album = album - if track_obj.get("isrc"): - track.isrc = track_obj["isrc"] - if track_obj.get("performers"): - track.metadata.performers = { - x.strip() for x in track_obj["performers"].split("-") - } - if track_obj.get("copyright"): - track.metadata.copyright = track_obj["copyright"] - if track_obj.get("audio_info"): - track.metadata.replaygain = track_obj["audio_info"]["replaygain_track_gain"] - if track_obj.get("parental_warning"): - track.metadata.explicit = True - if img := self.__get_image(track_obj): - track.metadata.images = {MediaItemImage(ImageType.THUMB, img)} - # get track quality - if track_obj["maximum_sampling_rate"] > 192: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4 - elif track_obj["maximum_sampling_rate"] > 96: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_3 - elif track_obj["maximum_sampling_rate"] > 48: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_2 - elif track_obj["maximum_bit_depth"] > 16: - quality = MediaQuality.FLAC_LOSSLESS_HI_RES_1 - elif track_obj.get("format_id", 0) == 5: - quality = MediaQuality.LOSSY_AAC - else: - quality = MediaQuality.FLAC_LOSSLESS - track.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=str(track_obj["id"]), - quality=quality, - url=track_obj.get( - "url", f'https://open.qobuz.com/track/{track_obj["id"]}' - ), - details=f'{track_obj["maximum_sampling_rate"]}kHz {track_obj["maximum_bit_depth"]}bit', - available=track_obj["streamable"] and track_obj["displayable"], - ) - ) - return track - - async def _parse_playlist(self, playlist_obj): - """Parse qobuz playlist object to generic layout.""" - playlist = Playlist( - item_id=str(playlist_obj["id"]), - provider=self.id, - name=playlist_obj["name"], - owner=playlist_obj["owner"]["name"], - ) - playlist.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=str(playlist_obj["id"]), - url=playlist_obj.get( - "url", f'https://open.qobuz.com/playlist/{playlist_obj["id"]}' - ), - ) - ) - playlist.is_editable = ( - playlist_obj["owner"]["id"] == self.__user_auth_info["user"]["id"] - or playlist_obj["is_collaborative"] - ) - if img := self.__get_image(playlist_obj): - playlist.metadata.images = {MediaItemImage(ImageType.THUMB, img)} - playlist.metadata.checksum = str(playlist_obj["updated_at"]) - return playlist - - async def _auth_token(self): - """Login to qobuz and store the token.""" - if self.__user_auth_info: - return self.__user_auth_info["user_auth_token"] - params = { - "username": self._username, - "password": self._password, - "device_manufacturer_id": "music_assistant", - } - details = await self._get_data("user/login", **params) - if details and "user" in details: - self.__user_auth_info = details - self.logger.info( - "Succesfully logged in to Qobuz as %s", details["user"]["display_name"] - ) - self.mass.metadata.preferred_language = details["user"]["country_code"] - return details["user_auth_token"] - - @use_cache(3600 * 24) - async def _get_all_items(self, endpoint, key="tracks", **kwargs): - """Get all items from a paged list.""" - limit = 50 - offset = 0 - all_items = [] - while True: - kwargs["limit"] = limit - kwargs["offset"] = offset - result = await self._get_data(endpoint, skip_cache=True, **kwargs) - offset += limit - if not result: - break - if not result.get(key) or not result[key].get("items"): - break - all_items += result[key]["items"] - if len(result[key]["items"]) < limit: - break - return all_items - - @use_cache(3600 * 2) - async def _get_data(self, endpoint, sign_request=False, **kwargs): - """Get data from api.""" - url = f"http://www.qobuz.com/api.json/0.2/{endpoint}" - headers = {"X-App-Id": app_var(0)} - if endpoint != "user/login": - auth_token = await self._auth_token() - if not auth_token: - self.logger.debug("Not logged in") - return None - headers["X-User-Auth-Token"] = auth_token - if sign_request: - signing_data = "".join(endpoint.split("/")) - keys = list(kwargs.keys()) - keys.sort() - for key in keys: - signing_data += f"{key}{kwargs[key]}" - request_ts = str(time.time()) - request_sig = signing_data + request_ts + app_var(1) - request_sig = str(hashlib.md5(request_sig.encode()).hexdigest()) - kwargs["request_ts"] = request_ts - kwargs["request_sig"] = request_sig - kwargs["app_id"] = app_var(0) - kwargs["user_auth_token"] = await self._auth_token() - async with self._throttler: - async with self.mass.http_session.get( - url, headers=headers, params=kwargs, verify_ssl=False - ) as response: - try: - result = await response.json() - if "error" in result or ( - "status" in result and "error" in result["status"] - ): - self.logger.error("%s - %s", endpoint, result) - return None - except ( - aiohttp.ContentTypeError, - JSONDecodeError, - ) as err: - self.logger.error("%s - %s", endpoint, str(err)) - return None - return result - - async def _post_data(self, endpoint, params=None, data=None): - """Post data to api.""" - if not params: - params = {} - if not data: - data = {} - url = f"http://www.qobuz.com/api.json/0.2/{endpoint}" - params["app_id"] = app_var(0) - params["user_auth_token"] = await self._auth_token() - async with self.mass.http_session.post( - url, params=params, json=data, verify_ssl=False - ) as response: - result = await response.json() - if "error" in result or ( - "status" in result and "error" in result["status"] - ): - self.logger.error("%s - %s", endpoint, result) - return None - return result - - def __get_image(self, obj: dict) -> Optional[str]: - """Try to parse image from Qobuz media object.""" - if obj.get("image"): - for key in ["extralarge", "large", "medium", "small"]: - if obj["image"].get(key): - if "2a96cbd8b46e442fc41c2b86b821562f" in obj["image"][key]: - continue - return obj["image"][key] - if obj.get("images300"): - # playlists seem to use this strange format - return obj["images300"][0] - if obj.get("album"): - return self.__get_image(obj["album"]) - if obj.get("artist"): - return self.__get_image(obj["artist"]) - return None diff --git a/music_assistant/providers/spotify/__init__.py b/music_assistant/providers/spotify/__init__.py deleted file mode 100644 index 5ef92cd4..00000000 --- a/music_assistant/providers/spotify/__init__.py +++ /dev/null @@ -1,661 +0,0 @@ -"""Spotify musicprovider support for MusicAssistant.""" -from __future__ import annotations - -import asyncio -import json -import os -import platform -import time -from json.decoder import JSONDecodeError -from tempfile import gettempdir -from typing import List, Optional - -import aiohttp -from asyncio_throttle import Throttler - -from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name-in-module - app_var, -) -from music_assistant.helpers.cache import use_cache -from music_assistant.helpers.util import parse_title_and_version -from music_assistant.models.errors import LoginFailed -from music_assistant.models.media_items import ( - Album, - AlbumType, - Artist, - ContentType, - ImageType, - MediaItemImage, - MediaItemProviderId, - MediaItemType, - MediaQuality, - MediaType, - Playlist, - StreamDetails, - StreamType, - Track, -) -from music_assistant.models.provider import MusicProvider - -CACHE_DIR = gettempdir() - - -class SpotifyProvider(MusicProvider): - """Implementation of a Spotify MusicProvider.""" - - def __init__(self, username: str, password: str) -> None: - """Initialize the Spotify provider.""" - self._attr_id = "spotify" - self._attr_name = "Spotify" - self._attr_supported_mediatypes = [ - MediaType.ARTIST, - MediaType.ALBUM, - MediaType.TRACK, - MediaType.PLAYLIST - # TODO: Return spotify radio - ] - self._username = username - self._password = password - self._auth_token = None - self._sp_user = None - self._librespot_bin = None - self._throttler = Throttler(rate_limit=4, period=1) - - async def setup(self) -> None: - """Handle async initialization of the provider.""" - if not self._username or not self._password: - raise LoginFailed("Invalid login credentials") - # try to get a token, raise if that fails - token = await self.get_token() - if not token: - raise LoginFailed(f"Login failed for user {self._username}") - - async def search( - self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 - ) -> List[MediaItemType]: - """ - Perform search on musicprovider. - - :param search_query: Search query. - :param media_types: A list of media_types to include. All types if None. - :param limit: Number of items to return in the search (per type). - """ - result = [] - searchtypes = [] - if MediaType.ARTIST in media_types: - searchtypes.append("artist") - if MediaType.ALBUM in media_types: - searchtypes.append("album") - if MediaType.TRACK in media_types: - searchtypes.append("track") - if MediaType.PLAYLIST in media_types: - searchtypes.append("playlist") - searchtype = ",".join(searchtypes) - if searchresult := await self._get_data( - "search", q=search_query, type=searchtype, limit=limit - ): - if "artists" in searchresult: - result += [ - await self._parse_artist(item) - for item in searchresult["artists"]["items"] - if (item and item["id"]) - ] - if "albums" in searchresult: - result += [ - await self._parse_album(item) - for item in searchresult["albums"]["items"] - if (item and item["id"]) - ] - if "tracks" in searchresult: - result += [ - await self._parse_track(item) - for item in searchresult["tracks"]["items"] - if (item and item["id"]) - ] - if "playlists" in searchresult: - result += [ - await self._parse_playlist(item) - for item in searchresult["playlists"]["items"] - if (item and item["id"]) - ] - return result - - async def get_library_artists(self) -> List[Artist]: - """Retrieve library artists from spotify.""" - spotify_artists = await self._get_data( - "me/following", type="artist", limit=50, skip_cache=True - ) - return [ - await self._parse_artist(item) - for item in spotify_artists["artists"]["items"] - if (item and item["id"]) - ] - - async def get_library_albums(self) -> List[Album]: - """Retrieve library albums from the provider.""" - return [ - await self._parse_album(item["album"]) - for item in await self._get_all_items("me/albums", skip_cache=True) - if (item["album"] and item["album"]["id"]) - ] - - async def get_library_tracks(self) -> List[Track]: - """Retrieve library tracks from the provider.""" - return [ - await self._parse_track(item["track"]) - for item in await self._get_all_items("me/tracks", skip_cache=True) - if (item and item["track"]["id"]) - ] - - async def get_library_playlists(self) -> List[Playlist]: - """Retrieve playlists from the provider.""" - return [ - await self._parse_playlist(item) - for item in await self._get_all_items("me/playlists", skip_cache=True) - if (item and item["id"]) - ] - - async def get_artist(self, prov_artist_id) -> Artist: - """Get full artist details by id.""" - artist_obj = await self._get_data(f"artists/{prov_artist_id}") - return await self._parse_artist(artist_obj) if artist_obj else None - - async def get_album(self, prov_album_id) -> Album: - """Get full album details by id.""" - album_obj = await self._get_data(f"albums/{prov_album_id}") - return await self._parse_album(album_obj) if album_obj else None - - async def get_track(self, prov_track_id) -> Track: - """Get full track details by id.""" - track_obj = await self._get_data(f"tracks/{prov_track_id}") - return await self._parse_track(track_obj) if track_obj else None - - async def get_playlist(self, prov_playlist_id) -> Playlist: - """Get full playlist details by id.""" - playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}") - return await self._parse_playlist(playlist_obj) if playlist_obj else None - - async def get_album_tracks(self, prov_album_id) -> List[Track]: - """Get all album tracks for given album id.""" - return [ - await self._parse_track(item) - for item in await self._get_all_items(f"albums/{prov_album_id}/tracks") - if (item and item["id"]) - ] - - async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]: - """Get all playlist tracks for given playlist id.""" - playlist = await self.get_playlist(prov_playlist_id) - return [ - await self._parse_track(item["track"]) - for item in await self._get_all_items( - f"playlists/{prov_playlist_id}/tracks", - cache_checksum=playlist.metadata.checksum, - ) - if (item and item["track"] and item["track"]["id"]) - ] - - async def get_artist_albums(self, prov_artist_id) -> List[Album]: - """Get a list of all albums for the given artist.""" - return [ - await self._parse_album(item) - for item in await self._get_all_items( - f"artists/{prov_artist_id}/albums?include_groups=album,single,compilation" - ) - if (item and item["id"]) - ] - - async def get_artist_toptracks(self, prov_artist_id) -> List[Track]: - """Get a list of 10 most popular tracks for the given artist.""" - artist = await self.get_artist(prov_artist_id) - endpoint = f"artists/{prov_artist_id}/top-tracks" - items = await self._get_data(endpoint) - return [ - await self._parse_track(item, artist=artist) - for item in items["tracks"] - if (item and item["id"]) - ] - - async def library_add(self, prov_item_id, media_type: MediaType): - """Add item to library.""" - result = False - if media_type == MediaType.ARTIST: - result = await self._put_data( - "me/following", {"ids": prov_item_id, "type": "artist"} - ) - elif media_type == MediaType.ALBUM: - result = await self._put_data("me/albums", {"ids": prov_item_id}) - elif media_type == MediaType.TRACK: - result = await self._put_data("me/tracks", {"ids": prov_item_id}) - elif media_type == MediaType.PLAYLIST: - result = await self._put_data( - f"playlists/{prov_item_id}/followers", data={"public": False} - ) - return result - - async def library_remove(self, prov_item_id, media_type: MediaType): - """Remove item from library.""" - result = False - if media_type == MediaType.ARTIST: - result = await self._delete_data( - "me/following", {"ids": prov_item_id, "type": "artist"} - ) - elif media_type == MediaType.ALBUM: - result = await self._delete_data("me/albums", {"ids": prov_item_id}) - elif media_type == MediaType.TRACK: - result = await self._delete_data("me/tracks", {"ids": prov_item_id}) - elif media_type == MediaType.PLAYLIST: - result = await self._delete_data(f"playlists/{prov_item_id}/followers") - return result - - async def add_playlist_tracks( - self, prov_playlist_id: str, prov_track_ids: List[str] - ): - """Add track(s) to playlist.""" - track_uris = [] - for track_id in prov_track_ids: - track_uris.append(f"spotify:track:{track_id}") - data = {"uris": track_uris} - return await self._post_data(f"playlists/{prov_playlist_id}/tracks", data=data) - - async def remove_playlist_tracks( - self, prov_playlist_id: str, prov_track_ids: List[str] - ) -> None: - """Remove track(s) from playlist.""" - track_uris = [] - for track_id in prov_track_ids: - track_uris.append({"uri": f"spotify:track:{track_id}"}) - data = {"tracks": track_uris} - return await self._delete_data( - f"playlists/{prov_playlist_id}/tracks", data=data - ) - - async def get_stream_details(self, item_id: str) -> StreamDetails: - """Return the content details for the given track when it will be streamed.""" - # make sure a valid track is requested. - track = await self.get_track(item_id) - if not track: - return None - # make sure that the token is still valid by just requesting it - await self.get_token() - librespot = await self.get_librespot_binary() - librespot_exec = f'{librespot} -c "{CACHE_DIR}" --pass-through -b 320 --single-track spotify://track:{track.item_id}' - return StreamDetails( - type=StreamType.EXECUTABLE, - item_id=track.item_id, - provider=self.id, - path=librespot_exec, - content_type=ContentType.OGG, - sample_rate=44100, - bit_depth=16, - ) - - async def _parse_artist(self, artist_obj): - """Parse spotify artist object to generic layout.""" - artist = Artist( - item_id=artist_obj["id"], provider=self.id, name=artist_obj["name"] - ) - artist.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=artist_obj["id"], - url=artist_obj["external_urls"]["spotify"], - ) - ) - if "genres" in artist_obj: - artist.metadata.genres = set(artist_obj["genres"]) - if artist_obj.get("images"): - for img in artist_obj["images"]: - img_url = img["url"] - if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url: - artist.metadata.images = {MediaItemImage(ImageType.THUMB, img_url)} - break - return artist - - async def _parse_album(self, album_obj: dict): - """Parse spotify album object to generic layout.""" - name, version = parse_title_and_version(album_obj["name"]) - album = Album( - item_id=album_obj["id"], provider=self.id, name=name, version=version - ) - for artist in album_obj["artists"]: - album.artist = await self._parse_artist(artist) - if album.artist: - break - if album_obj["album_type"] == "single": - album.album_type = AlbumType.SINGLE - elif album_obj["album_type"] == "compilation": - album.album_type = AlbumType.COMPILATION - elif album_obj["album_type"] == "album": - album.album_type = AlbumType.ALBUM - if "genres" in album_obj: - album.metadata.genre = set(album_obj["genres"]) - if album_obj.get("images"): - album.metadata.images = { - MediaItemImage(ImageType.THUMB, album_obj["images"][0]["url"]) - } - if "external_ids" in album_obj and album_obj["external_ids"].get("upc"): - album.upc = album_obj["external_ids"]["upc"] - if "label" in album_obj: - album.metadata.label = album_obj["label"] - if album_obj.get("release_date"): - album.year = int(album_obj["release_date"].split("-")[0]) - if album_obj.get("copyrights"): - album.metadata.copyright = album_obj["copyrights"][0]["text"] - if album_obj.get("explicit"): - album.metadata.explicit = album_obj["explicit"] - album.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=album_obj["id"], - quality=MediaQuality.LOSSY_OGG, - url=album_obj["external_urls"]["spotify"], - ) - ) - return album - - async def _parse_track(self, track_obj, artist=None): - """Parse spotify track object to generic layout.""" - name, version = parse_title_and_version(track_obj["name"]) - track = Track( - item_id=track_obj["id"], - provider=self.id, - name=name, - version=version, - duration=track_obj["duration_ms"] / 1000, - disc_number=track_obj["disc_number"], - track_number=track_obj["track_number"], - ) - if artist: - track.artists.append(artist) - for track_artist in track_obj.get("artists", []): - artist = await self._parse_artist(track_artist) - if artist and artist.item_id not in {x.item_id for x in track.artists}: - track.artists.append(artist) - - track.metadata.explicit = track_obj["explicit"] - if "preview_url" in track_obj: - track.metadata.preview = track_obj["preview_url"] - if "external_ids" in track_obj and "isrc" in track_obj["external_ids"]: - track.isrc = track_obj["external_ids"]["isrc"] - if "album" in track_obj: - track.album = await self._parse_album(track_obj["album"]) - if track_obj["album"].get("images"): - track.metadata.images = { - MediaItemImage( - ImageType.THUMB, track_obj["album"]["images"][0]["url"] - ) - } - if track_obj.get("copyright"): - track.metadata.copyright = track_obj["copyright"] - if track_obj.get("explicit"): - track.metadata.explicit = True - if track_obj.get("popularity"): - track.metadata.popularity = track_obj["popularity"] - track.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=track_obj["id"], - quality=MediaQuality.LOSSY_OGG, - url=track_obj["external_urls"]["spotify"], - available=not track_obj["is_local"] and track_obj["is_playable"], - ) - ) - return track - - async def _parse_playlist(self, playlist_obj): - """Parse spotify playlist object to generic layout.""" - playlist = Playlist( - item_id=playlist_obj["id"], - provider=self.id, - name=playlist_obj["name"], - owner=playlist_obj["owner"]["display_name"], - ) - playlist.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=playlist_obj["id"], - url=playlist_obj["external_urls"]["spotify"], - ) - ) - playlist.is_editable = ( - playlist_obj["owner"]["id"] == self._sp_user["id"] - or playlist_obj["collaborative"] - ) - if playlist_obj.get("images"): - playlist.metadata.images = { - MediaItemImage(ImageType.THUMB, playlist_obj["images"][0]["url"]) - } - playlist.metadata.checksum = str(playlist_obj["snapshot_id"]) - return playlist - - async def get_token(self): - """Get auth token on spotify.""" - # return existing token if we have one in memory - if ( - self._auth_token - and os.path.isdir(CACHE_DIR) - and (self._auth_token["expiresAt"] > int(time.time()) + 20) - ): - return self._auth_token - tokeninfo = {} - if not self._username or not self._password: - return tokeninfo - # retrieve token with librespot - tokeninfo = await self._get_token() - if tokeninfo: - self._auth_token = tokeninfo - self._sp_user = await self._get_data("me") - self.mass.metadata.preferred_language = self._sp_user["country"] - self.logger.info( - "Succesfully logged in to Spotify as %s", self._sp_user["id"] - ) - self._auth_token = tokeninfo - else: - self.logger.error("Login failed for user %s", self._username) - return tokeninfo - - async def _get_token(self): - """Get spotify auth token with librespot bin.""" - # authorize with username and password (NOTE: this can also be Spotify Connect) - args = [ - await self.get_librespot_binary(), - "-O", - "-c", - CACHE_DIR, - "-a", - "-u", - self._username, - "-p", - self._password, - ] - librespot = await asyncio.create_subprocess_exec(*args) - await librespot.wait() - # get token with (authorized) librespot - scopes = [ - "user-read-playback-state", - "user-read-currently-playing", - "user-modify-playback-state", - "playlist-read-private", - "playlist-read-collaborative", - "playlist-modify-public", - "playlist-modify-private", - "user-follow-modify", - "user-follow-read", - "user-library-read", - "user-library-modify", - "user-read-private", - "user-read-email", - "user-read-birthdate", - "user-top-read", - ] - scope = ",".join(scopes) - args = [ - await self.get_librespot_binary(), - "-O", - "-t", - "--client-id", - app_var(2), - "--scope", - scope, - "-c", - CACHE_DIR, - ] - librespot = await asyncio.create_subprocess_exec( - *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT - ) - stdout, _ = await librespot.communicate() - try: - result = json.loads(stdout) - except JSONDecodeError: - self.logger.warning( - "Error while retrieving Spotify token, details: %s", stdout - ) - return None - # transform token info to spotipy compatible format - if result and "accessToken" in result: - tokeninfo = result - tokeninfo["expiresAt"] = tokeninfo["expiresIn"] + int(time.time()) - return tokeninfo - return None - - @use_cache(3600 * 24) - async def _get_all_items(self, endpoint, key="items", **kwargs): - """Get all items from a paged list.""" - limit = 50 - offset = 0 - all_items = [] - while True: - kwargs["limit"] = limit - kwargs["offset"] = offset - result = await self._get_data(endpoint, skip_cache=True, **kwargs) - offset += limit - if not result or key not in result or not result[key]: - break - all_items += result[key] - if len(result[key]) < limit: - break - return all_items - - @use_cache(3600 * 2) - async def _get_data(self, endpoint, **kwargs): - """Get data from api.""" - url = f"https://api.spotify.com/v1/{endpoint}" - kwargs["market"] = "from_token" - kwargs["country"] = "from_token" - token = await self.get_token() - if not token: - return None - headers = {"Authorization": f'Bearer {token["accessToken"]}'} - async with self._throttler: - async with self.mass.http_session.get( - url, headers=headers, params=kwargs, verify_ssl=False - ) as response: - try: - result = await response.json() - if "error" in result or ( - "status" in result and "error" in result["status"] - ): - self.logger.error("%s - %s", endpoint, result) - return None - except ( - aiohttp.ContentTypeError, - JSONDecodeError, - ) as err: - self.logger.error("%s - %s", endpoint, str(err)) - return None - return result - - async def _delete_data(self, endpoint, data=None, **kwargs): - """Delete data from api.""" - url = f"https://api.spotify.com/v1/{endpoint}" - token = await self.get_token() - if not token: - return None - headers = {"Authorization": f'Bearer {token["accessToken"]}'} - async with self.mass.http_session.delete( - url, headers=headers, params=kwargs, json=data, verify_ssl=False - ) as response: - return await response.text() - - async def _put_data(self, endpoint, data=None, **kwargs): - """Put data on api.""" - url = f"https://api.spotify.com/v1/{endpoint}" - token = await self.get_token() - if not token: - return None - headers = {"Authorization": f'Bearer {token["accessToken"]}'} - async with self.mass.http_session.put( - url, headers=headers, params=kwargs, json=data, verify_ssl=False - ) as response: - return await response.text() - - async def _post_data(self, endpoint, data=None, **kwargs): - """Post data on api.""" - url = f"https://api.spotify.com/v1/{endpoint}" - token = await self.get_token() - if not token: - return None - headers = {"Authorization": f'Bearer {token["accessToken"]}'} - async with self.mass.http_session.post( - url, headers=headers, params=kwargs, json=data, verify_ssl=False - ) as response: - return await response.text() - - async def get_librespot_binary(self): - """Find the correct librespot binary belonging to the platform.""" - if self._librespot_bin is not None: - return self._librespot_bin - - async def check_librespot(librespot_path: str) -> str | None: - try: - librespot = await asyncio.create_subprocess_exec( - *[librespot_path, "-V"], stdout=asyncio.subprocess.PIPE - ) - stdout, _ = await librespot.communicate() - if librespot.returncode == 0 and b"librespot" in stdout: - self._librespot_bin = librespot_path - return librespot_path - except OSError: - return None - - base_path = os.path.join(os.path.dirname(__file__), "librespot") - if platform.system() == "Windows": - if librespot := await check_librespot( - os.path.join(base_path, "windows", "librespot.exe") - ): - return librespot - if platform.system() == "Darwin": - # macos binary is x86_64 intel - if librespot := await check_librespot( - os.path.join(base_path, "osx", "librespot") - ): - return librespot - - if platform.system() == "Linux": - architecture = platform.machine() - if architecture in ["AMD64", "x86_64"]: - # generic linux x86_64 binary - if librespot := await check_librespot( - os.path.join( - base_path, - "linux", - "librespot-x86_64", - ) - ): - return librespot - - # arm architecture... try all options one by one... - for arch in ["aarch64", "armv7", "armhf", "arm"]: - if librespot := await check_librespot( - os.path.join( - base_path, - "linux", - f"librespot-{arch}", - ) - ): - return librespot - - raise RuntimeError( - f"Unable to locate Libespot for platform {platform.system()}" - ) diff --git a/music_assistant/providers/spotify/librespot/linux/librespot-aarch64 b/music_assistant/providers/spotify/librespot/linux/librespot-aarch64 deleted file mode 100755 index 5359098f..00000000 Binary files a/music_assistant/providers/spotify/librespot/linux/librespot-aarch64 and /dev/null differ diff --git a/music_assistant/providers/spotify/librespot/linux/librespot-arm b/music_assistant/providers/spotify/librespot/linux/librespot-arm deleted file mode 100755 index 5cd38c75..00000000 Binary files a/music_assistant/providers/spotify/librespot/linux/librespot-arm and /dev/null differ diff --git a/music_assistant/providers/spotify/librespot/linux/librespot-armhf b/music_assistant/providers/spotify/librespot/linux/librespot-armhf deleted file mode 100755 index 18c2e05b..00000000 Binary files a/music_assistant/providers/spotify/librespot/linux/librespot-armhf and /dev/null differ diff --git a/music_assistant/providers/spotify/librespot/linux/librespot-armv7 b/music_assistant/providers/spotify/librespot/linux/librespot-armv7 deleted file mode 100755 index 0a792b2e..00000000 Binary files a/music_assistant/providers/spotify/librespot/linux/librespot-armv7 and /dev/null differ diff --git a/music_assistant/providers/spotify/librespot/linux/librespot-x86_64 b/music_assistant/providers/spotify/librespot/linux/librespot-x86_64 deleted file mode 100755 index e025abdb..00000000 Binary files a/music_assistant/providers/spotify/librespot/linux/librespot-x86_64 and /dev/null differ diff --git a/music_assistant/providers/spotify/librespot/osx/librespot b/music_assistant/providers/spotify/librespot/osx/librespot deleted file mode 100755 index c1b37543..00000000 Binary files a/music_assistant/providers/spotify/librespot/osx/librespot and /dev/null differ diff --git a/music_assistant/providers/spotify/librespot/windows/librespot.exe b/music_assistant/providers/spotify/librespot/windows/librespot.exe deleted file mode 100755 index a973f4e1..00000000 Binary files a/music_assistant/providers/spotify/librespot/windows/librespot.exe and /dev/null differ diff --git a/music_assistant/providers/tunein.py b/music_assistant/providers/tunein.py deleted file mode 100644 index a79aa712..00000000 --- a/music_assistant/providers/tunein.py +++ /dev/null @@ -1,181 +0,0 @@ -"""Tune-In musicprovider support for MusicAssistant.""" -from __future__ import annotations - -from typing import List, Optional - -from asyncio_throttle import Throttler - -from music_assistant.helpers.cache import use_cache -from music_assistant.helpers.util import create_sort_name -from music_assistant.models.media_items import ( - ContentType, - ImageType, - MediaItemImage, - MediaItemProviderId, - MediaItemType, - MediaQuality, - MediaType, - Radio, - StreamDetails, - StreamType, -) -from music_assistant.models.provider import MusicProvider - - -class TuneInProvider(MusicProvider): - """Provider implementation for Tune In.""" - - def __init__(self, username: Optional[str]) -> None: - """Initialize the provider.""" - self._attr_id = "tunein" - self._attr_name = "Tune-in Radio" - self._attr_supported_mediatypes = [MediaType.RADIO] - self._username = username - self._throttler = Throttler(rate_limit=1, period=1) - - async def setup(self) -> None: - """Handle async initialization of the provider.""" - # we have nothing to setup - - async def search( - self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5 - ) -> List[MediaItemType]: - """ - Perform search on musicprovider. - - :param search_query: Search query. - :param media_types: A list of media_types to include. All types if None. - :param limit: Number of items to return in the search (per type). - """ - result = [] - # TODO: search for radio stations - return result - - async def get_library_radios(self) -> List[Radio]: - """Retrieve library/subscribed radio stations from the provider.""" - - async def parse_items(items: List[dict], folder: str = None) -> List[Radio]: - result = [] - for item in items: - item_type = item.get("type", "") - if item_type == "audio": - if "preset_id" not in item: - continue - # each radio station can have multiple streams add each one as different quality - stream_info = await self.__get_data( - "Tune.ashx", id=item["preset_id"] - ) - for stream in stream_info["body"]: - result.append(await self._parse_radio(item, stream, folder)) - elif item_type == "link": - # stations are in sublevel (new style) - if sublevel := await self.__get_data(item["URL"], render="json"): - result += await parse_items(sublevel["body"], item["text"]) - elif item.get("children"): - # stations are in sublevel (old style ?) - result += await parse_items(item["children"], item["text"]) - return result - - data = await self.__get_data("Browse.ashx", c="presets") - if data and "body" in data: - return await parse_items(data["body"]) - return [] - - async def get_radio(self, prov_radio_id: str) -> Radio: - """Get radio station details.""" - prov_radio_id, media_type = prov_radio_id.split("--", 1) - params = {"c": "composite", "detail": "listing", "id": prov_radio_id} - result = await self.__get_data("Describe.ashx", **params) - if result and result.get("body") and result["body"][0].get("children"): - item = result["body"][0]["children"][0] - stream_info = await self.__get_data("Tune.ashx", id=prov_radio_id) - for stream in stream_info["body"]: - if stream["media_type"] != media_type: - continue - return await self._parse_radio(item, stream) - return None - - async def _parse_radio( - self, details: dict, stream: dict, folder: Optional[str] = None - ) -> Radio: - """Parse Radio object from json obj returned from api.""" - if "name" in details: - name = details["name"] - else: - # parse name from text attr - name = details["text"] - if " | " in name: - name = name.split(" | ")[1] - name = name.split(" (")[0] - item_id = f'{details["preset_id"]}--{stream["media_type"]}' - radio = Radio(item_id=item_id, provider=self.id, name=name) - if stream["media_type"] == "aac": - quality = MediaQuality.LOSSY_AAC - elif stream["media_type"] == "ogg": - quality = MediaQuality.LOSSY_OGG - else: - quality = MediaQuality.LOSSY_MP3 - radio.add_provider_id( - MediaItemProviderId( - provider=self.id, - item_id=item_id, - quality=quality, - details=stream["url"], - ) - ) - # preset number is used for sorting (not present at stream time) - preset_number = details.get("preset_number") - if preset_number and folder: - radio.sort_name = f'{folder}-{details["preset_number"]}' - elif preset_number: - radio.sort_name = details["preset_number"] - radio.sort_name += create_sort_name(name) - if "text" in details: - radio.metadata.description = details["text"] - # images - if img := details.get("image"): - radio.metadata.images = {MediaItemImage(ImageType.THUMB, img)} - if img := details.get("logo"): - radio.metadata.images = {MediaItemImage(ImageType.LOGO, img)} - return radio - - async def get_stream_details(self, item_id: str) -> StreamDetails: - """Get streamdetails for a radio station.""" - item_id, media_type = item_id.split("--", 1) - stream_info = await self.__get_data("Tune.ashx", id=item_id) - for stream in stream_info["body"]: - if stream["media_type"] == media_type: - return StreamDetails( - type=StreamType.URL, - item_id=item_id, - provider=self.id, - path=stream["url"], - content_type=ContentType(stream["media_type"]), - sample_rate=44100, - bit_depth=16, - media_type=MediaType.RADIO, - details=stream, - ) - return None - - @use_cache(3600 * 2) - async def __get_data(self, endpoint: str, **kwargs): - """Get data from api.""" - if endpoint.startswith("http"): - url = endpoint - else: - url = f"https://opml.radiotime.com/{endpoint}" - kwargs["formats"] = "ogg,aac,wma,mp3" - kwargs["username"] = self._username - kwargs["partnerId"] = "1" - kwargs["render"] = "json" - async with self._throttler: - async with self.mass.http_session.get( - url, params=kwargs, verify_ssl=False - ) as response: - result = await response.json() - if not result or "error" in result: - self.logger.error(url) - self.logger.error(kwargs) - result = None - return result