def get_image_url(self, image: MediaItemImage) -> str:
"""Get (proxied) URL for MediaItemImage."""
- if image.provider != "url":
- # return imageproxy url for images that need to be resolved
- # the original path is double encoded
- encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
- return f"{self.server_info.base_url}/imageproxy?path={encoded_url}&provider={image.provider}" # noqa: E501
- return image.path
+ if image.remotely_accessible:
+ return image.path
+ # return imageproxy url for images that need to be resolved
+ # the original path is double encoded
+ encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
+ return (
+ f"{self.server_info.base_url}/imageproxy?path={encoded_url}&provider={image.provider}"
+ )
def subscribe(
self,
media_type_str = uri.split("/")[3]
media_type = MediaType(media_type_str)
item_id = uri.split("/")[4].split("?")[0]
+ elif uri.startswith("https://tidal.com/browse/"):
+ # Tidal public share URL
+ # https://tidal.com/browse/track/123456
+ provider_instance_id_or_domain = "tidal"
+ media_type_str = uri.split("/")[4]
+ media_type = MediaType(media_type_str)
+ item_id = uri.split("/")[5].split("?")[0]
elif uri.startswith(("http://", "https://", "rtsp://", "rtmp://")):
- # Translate a plain URL to the URL provider
- provider_instance_id_or_domain = "url"
+ # Translate a plain URL to the builtin provider
+ provider_instance_id_or_domain = "builtin"
media_type = MediaType.UNKNOWN
item_id = uri
elif "://" in uri and len(uri.split("/")) >= 4:
provider_instance_id_or_domain, media_type_str, item_id = uri.split(":")
media_type = MediaType(media_type_str)
elif "/" in uri and await asyncio.to_thread(os.path.isfile, uri):
- # Translate a local file (which is not from file provider) to the URL provider
- provider_instance_id_or_domain = "url"
+ # Translate a local file (which is not from a file provider!) to the builtin provider
+ provider_instance_id_or_domain = "builtin"
media_type = MediaType.UNKNOWN
item_id = uri
else:
type: ImageType
path: str
- # set to instance_id of provider if the path needs to be resolved
- # if the path is just a plain (remotely accessible) URL, set it to 'url'
- provider: str = "url"
+ provider: str
+ remotely_accessible: bool = False # url that is accessible from anywhere
def __hash__(self) -> int:
"""Return custom hash."""
performers: set[str] | None = None
preview: str | None = None
popularity: int | None = None
+ # cache_checksum: optional value to (in)validate cache / detect changes (used for playlists)
+ cache_checksum: str | None = None
# last_refresh: timestamp the (full) metadata was last collected
last_refresh: int | None = None
- # checksum: optional value to detect changes (e.g. playlists)
- checksum: str | None = None
def update(
self,
elif isinstance(cur_val, set) and isinstance(new_val, list):
new_val = cur_val.update(new_val)
setattr(self, fld.name, new_val)
- elif new_val and fld.name in ("checksum", "popularity", "last_refresh"):
+ elif new_val and fld.name in ("popularity", "last_refresh", "cache_checksum"):
# some fields are always allowed to be overwritten
# (such as checksum and last_refresh)
setattr(self, fld.name, new_val)
supported_features: list[ProviderFeature]
available: bool
icon: str | None
+ is_streaming_provider: bool | None = None # music providers only
@dataclass
ANNOUNCE_ALERT_FILE: Final[str] = str(RESOURCES_DIR.joinpath("announce.mp3"))
SILENCE_FILE: Final[str] = str(RESOURCES_DIR.joinpath("silence.mp3"))
+VARIOUS_ARTISTS_FANART: Final[str] = str(RESOURCES_DIR.joinpath("fallback_fanart.jpeg"))
+MASS_LOGO: Final[str] = str(RESOURCES_DIR.joinpath("logo.png"))
# if duration is None (e.g. radio stream):Final[str] = 48 hours
FALLBACK_DURATION: Final[int] = 172800
subkeys = key.split("/")
for index, subkey in enumerate(subkeys):
if index == (len(subkeys) - 1):
- cur_value = parent.get(subkey)
- if cur_value == value:
- # no need to save if value did not change
- return
parent[subkey] = value
- self.save()
else:
parent.setdefault(subkey, {})
parent = parent[subkey]
+ self.save()
def set_default(self, key: str, default_value: Any) -> None:
"""Set default value(s) for a specific key/path in persistent storage."""
full_album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
# prefer cache items (if any) for streaming providers only
cache_key = f"{prov.instance_id}.albumtracks.{item_id}"
- if isinstance(full_album, ItemMapping):
- cache_checksum = None
- else:
- cache_checksum = full_album.metadata.checksum
- if prov.is_streaming_provider and (
- cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)
- ):
+ if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return [AlbumTrack.from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = []
items.append(track)
# store (serializable items) in cache
if prov.is_streaming_provider:
- self.mass.create_task(
- self.mass.cache.set(
- cache_key, [x.to_dict() for x in items], checksum=cache_checksum
- )
- )
+ self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
return items
async def _get_provider_dynamic_tracks(
"""Return top tracks for an artist on given provider."""
items = []
assert provider_instance_id_or_domain != "library"
- artist = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
- cache_checksum = artist.metadata.checksum
prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
- # prefer cache items (if any)
+ # prefer cache items (if any) - for streaming providers
cache_key = f"{prov.instance_id}.artist_toptracks.{item_id}"
- if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
+ if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if ProviderFeature.ARTIST_TOPTRACKS in prov.supported_features:
paged_list = await self.mass.music.tracks.library_items(extra_query=query)
return paged_list.items
# store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items], checksum=cache_checksum)
- )
+ if prov.is_streaming_provider:
+ self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
return items
async def get_library_artist_tracks(
"""Return albums for an artist on given provider."""
items = []
assert provider_instance_id_or_domain != "library"
- artist = await self.get_provider_item(item_id, provider_instance_id_or_domain)
- cache_checksum = artist.metadata.checksum
prov = self.mass.get_provider(provider_instance_id_or_domain)
if prov is None:
return []
# prefer cache items (if any)
cache_key = f"{prov.instance_id}.artist_albums.{item_id}"
- if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
+ if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return [Album.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if ProviderFeature.ARTIST_ALBUMS in prov.supported_features:
paged_list = await self.mass.music.albums.library_items(extra_query=query)
return paged_list.items
# store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items], checksum=cache_checksum)
- )
+ if prov.is_streaming_provider:
+ self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
return items
async def get_library_artist_albums(
if provider := self.mass.get_provider(provider_instance_id_or_domain):
with suppress(MediaNotFoundError):
if item := await provider.get_item(self.media_type, item_id):
- await self.mass.cache.set(cache_key, item.to_dict())
+ if item.metadata.cache_checksum != "no_cache":
+ await self.mass.cache.set(cache_key, item.to_dict())
return item
# if we reach this point all possibilities failed and the item could not be found.
# There is a possibility that the (streaming) provider changed the id of the item
import asyncio
import random
-import time
from collections.abc import AsyncGenerator
from typing import Any
from music_assistant.common.helpers.datetime import utc_timestamp
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.helpers.uri import create_uri
+from music_assistant.common.helpers.uri import create_uri, parse_uri
from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
) -> AsyncGenerator[PlaylistTrack, None]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(
- item_id, provider_instance_id_or_domain, force_refresh=force_refresh
+ item_id,
+ provider_instance_id_or_domain,
+ force_refresh=force_refresh,
)
prov = next(x for x in playlist.provider_mappings)
async for track in self._get_provider_playlist_tracks(
prov.item_id,
prov.provider_instance,
- cache_checksum=(str(time.time()) if force_refresh else playlist.metadata.checksum),
+ cache_checksum=playlist.metadata.cache_checksum,
):
yield track
self, name: str, provider_instance_or_domain: str | None = None
) -> Playlist:
"""Create new playlist."""
- # if provider is omitted, just pick first provider
+ # if provider is omitted, just pick builtin provider
if provider_instance_or_domain:
provider = self.mass.get_provider(provider_instance_or_domain)
+ if provider is None:
+ raise ProviderUnavailableError
else:
- provider = next(
- (
- x
- for x in self.mass.music.providers
- if ProviderFeature.PLAYLIST_CREATE in x.supported_features
- ),
- None,
- )
- if provider is None:
- msg = "No provider available which allows playlists creation."
- raise ProviderUnavailableError(msg)
+ provider = self.mass.get_provider("builtin")
# create playlist on the provider
playlist = await provider.create_playlist(name)
# add the new playlist to the library
- return await self.add_item_to_library(playlist, True)
+ return await self.add_item_to_library(playlist, False)
- async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None:
- """Add multiple tracks to playlist. Creates background tasks to process the action."""
+ async def add_playlist_tracks(self, db_playlist_id: str | int, uris: list[str]) -> None: # noqa: PLR0915
+ """Add tracks to playlist."""
db_id = int(db_playlist_id) # ensure integer
playlist = await self.get_library_item(db_id)
if not playlist:
if not playlist.is_editable:
msg = f"Playlist {playlist.name} is not editable"
raise InvalidDataError(msg)
- for uri in uris:
- self.mass.create_task(self.add_playlist_track(db_id, uri))
- async def add_playlist_track(self, db_playlist_id: str | int, track_uri: str) -> None:
- """Add track to playlist - make sure we dont add duplicates."""
- db_id = int(db_playlist_id) # ensure integer
- # we can only edit playlists that are in the database (marked as editable)
- playlist = await self.get_library_item(db_id)
- if not playlist:
- msg = f"Playlist with id {db_id} not found"
- raise MediaNotFoundError(msg)
- if not playlist.is_editable:
- msg = f"Playlist {playlist.name} is not editable"
- raise InvalidDataError(msg)
- # make sure we have recent full track details
- track = await self.mass.music.get_item_by_uri(track_uri)
- assert track.media_type == MediaType.TRACK
- # a playlist can only have one provider (for now)
- playlist_prov = next(iter(playlist.provider_mappings))
# grab all existing track ids in the playlist so we can check for duplicates
+ playlist_prov_map = next(iter(playlist.provider_mappings))
+ playlist_prov = self.mass.get_provider(playlist_prov_map.provider_instance)
+ if not playlist_prov or not playlist_prov.available:
+ msg = f"Provider {playlist_prov_map.provider_instance} is not available"
+ raise ProviderUnavailableError(msg)
cur_playlist_track_ids = set()
- count = 0
- async for item in self.tracks(playlist_prov.item_id, playlist_prov.provider_instance):
- count += 1
- cur_playlist_track_ids.update(
- {
- i.item_id
- for i in item.provider_mappings
- if i.provider_instance == playlist_prov.provider_instance
- }
- )
- await asyncio.sleep(0) # yield to eventloop
- # check for duplicates
- for track_prov in track.provider_mappings:
- if (
- track_prov.provider_domain == playlist_prov.provider_domain
- and track_prov.item_id in cur_playlist_track_ids
- ):
- msg = "Track already exists in playlist {playlist.name}"
- raise InvalidDataError(msg)
- # add track to playlist
- # we can only add a track to a provider playlist if track is available on that provider
- # a track can contain multiple versions on the same provider
- # simply sort by quality and just add the first one (assuming track is still available)
- track_id_to_add = None
- for track_version in sorted(track.provider_mappings, key=lambda x: x.quality, reverse=True):
- if not track.available:
+ cur_playlist_track_uris = set()
+ async for item in self.tracks(playlist_prov_map.item_id, playlist_prov.instance_id):
+ cur_playlist_track_uris.add(item.item_id)
+ cur_playlist_track_uris.add(item.uri)
+
+ # work out the track id's that need to be added
+ # filter out duplicates and items that not exist on the provider.
+ ids_to_add: set[str] = set()
+ for uri in uris:
+ # skip if item already in the playlist
+ if uri in cur_playlist_track_uris:
continue
- if playlist_prov.provider_domain.startswith("filesystem"):
- # the file provider can handle uri's from all providers so simply add the uri
- track_id_to_add = track_version.url or create_uri(
- MediaType.TRACK,
- track_version.provider_instance,
- track_version.item_id,
+
+ # parse uri for further processing
+ media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
+
+ # skip if item already in the playlist
+ if item_id in cur_playlist_track_ids:
+ continue
+
+ # skip non-track items
+ # TODO: revisit this once we support audiobooks and podcasts ?
+ if media_type != MediaType.TRACK:
+ continue
+
+ # special: the builtin provider can handle uri's from all providers (with uri as id)
+ if provider_instance_id_or_domain != "library" and playlist_prov.domain == "builtin":
+ # note: we try not to add library uri's to the builtin playlists
+ # so we can survive db rebuilds
+ ids_to_add.add(uri)
+ continue
+
+ # handle library uri (we need to fully unwrap it)
+ if provider_instance_id_or_domain == "library":
+ # library item, fetch full object
+ db_track = await self.mass.music.tracks.get_library_item(item_id)
+ # a track can contain multiple versions on the same provider
+ # simply sort by quality and just add the first available version
+ for track_version in sorted(
+ db_track.provider_mappings, key=lambda x: x.quality, reverse=True
+ ):
+ if not track_version.available:
+ continue
+ if track_version.item_id in cur_playlist_track_ids:
+ break # already existing in the playlist
+ item_prov = self.mass.get_provider(track_version.provider_instance)
+ if not item_prov:
+ continue
+ track_version_uri = create_uri(
+ MediaType.TRACK,
+ item_prov.lookup_key,
+ track_version.item_id,
+ )
+ if track_version_uri in cur_playlist_track_uris:
+ break # already existing in the playlist
+ if playlist_prov.domain == "builtin":
+ # the builtin provider can handle uri's from all providers (with uri as id)
+ ids_to_add.add(track_version_uri)
+ break
+ if item_prov.lookup_key == playlist_prov.lookup_key:
+ ids_to_add.add(track_version.item_id)
+ break
+ continue
+
+ # all other: if target playlist is an exact provider match, we can add it
+ item_prov = self.mass.get_provider(provider_instance_id_or_domain)
+ if not item_prov or not item_prov.available:
+ self.logger.warning(
+ "Skip adding uri %s to playlist: Provider %s is not available",
+ uri,
+ provider_instance_id_or_domain,
)
- break
- if track_version.provider_domain == playlist_prov.provider_domain:
- track_id_to_add = track_version.item_id
- break
- if not track_id_to_add:
- msg = f"Track is not available on provider {playlist_prov.provider_domain}"
- raise MediaNotFoundError(msg)
+ continue
+ if item_prov.lookup_key == playlist_prov.lookup_key:
+ ids_to_add.add(item_id)
+
# actually add the tracks to the playlist on the provider
- provider = self.mass.get_provider(playlist_prov.provider_instance)
- await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
- # invalidate cache by updating the checksum
- await self.get(db_id, "library", force_refresh=True)
+ await playlist_prov.add_playlist_tracks(playlist_prov_map.item_id, list(ids_to_add))
+ # invalidate cache so tracks get refreshed
+ cache_key = f"{playlist_prov.instance_id}.playlist.{playlist_prov_map.item_id}.tracks"
+ await self.mass.cache.delete(cache_key)
+
+ async def add_playlist_track(self, db_playlist_id: str | int, track_uri: str) -> None:
+ """Add (single) track to playlist."""
+ await self.add_playlist_tracks(db_playlist_id, [track_uri])
async def remove_playlist_tracks(
self, db_playlist_id: str | int, positions_to_remove: tuple[int, ...]
)
continue
await provider.remove_playlist_tracks(prov_mapping.item_id, positions_to_remove)
- # invalidate cache by updating the checksum
- await self.get(db_id, "library", force_refresh=True)
+ # invalidate cache so tracks get refreshed
+ cache_key = f"{provider.instance_id}.playlist.{prov_mapping.item_id}.tracks"
+ await self.mass.cache.delete(cache_key)
async def _add_library_item(self, item: Playlist) -> Playlist:
"""Add a new record to the database."""
yield item
all_items.append(item)
# store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(
- cache_key, [x.to_dict() for x in all_items], checksum=cache_checksum
+ if cache_checksum != "no_cache":
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key, [x.to_dict() for x in all_items], checksum=cache_checksum
+ )
)
- )
async def _get_provider_dynamic_tracks(
self,
from __future__ import annotations
import asyncio
+import logging
import os
import urllib.parse
from base64 import b64encode
+from collections.abc import Iterable
from contextlib import suppress
from time import time
from typing import TYPE_CHECKING, cast
Radio,
Track,
)
-from music_assistant.constants import CONF_LANGUAGE, VARIOUS_ARTISTS_ID_MBID, VARIOUS_ARTISTS_NAME
+from music_assistant.constants import (
+ CONF_LANGUAGE,
+ VARIOUS_ARTISTS_ID_MBID,
+ VARIOUS_ARTISTS_NAME,
+ VERBOSE_LOG_LEVEL,
+)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.images import create_collage, get_image_thumb
async def setup(self, config: CoreConfig) -> None:
"""Async initialize of module."""
+ if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+ # silence PIL logger
+ logging.getLogger("PIL").setLevel(logging.WARNING)
+ # make sure that our directory with collage images exists
+ self._collage_images_dir = os.path.join(self.mass.storage_path, "collage_images")
+ if not await asyncio.to_thread(os.path.exists, self._collage_images_dir):
+ await asyncio.to_thread(os.mkdir, self._collage_images_dir)
+
self.mass.streams.register_dynamic_route("/imageproxy", self.handle_imageproxy)
async def close(self) -> None:
async def get_playlist_metadata(self, playlist: Playlist) -> None:
"""Get/update rich metadata for a playlist."""
- # retrieve genres from tracks
- # TODO: retrieve style/mood ?
playlist.metadata.genres = set()
- images = set()
- try:
- playlist_genres: dict[str, int] = {}
- async for track in self.mass.music.playlists.tracks(
- playlist.item_id, playlist.provider
+ all_playlist_tracks_images = set()
+ playlist_genres: dict[str, int] = {}
+ # retrieve metedata for the playlist from the tracks (such as genres etc.)
+ # TODO: retrieve style/mood ?
+ async for track in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider):
+ if track.image:
+ all_playlist_tracks_images.add(track.image)
+ if track.metadata.genres:
+ genres = track.metadata.genres
+ elif track.album and isinstance(track.album, Album) and track.album.metadata.genres:
+ genres = track.album.metadata.genres
+ else:
+ genres = set()
+ for genre in genres:
+ if genre not in playlist_genres:
+ playlist_genres[genre] = 0
+ playlist_genres[genre] += 1
+ await asyncio.sleep(0) # yield to eventloop
+
+ playlist_genres_filtered = {genre for genre, count in playlist_genres.items() if count > 5}
+ playlist.metadata.genres.update(playlist_genres_filtered)
+ # create collage images
+ cur_images = playlist.metadata.images or []
+ new_images = []
+ thumb_image = next((x for x in cur_images if x.type == ImageType.THUMB), None)
+ if not thumb_image or self._collage_images_dir in thumb_image.path:
+ thumb_image_path = (
+ thumb_image.path
+ if thumb_image
+ else os.path.join(self._collage_images_dir, f"{uuid4().hex}_thumb.jpg")
+ )
+ if collage_thumb_image := await self.create_collage_image(
+ all_playlist_tracks_images, thumb_image_path
):
- if track.image:
- images.add(track.image)
- if track.media_type != MediaType.TRACK:
- # filter out radio items
- continue
- if not isinstance(track, Track):
- continue
- if track.metadata.genres:
- genres = track.metadata.genres
- elif track.album and isinstance(track.album, Album) and track.album.metadata.genres:
- genres = track.album.metadata.genres
- else:
- genres = set()
- for genre in genres:
- if genre not in playlist_genres:
- playlist_genres[genre] = 0
- playlist_genres[genre] += 1
- await asyncio.sleep(0) # yield to eventloop
-
- playlist_genres_filtered = {
- genre for genre, count in playlist_genres.items() if count > 5
- }
- playlist.metadata.genres.update(playlist_genres_filtered)
-
- # create collage thumb/fanart from playlist tracks
- # if playlist has no default image (e.g. a local playlist)
- if images and (not playlist.image or playlist.image.provider != "url"):
- if playlist.image and self.mass.storage_path in playlist.image.path:
- # reuse previous created path
- img_path = playlist.image.path
- else:
- img_path = os.path.join(self.mass.storage_path, f"{uuid4().hex}.png")
- img_data = await create_collage(self.mass, list(images))
- async with aiofiles.open(img_path, "wb") as _file:
- await _file.write(img_data)
- playlist.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=img_path, provider="file")
- ]
- except Exception as err:
- self.logger.warning(
- "Error while creating playlist image: %s",
- str(err),
- exc_info=err if self.logger.isEnabledFor(10) else None,
+ new_images.append(collage_thumb_image)
+ elif thumb_image:
+ # just use old image
+ new_images.append(thumb_image)
+ fanart_image = next((x for x in cur_images if x.type == ImageType.FANART), None)
+ if not fanart_image or self._collage_images_dir in fanart_image.path:
+ fanart_image_path = (
+ fanart_image.path
+ if fanart_image
+ else os.path.join(self._collage_images_dir, f"{uuid4().hex}_fanart.jpg")
)
+ if collage_fanart_image := await self.create_collage_image(
+ all_playlist_tracks_images, fanart_image_path, fanart=True
+ ):
+ new_images.append(collage_fanart_image)
+ elif fanart_image:
+ # just use old image
+ new_images.append(fanart_image)
+ playlist.metadata.images = new_images
# set timestamp, used to determine when this function was last called
playlist.metadata.last_refresh = int(time())
for img in media_item.metadata.images:
if img.type != img_type:
continue
- if img.provider != "url" and not resolve:
+ if img.remotely_accessible and not resolve:
continue
- if img.provider != "url" and resolve:
+ if img.remotely_accessible and resolve:
return self.get_image_url(img)
return img.path
image_format: str = "png",
) -> str:
"""Get (proxied) URL for MediaItemImage."""
- if image.provider != "url" or prefer_proxy or size:
+ if not image.remotely_accessible or prefer_proxy or size:
# return imageproxy url for images that need to be resolved
# the original path is double encoded
encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
async def get_thumbnail(
self,
path: str,
+ provider: str,
size: int | None = None,
- provider: str = "url",
base64: bool = False,
image_format: str = "png",
) -> bytes | str:
"""Get/create thumbnail image for path (image url or local path)."""
- if provider != "url" and not self.mass.get_provider(provider):
+ if not self.mass.get_provider(provider):
raise ProviderUnavailableError
thumbnail = await get_image_thumb(
self.mass, path, size=size, provider=provider, image_format=image_format
async def handle_imageproxy(self, request: web.Request) -> web.Response:
"""Handle request for image proxy."""
path = request.query["path"]
- provider = request.query.get("provider", "url")
+ provider = request.query.get("provider", "builtin")
+ if provider in ("url", "file"):
+ # temporary for backwards compatibility
+ provider = "builtin"
size = int(request.query.get("size", "0"))
image_format = request.query.get("fmt", "png")
- if provider != "url" and not self.mass.get_provider(provider):
+ if not self.mass.get_provider(provider):
return web.Response(status=404)
if "%" in path:
# assume (double) encoded url, decode it
path, size=size, provider=provider, image_format=image_format
)
# we set the cache header to 1 year (forever)
- # the client can use the checksum value to refresh when content changes
+ # assuming that images do not/rarely change
return web.Response(
body=image_data,
headers={"Cache-Control": "max-age=31536000"},
content_type=f"image/{image_format}",
)
return web.Response(status=404)
+
+ async def create_collage_image(
+ self,
+ images: Iterable[MediaItemImage],
+ img_path: str,
+ fanart: bool = False,
+ ) -> MediaItemImage | None:
+ """Create collage thumb/fanart image for (in-library) playlist."""
+ if len(images) < 8 and fanart or len(images) < 3:
+ # require at least some images otherwise this does not make a lot of sense
+ return None
+ try:
+ # create collage thumb from playlist tracks
+ # if playlist has no default image (e.g. a local playlist)
+ dimensions = (2500, 1750) if fanart else (1500, 1500)
+ img_data = await create_collage(self.mass, images, dimensions)
+ # always overwrite existing path
+ async with aiofiles.open(img_path, "wb") as _file:
+ await _file.write(img_data)
+ return MediaItemImage(
+ type=ImageType.FANART if fanart else ImageType.THUMB,
+ path=img_path,
+ provider="builtin",
+ remotely_accessible=False,
+ )
+ except Exception as err:
+ self.logger.warning(
+ "Error while creating playlist image: %s",
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(10) else None,
+ )
+ return None
if provider_instance_id_or_domain == "database":
# backwards compatibility - to remove when 2.0 stable is released
provider_instance_id_or_domain = "library"
- if provider_instance_id_or_domain == "url":
- # handle special case of 'URL' MusicProvider which allows us to play regular url's
- return await self.mass.get_provider("url").parse_item(item_id)
+ if provider_instance_id_or_domain == "builtin":
+ # handle special case of 'builtin' MusicProvider which allows us to play regular url's
+ return await self.mass.get_provider("builtin").parse_item(item_id)
ctrl = self.get_controller(media_type)
return await ctrl.get(
item_id=item_id,
and provider.is_streaming_provider
and provider.library_edit_supported(item.media_type)
):
- await provider.library_add(item.item_id, item.media_type)
+ await provider.library_add(item)
# make sure we have a full library item
# a favorite must always be in the library
full_item = await self.get_item(
# add to provider's library first
provider = self.mass.get_provider(item.provider)
if provider.library_edit_supported(item.media_type):
- await provider.library_add(item.item_id, item.media_type)
+ await provider.library_add(item)
return await ctrl.get(
item_id=item.item_id,
provider_instance_id_or_domain=item.provider,
from __future__ import annotations
import asyncio
+import itertools
import random
+from collections.abc import Iterable
from io import BytesIO
from typing import TYPE_CHECKING
from PIL import Image
from music_assistant.server.helpers.tags import get_embedded_image
+from music_assistant.server.models.metadata_provider import MetadataProvider
if TYPE_CHECKING:
from music_assistant.common.models.media_items import MediaItemImage
async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str = "url") -> bytes:
"""Create thumbnail from image url."""
- if provider != "url" and (prov := mass.get_provider(provider)):
- prov: MusicProvider
+ if prov := mass.get_provider(provider):
+ prov: MusicProvider | MetadataProvider
if resolved_data := await prov.resolve_image(path_or_url):
if isinstance(resolved_data, bytes):
return resolved_data
) -> bytes:
"""Get (optimized) PNG thumbnail from image url."""
img_data = await get_image_data(mass, path_or_url, provider)
+ if not img_data:
+ raise FileNotFoundError(f"Image not found: {path_or_url}")
def _create_image():
data = BytesIO()
return await asyncio.to_thread(_create_image)
-async def create_collage(mass: MusicAssistant, images: list[MediaItemImage]) -> bytes:
+async def create_collage(
+ mass: MusicAssistant, images: Iterable[MediaItemImage], dimensions: tuple[int] = (1500, 1500)
+) -> bytes:
"""Create a basic collage image from multiple image urls."""
+ image_size = 250
def _new_collage():
- return Image.new("RGBA", (1500, 1500), color=(255, 255, 255, 255))
+ return Image.new("RGBA", (dimensions[0], dimensions[1]), color=(255, 255, 255, 255))
collage = await asyncio.to_thread(_new_collage)
def _add_to_collage(img_data: bytes, coord_x: int, coord_y: int) -> None:
data = BytesIO(img_data)
photo = Image.open(data).convert("RGBA")
- photo = photo.resize((500, 500))
+ photo = photo.resize((image_size, image_size))
collage.paste(photo, (coord_x, coord_y))
- for x_co in range(0, 1500, 500):
- for y_co in range(0, 1500, 500):
- img = random.choice(images)
- img_data = await get_image_data(mass, img.path, img.provider)
- await asyncio.to_thread(_add_to_collage, img_data, x_co, y_co)
+ # prevent duplicates with a set
+ images = list(set(images))
+ random.shuffle(images)
+ iter_images = itertools.cycle(images)
+
+ for x_co in range(0, dimensions[0], image_size):
+ for y_co in range(0, dimensions[1], image_size):
+ for _ in range(5):
+ img = next(iter_images)
+ img_data = await get_image_data(mass, img.path, img.provider)
+ if img_data:
+ await asyncio.to_thread(_add_to_collage, img_data, x_co, y_co)
+ break
def _save_collage():
final_data = BytesIO()
- collage.convert("RGB").save(final_data, "PNG", optimize=True)
+ collage.convert("RGB").save(final_data, "JPEG", optimize=True)
return final_data.getvalue()
return await asyncio.to_thread(_save_collage)
if audio_stream is None:
msg = "No audio stream found"
raise InvalidDataError(msg)
- has_cover_image = any(x for x in raw["streams"] if x["codec_name"] in ("mjpeg", "png"))
+ has_cover_image = any(
+ x for x in raw["streams"] if x.get("codec_name", "") in ("mjpeg", "png")
+ )
# convert all tag-keys (gathered from all streams) to lowercase without spaces
tags = {}
for stream in raw["streams"] + [raw["format"]]:
"ffmpeg",
"-hide_banner",
"-loglevel",
- "fatal",
+ "error",
"-i",
file_path,
"-map",
)
writer_task: asyncio.Task | None = None
- ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
+ ffmpeg_proc = AsyncProcess(
+ args, stdin=file_path == "-", stdout=True, stderr=None, name="ffmpeg_image"
+ )
await ffmpeg_proc.start()
async def writer() -> None:
from __future__ import annotations
+from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import ProviderFeature
"""Retrieve metadata for a track on this Metadata provider."""
if ProviderFeature.TRACK_METADATA in self.supported_features:
raise NotImplementedError
+
+ async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:
+ """
+ Resolve an image from an image path.
+
+ This either returns (a generator to get) raw bytes of the image or
+ a string with an http(s) URL or local path that is accessible from the server.
+ """
+ return path
@property
def lookup_key(self) -> str:
- """Return domain if streaming_provider or instance_id otherwise."""
- return self.domain if self.is_streaming_provider else self.instance_id
+ """Return domain if (multi-instance) streaming_provider or instance_id otherwise."""
+ if self.is_streaming_provider or not self.manifest.multi_instance:
+ return self.domain
+ return self.instance_id
async def search(
self,
raise NotImplementedError
yield # type: ignore
- async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
+ async def library_add(self, item: MediaItemType) -> bool:
"""Add item to provider's library. Return true on success."""
if (
- media_type == MediaType.ARTIST
+ item.media_type == MediaType.ARTIST
and ProviderFeature.LIBRARY_ARTISTS_EDIT in self.supported_features
):
raise NotImplementedError
if (
- media_type == MediaType.ALBUM
+ item.media_type == MediaType.ALBUM
and ProviderFeature.LIBRARY_ALBUMS_EDIT in self.supported_features
):
raise NotImplementedError
if (
- media_type == MediaType.TRACK
+ item.media_type == MediaType.TRACK
and ProviderFeature.LIBRARY_TRACKS_EDIT in self.supported_features
):
raise NotImplementedError
if (
- media_type == MediaType.PLAYLIST
+ item.media_type == MediaType.PLAYLIST
and ProviderFeature.LIBRARY_PLAYLISTS_EDIT in self.supported_features
):
raise NotImplementedError
if (
- media_type == MediaType.RADIO
+ item.media_type == MediaType.RADIO
and ProviderFeature.LIBRARY_RADIOS_EDIT in self.supported_features
):
raise NotImplementedError
This either returns (a generator to get) raw bytes of the image or
a string with an http(s) URL or local path that is accessible from the server.
"""
- raise NotImplementedError
+ return path
async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
"""Get single MediaItem from provider."""
library_item = await controller.add_item_to_library(
prov_item, metadata_lookup=False, **extra_kwargs
)
- elif (
- library_item.metadata.checksum and prov_item.metadata.checksum
- ) and library_item.metadata.checksum != prov_item.metadata.checksum:
+ elif library_item.metadata.cache_checksum != prov_item.metadata.cache_checksum:
# existing dbitem checksum changed
library_item = await controller.update_item_in_library(
library_item.item_id, prov_item
"instance_id": self.instance_id,
"supported_features": [x.value for x in self.supported_features],
"available": self.available,
+ "is_streaming_provider": getattr(self, "is_streaming_provider", None),
}
--- /dev/null
+"""Built-in/generic provider to handle media from files and (remote) urls."""
+
+from __future__ import annotations
+
+import time
+from collections.abc import AsyncGenerator
+from typing import TYPE_CHECKING, NotRequired, TypedDict
+
+import shortuuid
+
+from music_assistant.common.models.config_entries import ConfigEntry
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant.common.models.errors import InvalidDataError, MediaNotFoundError
+from music_assistant.common.models.media_items import (
+ AlbumTrack,
+ Artist,
+ AudioFormat,
+ MediaItemImage,
+ MediaItemMetadata,
+ MediaItemType,
+ Playlist,
+ PlaylistTrack,
+ ProviderMapping,
+ Radio,
+ Track,
+)
+from music_assistant.common.models.streamdetails import StreamDetails
+from music_assistant.constants import MASS_LOGO, VARIOUS_ARTISTS_FANART
+from music_assistant.server.helpers.tags import AudioTags, parse_tags
+from music_assistant.server.models.music_provider import MusicProvider
+
+if TYPE_CHECKING:
+ from music_assistant.common.models.config_entries import ConfigValueType, ProviderConfig
+ from music_assistant.common.models.provider import ProviderManifest
+ from music_assistant.server import MusicAssistant
+ from music_assistant.server.models import ProviderInstanceType
+
+
+class StoredItem(TypedDict):
+ """Definition of an media item (for the builtin provider) stored in persistent storage."""
+
+ item_id: str # url or (locally accessible) file path (or id in case of playlist)
+ name: str
+ image_url: NotRequired[str]
+ last_updated: NotRequired[int]
+
+
+CONF_KEY_RADIOS = "stored_radios"
+CONF_KEY_TRACKS = "stored_tracks"
+CONF_KEY_PLAYLISTS = "stored_playlists"
+CONF_KEY_PLAYLIST_ITEMS = "stored_playlists_items"
+
+
+ALL_LIBRARY_TRACKS = "all_library_tracks"
+ALL_FAVORITE_TRACKS = "all_favorite_tracks"
+RANDOM_ARTIST = "random_artist"
+RANDOM_ALBUM = "random_album"
+RANDOM_TRACKS = "random_tracks"
+
+BUILTIN_PLAYLISTS = {
+ ALL_LIBRARY_TRACKS: "All library tracks",
+ ALL_FAVORITE_TRACKS: "All favorited tracks",
+ RANDOM_ARTIST: "Random Artist (from library)",
+ RANDOM_ALBUM: "Random Album (from library)",
+ RANDOM_TRACKS: "100 Random tracks (from library)",
+}
+
+COLLAGE_IMAGE_PLAYLISTS = (ALL_FAVORITE_TRACKS, ALL_LIBRARY_TRACKS, RANDOM_TRACKS)
+
+DEFAULT_THUMB = MediaItemImage(
+ type=ImageType.THUMB,
+ path=MASS_LOGO,
+ provider="builtin",
+ remotely_accessible=False,
+)
+
+DEFAULT_FANART = MediaItemImage(
+ type=ImageType.FANART,
+ path=VARIOUS_ARTISTS_FANART,
+ provider="builtin",
+ remotely_accessible=False,
+)
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return BuiltinProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant, # noqa: ARG001
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None, # noqa: ARG001
+ values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ return tuple(
+ ConfigEntry(
+ key=key,
+ type=ConfigEntryType.BOOLEAN,
+ label=name,
+ default_value=True,
+ category="builtin_playlists",
+ )
+ for key, name in BUILTIN_PLAYLISTS.items()
+ )
+
+
+class BuiltinProvider(MusicProvider):
+ """Built-in/generic provider to handle (manually added) media from files and (remote) urls."""
+
+ def __init__(
+ self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+ ) -> None:
+ """Initialize MusicProvider."""
+ super().__init__(mass, manifest, config)
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ return False
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return (
+ ProviderFeature.BROWSE,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.LIBRARY_RADIOS,
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ ProviderFeature.LIBRARY_TRACKS_EDIT,
+ ProviderFeature.LIBRARY_RADIOS_EDIT,
+ ProviderFeature.PLAYLIST_CREATE,
+ ProviderFeature.PLAYLIST_TRACKS_EDIT,
+ )
+
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get full track details by id."""
+ parsed_item = await self.parse_item(prov_track_id)
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
+ if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
+ # always prefer the stored info, such as the name
+ parsed_item.name = stored_item["name"]
+ if image_url := stored_item.get("image_url"):
+ parsed_item.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=image_url.startswith("http"),
+ )
+ ]
+ return parsed_item
+
+ async def get_radio(self, prov_radio_id: str) -> Radio:
+ """Get full radio details by id."""
+ parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
+ if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
+ # always prefer the stored info, such as the name
+ parsed_item.name = stored_item["name"]
+ if image_url := stored_item.get("image_url"):
+ parsed_item.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=image_url.startswith("http"),
+ )
+ ]
+ return parsed_item
+
+ async def get_artist(self, prov_artist_id: str) -> Track:
+ """Get full artist details by id."""
+ artist = prov_artist_id
+ # this is here for compatibility reasons only
+ return Artist(
+ item_id=artist,
+ provider=self.domain,
+ name=artist,
+ provider_mappings={
+ ProviderMapping(
+ item_id=artist,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ available=False,
+ )
+ },
+ )
+
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
+ """Get full playlist details by id."""
+ if prov_playlist_id in BUILTIN_PLAYLISTS:
+ # this is one of our builtin/default playlists
+ return Playlist(
+ item_id=prov_playlist_id,
+ provider=self.instance_id,
+ name=BUILTIN_PLAYLISTS[prov_playlist_id],
+ provider_mappings={
+ ProviderMapping(
+ item_id=prov_playlist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ owner="Music Assistant",
+ is_editable=False,
+ metadata=MediaItemMetadata(
+ images=[DEFAULT_THUMB]
+ if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
+ else [DEFAULT_THUMB, DEFAULT_FANART],
+ cache_checksum="no_cache",
+ ),
+ )
+ # user created universal playlist
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
+ stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
+ if not stored_item:
+ raise MediaNotFoundError
+ playlist = Playlist(
+ item_id=prov_playlist_id,
+ provider=self.instance_id,
+ name=stored_item["name"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=prov_playlist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ owner="Music Assistant",
+ is_editable=True,
+ )
+ playlist.metadata.cache_checksum = str(stored_item.get("last_updated", 0))
+ if image_url := stored_item.get("image_url"):
+ playlist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=image_url.startswith("http"),
+ )
+ ]
+ return playlist
+
+ async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
+ """Get single MediaItem from provider."""
+ if media_type == MediaType.ARTIST:
+ return await self.get_artist(prov_item_id)
+ if media_type == MediaType.TRACK:
+ return await self.get_track(prov_item_id)
+ if media_type == MediaType.RADIO:
+ return await self.get_radio(prov_item_id)
+ if media_type == MediaType.PLAYLIST:
+ return await self.get_playlist(prov_item_id)
+ if media_type == MediaType.UNKNOWN:
+ return await self.parse_item(prov_item_id)
+ raise NotImplementedError
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track | AlbumTrack, None]:
+ """Retrieve library tracks from the provider."""
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
+ for item in stored_items:
+ yield await self.get_track(item["item_id"])
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Retrieve library/subscribed playlists from the provider."""
+ # return user stored playlists
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
+ for item in stored_items:
+ yield await self.get_playlist(item["item_id"])
+ # return builtin playlists
+ for item_id in BUILTIN_PLAYLISTS:
+ if self.config.get_value(item_id) is False:
+ continue
+ yield await self.get_playlist(item_id)
+
+ async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
+ """Retrieve library/subscribed radio stations from the provider."""
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
+ for item in stored_items:
+ yield await self.get_radio(item["item_id"])
+
+ async def library_add(self, item: MediaItemType) -> bool:
+ """Add item to provider's library. Return true on success."""
+ if item.media_type == MediaType.TRACK:
+ key = CONF_KEY_TRACKS
+ elif item.media_type == MediaType.RADIO:
+ key = CONF_KEY_RADIOS
+ else:
+ return False
+ stored_item = StoredItem(item_id=item.item_id, name=item.name)
+ if item.image:
+ stored_item["image_url"] = item.image
+ stored_items: list[StoredItem] = self.mass.config.get(key, [])
+ # filter out existing
+ stored_items = [x for x in stored_items if x["item_id"] != item.item_id]
+ stored_items.append(stored_item)
+ self.mass.config.set(key, stored_items)
+ return True
+
+ async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
+ """Remove item from provider's library. Return true on success."""
+ if media_type == MediaType.PLAYLIST and prov_item_id in BUILTIN_PLAYLISTS:
+ # user wants to disable/remove one of our builtin playlists
+ # to prevent it comes back, we mark it as disabled in config
+ await self.mass.config.set_provider_config_value(self.instance_id, prov_item_id, False)
+ return True
+ if media_type == MediaType.TRACK:
+ # regular manual track URL/path
+ key = CONF_KEY_TRACKS
+ elif media_type == MediaType.RADIO:
+ # regular manual radio URL/path
+ key = CONF_KEY_RADIOS
+ elif media_type == MediaType.PLAYLIST:
+ # manually added (multi provider) playlist removal
+ key = CONF_KEY_PLAYLISTS
+ else:
+ return False
+ stored_items: list[StoredItem] = self.mass.config.get(key, [])
+ stored_items = [x for x in stored_items if x["item_id"] != prov_item_id]
+ self.mass.config.set(key, stored_items)
+ return True
+
+ async def get_playlist_tracks(
+ self, prov_playlist_id: str
+ ) -> AsyncGenerator[PlaylistTrack, None]:
+ # handle built-in playlists
+ """Get all playlist tracks for given playlist id."""
+ if prov_playlist_id in BUILTIN_PLAYLISTS:
+ async for item in self._get_builtin_playlist_tracks(prov_playlist_id):
+ yield item
+ return
+ # user created universal playlist
+ conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
+ playlist_items: list[str] = self.mass.config.get(conf_key, [])
+ for count, playlist_item_uri in enumerate(playlist_items, 1):
+ try:
+ base_item = await self.mass.music.get_item_by_uri(playlist_item_uri)
+ yield PlaylistTrack.from_dict({**base_item.to_dict(), "position": count})
+ except (MediaNotFoundError, InvalidDataError) as err:
+ self.logger.warning("Skipping item in playlist: %s:%s", playlist_item_uri, str(err))
+
+ async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
+ """Add track(s) to playlist."""
+ conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
+ playlist_items: list[str] = self.mass.config.get(conf_key, [])
+ for uri in prov_track_ids:
+ if uri not in playlist_items:
+ playlist_items.append(uri)
+ self.mass.config.set(conf_key, playlist_items)
+ # mark last_updated on playlist object
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
+ stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
+ stored_item["last_updated"] = int(time.time())
+ self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
+ ) -> None:
+ """Remove track(s) from playlist."""
+ conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
+ playlist_items: list[str] = self.mass.config.get(conf_key, [])
+ # remove items by index
+ for i in sorted(positions_to_remove, reverse=True):
+ del playlist_items[i]
+ self.mass.config.set(conf_key, playlist_items)
+ # mark last_updated on playlist object
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
+ stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
+ stored_item["last_updated"] = int(time.time())
+ self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+
+ async def create_playlist(self, name: str) -> Playlist: # type: ignore[return]
+ """Create a new playlist on provider with given name."""
+ item_id = shortuuid.random(8)
+ stored_item = StoredItem(item_id=item_id, name=name)
+ stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
+ stored_items.append(stored_item)
+ self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
+ return await self.get_playlist(item_id)
+
+ async def parse_item(
+ self,
+ url: str,
+ force_refresh: bool = False,
+ force_radio: bool = False,
+ ) -> Track | Radio:
+ """Parse plain URL to MediaItem of type Radio or Track."""
+ media_info = await self._get_media_info(url, force_refresh)
+ is_radio = media_info.get("icyname") or not media_info.duration
+ provider_mappings = {
+ ProviderMapping(
+ item_id=url,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(media_info.format),
+ sample_rate=media_info.sample_rate,
+ bit_depth=media_info.bits_per_sample,
+ bit_rate=media_info.bit_rate,
+ ),
+ )
+ }
+ if is_radio or force_radio:
+ # treat as radio
+ media_item = Radio(
+ item_id=url,
+ provider=self.domain,
+ name=media_info.get("icyname")
+ or media_info.get("programtitle")
+ or media_info.title
+ or url,
+ provider_mappings=provider_mappings,
+ )
+ else:
+ media_item = Track(
+ item_id=url,
+ provider=self.domain,
+ name=media_info.title or url,
+ duration=int(media_info.duration or 0),
+ artists=[await self.get_artist(artist) for artist in media_info.artists],
+ provider_mappings=provider_mappings,
+ )
+
+ if media_info.has_cover_image:
+ media_item.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=url,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
+ ]
+ return media_item
+
+ async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
+ """Retrieve mediainfo for url."""
+ # do we have some cached info for this url ?
+ cache_key = f"{self.instance_id}.media_info.{url}"
+ cached_info = await self.mass.cache.get(cache_key)
+ if cached_info and not force_refresh:
+ return AudioTags.parse(cached_info)
+ # parse info with ffprobe (and store in cache)
+ media_info = await parse_tags(url)
+ if "authSig" in url:
+ media_info.has_cover_image = False
+ await self.mass.cache.set(cache_key, media_info.raw)
+ return media_info
+
+ async def get_stream_details(self, item_id: str) -> StreamDetails:
+ """Get streamdetails for a track/radio."""
+ media_info = await self._get_media_info(item_id)
+ is_radio = media_info.get("icy-name") or not media_info.duration
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(media_info.format),
+ sample_rate=media_info.sample_rate,
+ bit_depth=media_info.bits_per_sample,
+ channels=media_info.channels,
+ ),
+ media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
+ stream_type=StreamType.HTTP,
+ path=item_id,
+ can_seek=not is_radio,
+ )
+
+ async def _get_builtin_playlist_tracks(
+ self, builtin_playlist_id: str
+ ) -> AsyncGenerator[PlaylistTrack, None]:
+ """Get all playlist tracks for given builtin playlist id."""
+ count = 0
+ if builtin_playlist_id == ALL_LIBRARY_TRACKS:
+ async for item in self.mass.music.tracks.iter_library_items(order_by="RANDOM()"):
+ count += 1
+ yield PlaylistTrack.from_dict({**item.to_dict(), "position": count})
+ return
+ if builtin_playlist_id == ALL_FAVORITE_TRACKS:
+ async for item in self.mass.music.tracks.iter_library_items(
+ favorite=True, order_by="RANDOM()"
+ ):
+ count += 1
+ yield PlaylistTrack.from_dict({**item.to_dict(), "position": count})
+ return
+ if builtin_playlist_id == RANDOM_TRACKS:
+ async for item in self.mass.music.tracks.iter_library_items(order_by="RANDOM()"):
+ count += 1
+ yield PlaylistTrack.from_dict({**item.to_dict(), "position": count})
+ if count == 100:
+ return
+ return
+ if builtin_playlist_id == RANDOM_ALBUM:
+ async for random_album in self.mass.music.albums.iter_library_items(
+ order_by="RANDOM()"
+ ):
+ for album_track in await self.mass.music.albums.tracks(
+ random_album.item_id, random_album.provider
+ ):
+ count += 1
+ yield PlaylistTrack.from_dict({**album_track.to_dict(), "position": count})
+ if count > 0:
+ return
+ return
+ if builtin_playlist_id == RANDOM_ARTIST:
+ async for random_artist in self.mass.music.artists.iter_library_items(
+ order_by="RANDOM()"
+ ):
+ for artist_track in await self.mass.music.artists.tracks(
+ random_artist.item_id, random_artist.provider
+ ):
+ count += 1
+ yield PlaylistTrack.from_dict({**artist_track.to_dict(), "position": count})
+ if count > 0:
+ return
+ return
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 25 25" version="1.1">
+<g id="surface1">
+<path style=" stroke:none;fill-rule:nonzero;fill:rgb(0%,0%,0%);fill-opacity:1;" d="M 1.5 0 L 23.5 0 C 24.328125 0 25 0.671875 25 1.5 L 25 23.5 C 25 24.328125 24.328125 25 23.5 25 L 1.5 25 C 0.671875 25 0 24.328125 0 23.5 L 0 1.5 C 0 0.671875 0.671875 0 1.5 0 Z M 1.5 0 "/>
+<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 10.386719 18.875 L 14.8125 7.125 L 16.113281 7.125 L 11.6875 18.875 Z M 10.386719 18.875 "/>
+<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 21.371094 18.875 L 16.945312 7.125 L 18.246094 7.125 L 22.671875 18.875 Z M 21.371094 18.875 "/>
+<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 2.636719 18.875 L 2.636719 7.125 L 3.875 7.125 L 3.875 18.875 Z M 2.636719 18.875 "/>
+<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 5.445312 18.875 L 5.445312 7.125 L 6.683594 7.125 L 6.683594 18.875 Z M 5.445312 18.875 "/>
+<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 8.253906 18.875 L 8.253906 7.125 L 9.492188 7.125 L 9.492188 18.875 Z M 8.253906 18.875 "/>
+</g>
+</svg>
--- /dev/null
+{
+ "type": "music",
+ "domain": "builtin",
+ "name": "Music Assistant",
+ "description": "Built-in/generic provider to handle media from files and (remote) urls.",
+ "codeowners": [
+ "@music-assistant"
+ ],
+ "requirements": [],
+ "documentation": "https://music-assistant.io/music-providers/builtin/",
+ "multi_instance": false,
+ "builtin": true,
+ "hidden": true,
+ "load_by_default": true
+}
ItemMapping,
MediaItemImage,
MediaItemMetadata,
+ MediaItemType,
Playlist,
PlaylistTrack,
ProviderMapping,
async for track in await artist.get_top(limit=50)
]
- async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
+ async def library_add(self, item: MediaItemType) -> bool:
"""Add an item to the provider's library/favorites."""
result = False
- if media_type == MediaType.ARTIST:
+ if item.media_type == MediaType.ARTIST:
result = await self.client.add_user_artist(
- artist_id=int(prov_item_id),
+ artist_id=int(item.item_id),
)
- elif media_type == MediaType.ALBUM:
+ elif item.media_type == MediaType.ALBUM:
result = await self.client.add_user_album(
- album_id=int(prov_item_id),
+ album_id=int(item.item_id),
)
- elif media_type == MediaType.TRACK:
+ elif item.media_type == MediaType.TRACK:
result = await self.client.add_user_track(
- track_id=int(prov_item_id),
+ track_id=int(item.item_id),
)
- elif media_type == MediaType.PLAYLIST:
+ elif item.media_type == MediaType.PLAYLIST:
result = await self.client.add_user_playlist(
- playlist_id=int(prov_item_id),
+ playlist_id=int(item.item_id),
)
else:
raise NotImplementedError
MediaItemImage(
type=ImageType.THUMB,
path=track.album.cover_big,
+ provider=self.instance_id,
+ remotely_accessible=True,
)
]
return metadata
"""Parse the album metadata."""
return MediaItemMetadata(
explicit=album.explicit_lyrics,
- images=[MediaItemImage(type=ImageType.THUMB, path=album.cover_big)],
+ images=[
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=album.cover_big,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ],
)
def parse_metadata_artist(self, artist: deezer.Artist) -> MediaItemMetadata:
"""Parse the artist metadata."""
return MediaItemMetadata(
- images=[MediaItemImage(type=ImageType.THUMB, path=artist.picture_big)],
+ images=[
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=artist.picture_big,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ],
)
### PARSING FUNCTIONS ###
)
},
metadata=MediaItemMetadata(
- images=[MediaItemImage(type=ImageType.THUMB, path=playlist.picture_big)],
+ images=[
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=playlist.picture_big,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ],
checksum=playlist.checksum,
),
is_editable=creator.id == self.user.id,
if not items:
continue
for item in items:
- metadata.images.append(MediaItemImage(type=img_type, path=item["url"]))
+ metadata.images.append(
+ MediaItemImage(
+ type=img_type,
+ path=item["url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
return metadata
return None
if not items:
continue
for item in items:
- metadata.images.append(MediaItemImage(type=img_type, path=item["url"]))
+ metadata.images.append(
+ MediaItemImage(
+ type=img_type,
+ path=item["url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
return metadata
return None
from typing import TYPE_CHECKING
import aiofiles
+import cchardet
+import shortuuid
from aiofiles.os import wrap
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
raise SetupFailedError(msg)
prov = LocalFileSystemProvider(mass, manifest, config)
prov.base_path = config.get_value(CONF_PATH)
+ await prov.check_write_access()
+ mass.call_later(30, prov.migrate_playlists)
return prov
base_path: str
+ async def check_write_access(self) -> None:
+ """Perform check if we have write access."""
+ # verify write access to determine we have playlist create/edit support
+ # overwrite with provider specific implementation if needed
+ temp_file_name = get_absolute_path(self.base_path, f"{shortuuid.random(8)}.txt")
+ try:
+ await self.write_file_content(temp_file_name, b"")
+ await asyncio.to_thread(os.remove, temp_file_name)
+ self.write_access = True
+ except Exception as err:
+ self.logger.debug("Write access disabled: %s", str(err))
+
async def listdir(
self, path: str, recursive: bool = False
) -> AsyncGenerator[FileSystemItem, None]:
abs_path = get_absolute_path(self.base_path, file_path)
async with aiofiles.open(abs_path, "wb") as _file:
await _file.write(data)
+
+ async def migrate_playlists(self) -> None:
+ """Migrate Music Assistant filesystem playlists."""
+ # Remove this code when 2.0 stable has been released!
+ # prior to version 2.0.0b137 Music Assistant stored universal playlists
+ # in the filesystem (root of the music dir, m3u files with uri's)
+ # that is converted into a universal builtin provider approach in b137
+ # so the filesystem is not longer polluted/abused for this.
+ # this code hunts these playlists, migrates them to the universal provider
+ # and cleans up the files.
+ cache_key = f"{self.instance_id}.playlist_migrattion_done"
+ if await self.mass.cache.get(cache_key):
+ return
+ self.logger.info("Starting playlist migration...")
+ async for item in self.listdir("", False):
+ if not item.is_file:
+ continue
+ if item.ext != "m3u":
+ continue
+ playlist_data = b""
+ async for chunk in self.read_file_content(item.absolute_path):
+ playlist_data += chunk
+ encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
+ playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
+ # a (legacy) playlist file created by MA does not have EXTINFO tags and has uri's
+ if "EXTINF" in playlist_data or "://" not in playlist_data:
+ continue
+ all_uris: list[str] = []
+ skipped_lines = 0
+ for playlist_line in playlist_data.split("\n"):
+ if "://" not in playlist_line:
+ skipped_lines += 1
+ self.logger.debug("Ignoring line in migration playlist: %s", playlist_line)
+ all_uris.append(playlist_line)
+ if skipped_lines > len(all_uris):
+ self.logger.warning("NOT migrating playlist: %s", item.path)
+ continue
+ # create playlist on the builtin provider
+ name = item.name.replace(".m3u", "")
+ new_playlist = await self.mass.music.playlists.create_playlist(name, "builtin")
+ # append existing uri's to the new playlist
+ await self.mass.music.playlists.add_playlist_tracks(new_playlist.item_id, all_uris)
+ # remove existing item from the library
+ if library_item := await self.mass.music.playlists.get_library_item_by_prov_id(
+ item.path, self.instance_id
+ ):
+ await self.mass.music.playlists.remove_item_from_library(library_item.item_id)
+ # remove old file
+ await asyncio.to_thread(os.remove, item.absolute_path)
+ # refresh the playlist so it builds the metadata
+ await self.mass.music.playlists.add_item_to_library(new_playlist, metadata_lookup=True)
+ self.logger.info("Migrated playlist %s", item.name)
+ await self.mass.cache.set(cache_key, True, expiration=365 * 86400)
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.LIBRARY_PLAYLISTS,
- ProviderFeature.PLAYLIST_TRACKS_EDIT,
- ProviderFeature.PLAYLIST_CREATE,
ProviderFeature.BROWSE,
ProviderFeature.SEARCH,
)
Supports having URI's from streaming providers within m3u playlist.
"""
+ write_access: bool = False
+
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
+ if self.write_access:
+ return (
+ *SUPPORTED_FEATURES,
+ ProviderFeature.PLAYLIST_CREATE,
+ ProviderFeature.PLAYLIST_TRACKS_EDIT,
+ )
return SUPPORTED_FEATURES
- @abstractmethod
- async def async_setup(self) -> None:
- """Handle async initialization of the provider."""
-
@abstractmethod
async def listdir(
self, path: str, recursive: bool = False
for x in db_item.provider_mappings
if x.provider_instance == self.instance_id
)
- prev_checksums[file_name] = db_item.metadata.checksum
+ prev_checksums[file_name] = db_item.metadata.cache_checksum
await asyncio.sleep(0) # yield to eventloop
# process all deleted (or renamed) files first
elif item.ext in PLAYLIST_EXTENSIONS:
playlist = await self.get_playlist(item.path)
# add/update] playlist to db
- playlist.metadata.checksum = item.checksum
+ playlist.metadata.cache_checksum = item.checksum
# playlist is always in-library
playlist.favorite = True
await self.mass.music.playlists.add_item_to_library(
)
},
)
- playlist.is_editable = file_item.ext != "pls" # can only edit m3u playlists
+ playlist.is_editable = ProviderFeature.PLAYLIST_TRACKS_EDIT in self.supported_features
+ # only playlists in the root are editable - all other are read only
+ if "/" in prov_playlist_id or "\\" in prov_playlist_id:
+ playlist.is_editable = False
+ # we do not (yet) have support to edit/create pls playlists, only m3u files can be edited
+ if file_item.ext == "pls":
+ playlist.is_editable = False
playlist.owner = self.name
checksum = f"{DB_SCHEMA_VERSION}.{file_item.checksum}"
- playlist.metadata.checksum = checksum
+ playlist.metadata.cache_checksum = checksum
return playlist
async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
) -> PlaylistTrack | None:
"""Try to parse a track from a playlist line."""
try:
- if "://" in line:
- # handle as generic uri
- media_item = await self.mass.music.get_item_by_uri(line)
- if isinstance(media_item, Track):
- return PlaylistTrack.from_dict({**media_item.to_dict(), "position": position})
-
# if a relative path was given in an upper level from the playlist,
# try to resolve it
for parentpart in ("../", "..\\"):
playlist_data += chunk
encoding_details = await asyncio.to_thread(cchardet.detect, playlist_data)
playlist_data = playlist_data.decode(encoding_details["encoding"] or "utf-8")
- for uri in prov_track_ids:
- playlist_data += f"\n{uri}"
+ for file_path in prov_track_ids:
+ track = await self.get_track(file_path)
+ playlist_data += f"\n#EXTINF:{track.duration or 0},{track.name}\n{file_path}\n"
# write playlist file (always in utf-8)
await self.write_file_content(prov_playlist_id, playlist_data.encode("utf-8"))
msg = f"Playlist path does not exist: {prov_playlist_id}"
raise MediaNotFoundError(msg)
_, ext = prov_playlist_id.rsplit(".", 1)
-
# get playlist file contents
playlist_data = b""
async for chunk in self.read_file_content(prov_playlist_id):
# remove items by index
for i in sorted(positions_to_remove, reverse=True):
del playlist_items[i]
-
# build new playlist data
new_playlist_data = "#EXTM3U\n"
for item in playlist_items:
- new_playlist_data.append(f"#EXTINF:{item.length or 0},{item.title}\n")
- new_playlist_data.append(f"{item.path}\n")
+ playlist_data += f"\n#EXTINF:{item.length or 0},{item.title}\n{item.path}\n"
await self.write_file_content(prov_playlist_id, new_playlist_data.encode("utf-8"))
async def create_playlist(self, name: str) -> Playlist:
# much space and bandwidth. Instead we set the filename as value so the image can
# be retrieved later in realtime.
track.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=file_item.path, provider=self.instance_id)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=file_item.path,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
]
if track.album and not track.album.metadata.images:
track.album.album_type = tags.album_type
track.album.metadata.explicit = track.metadata.explicit
# set checksum to invalidate any cached listings
- track.metadata.checksum = file_item.checksum
+ track.metadata.cache_checksum = file_item.checksum
if track.album:
# use track checksum for album(artists) too
- track.album.metadata.checksum = track.metadata.checksum
+ track.album.metadata.cache_checksum = track.metadata.cache_checksum
for artist in track.album.artists:
- artist.metadata.checksum = track.metadata.checksum
+ artist.metadata.cache_checksum = track.metadata.cache_checksum
return track
provider_mappings={
ProviderMapping(
item_id=album_path,
- provider_domain=self.instance_id,
+ provider_domain=self.domain,
provider_instance=self.instance_id,
url=album_path,
)
type=ImageType(item.name),
path=item.path,
provider=self.instance_id,
+ remotely_accessible=False,
)
)
except ValueError:
type=ImageType.THUMB,
path=item.path,
provider=self.instance_id,
+ remotely_accessible=False,
)
)
break
raise LoginFailed(msg)
prov = SMBFileSystemProvider(mass, manifest, config)
await prov.handle_async_init()
+ await prov.check_write_access()
+ mass.call_later(30, prov.migrate_playlists)
return prov
MediaNotFoundError,\r
MusicAssistantError,\r
)\r
-from music_assistant.common.models.media_items import Album\r
-from music_assistant.common.models.media_items import Album as JellyfinAlbum\r
-from music_assistant.common.models.media_items import AlbumTrack\r
-from music_assistant.common.models.media_items import Artist\r
-from music_assistant.common.models.media_items import Artist as JellyfinArtist\r
from music_assistant.common.models.media_items import (\r
+ Album,\r
+ AlbumTrack,\r
+ Artist,\r
AudioFormat,\r
ItemMapping,\r
MediaItem,\r
MediaItemImage,\r
+ Playlist,\r
+ PlaylistTrack,\r
+ ProviderMapping,\r
+ SearchResults,\r
+ Track,\r
)\r
-from music_assistant.common.models.media_items import Playlist\r
+from music_assistant.common.models.media_items import Album as JellyfinAlbum\r
+from music_assistant.common.models.media_items import Artist as JellyfinArtist\r
from music_assistant.common.models.media_items import Playlist as JellyfinPlaylist\r
-from music_assistant.common.models.media_items import PlaylistTrack, ProviderMapping, SearchResults\r
-from music_assistant.common.models.media_items import Track\r
from music_assistant.common.models.media_items import Track as JellyfinTrack\r
from music_assistant.common.models.streamdetails import StreamDetails\r
\r
async def _run_async(self, call: Callable, *args, **kwargs):\r
return await self.mass.create_task(call, *args, **kwargs)\r
\r
- async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:\r
- """Return the full image URL including the auth token."""\r
- return path\r
-\r
def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:\r
return ItemMapping(\r
media_type=media_type,\r
album.year = current_jellyfin_album[ITEM_KEY_PRODUCTION_YEAR]\r
if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_album):\r
album.metadata.images = [\r
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ MediaItemImage(\r
+ type=ImageType.THUMB,\r
+ path=thumb,\r
+ provider=self.instance_id,\r
+ remotely_accessible=True,\r
+ )\r
]\r
if ITEM_KEY_OVERVIEW in current_jellyfin_album:\r
album.metadata.description = current_jellyfin_album[ITEM_KEY_OVERVIEW]\r
artist.sort_name = current_artist[ITEM_KEY_SORT_NAME]\r
if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_artist):\r
artist.metadata.images = [\r
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ MediaItemImage(\r
+ type=ImageType.THUMB,\r
+ path=thumb,\r
+ provider=self.instance_id,\r
+ remotely_accessible=True,\r
+ )\r
]\r
return artist\r
\r
\r
if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_track):\r
track.metadata.images = [\r
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ MediaItemImage(\r
+ type=ImageType.THUMB,\r
+ path=thumb,\r
+ provider=self.instance_id,\r
+ remotely_accessible=True,\r
+ )\r
]\r
if len(current_jellyfin_track[ITEM_KEY_ARTIST_ITEMS]) >= 1:\r
track.artists.append(\r
playlist.metadata.description = jellyfin_playlist[ITEM_KEY_OVERVIEW]\r
if thumb := self._get_thumbnail_url(self._jellyfin_server, jellyfin_playlist):\r
playlist.metadata.images = [\r
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)\r
+ MediaItemImage(\r
+ type=ImageType.THUMB,\r
+ path=thumb,\r
+ provider=self.instance_id,\r
+ remotely_accessible=True,\r
+ )\r
]\r
playlist.is_editable = False\r
return playlist\r
artist.metadata.description = sonic_channel.description
if sonic_channel.original_image_url:
artist.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=sonic_channel.original_image_url)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_channel.original_image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
]
return artist
if sonic_artist.cover_id:
artist.metadata.images = [
MediaItemImage(
- type=ImageType.THUMB, path=sonic_artist.cover_id, provider=self.instance_id
+ type=ImageType.THUMB,
+ path=sonic_artist.cover_id,
+ provider=self.instance_id,
+ remotely_accessible=False,
)
]
else:
artist.metadata.description = sonic_info.biography
if sonic_info.small_url:
artist.metadata.images.append(
- MediaItemImage(type=ImageType.THUMB, path=sonic_info.small_url)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=sonic_info.small_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
)
return artist
if sonic_album.cover_id:
album.metadata.images = [
MediaItemImage(
- type=ImageType.THUMB, path=sonic_album.cover_id, provider=self.instance_id
+ type=ImageType.THUMB,
+ path=sonic_album.cover_id,
+ provider=self.instance_id,
+ remotely_accessible=False,
),
]
else:
if sonic_info:
if sonic_info.small_url:
album.metadata.images.append(
- MediaItemImage(type=ImageType.THUMB, path=sonic_info.small_url)
+ MediaItemImage(
+ type=ImageType.THUMB, path=sonic_info.small_url, remotely_accessible=False
+ )
)
if sonic_info.notes:
album.metadata.description = sonic_info.notes
if sonic_playlist.cover_id:
playlist.metadata.images = [
MediaItemImage(
- type=ImageType.THUMB, path=sonic_playlist.cover_id, provider=self.instance_id
+ type=ImageType.THUMB,
+ path=sonic_playlist.cover_id,
+ provider=self.instance_id,
+ remotely_accessible=False,
)
]
return playlist
album.year = plex_album.year
if thumb := plex_album.firstAttr("thumb", "parentThumb", "grandparentThumb"):
album.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
]
if plex_album.summary:
album.metadata.description = plex_album.summary
artist.metadata.description = plex_artist.summary
if thumb := plex_artist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
artist.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
]
return artist
playlist.metadata.description = plex_playlist.summary
if thumb := plex_playlist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
playlist.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
]
playlist.is_editable = True
return playlist
if thumb := plex_track.firstAttr("thumb", "parentThumb", "grandparentThumb"):
track.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=thumb, provider=self.instance_id)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb,
+ provider=self.instance_id,
+ remotely_accessible=False,
+ )
]
if plex_track.parentKey:
track.album = self._get_item_mapping(
ContentType,
ImageType,
MediaItemImage,
+ MediaItemType,
MediaType,
Playlist,
PlaylistTrack,
"""Get similar artists for given artist."""
# https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3
- async def library_add(self, prov_item_id, media_type: MediaType):
+ async def library_add(self, item: MediaItemType):
"""Add item to library."""
result = None
- if media_type == MediaType.ARTIST:
- result = await self._get_data("favorite/create", artist_id=prov_item_id)
- elif media_type == MediaType.ALBUM:
- result = await self._get_data("favorite/create", album_ids=prov_item_id)
- elif media_type == MediaType.TRACK:
- result = await self._get_data("favorite/create", track_ids=prov_item_id)
- elif media_type == MediaType.PLAYLIST:
- result = await self._get_data("playlist/subscribe", playlist_id=prov_item_id)
+ if item.media_type == MediaType.ARTIST:
+ result = await self._get_data("favorite/create", artist_id=item.item_id)
+ elif item.media_type == MediaType.ALBUM:
+ result = await self._get_data("favorite/create", album_ids=item.item_id)
+ elif item.media_type == MediaType.TRACK:
+ result = await self._get_data("favorite/create", track_ids=item.item_id)
+ elif item.media_type == MediaType.PLAYLIST:
+ result = await self._get_data("playlist/subscribe", playlist_id=item.item_id)
return result
async def library_remove(self, prov_item_id, media_type: MediaType):
artist.mbid = VARIOUS_ARTISTS_ID_MBID
artist.name = VARIOUS_ARTISTS_NAME
if img := self.__get_image(artist_obj):
- artist.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img)]
+ artist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
if artist_obj.get("biography"):
artist.metadata.description = artist_obj["biography"].get("content")
return artist
if "genre" in album_obj:
album.metadata.genres = {album_obj["genre"]["name"]}
if img := self.__get_image(album_obj):
- album.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img)]
+ album.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
if "label" in album_obj:
album.metadata.label = album_obj["label"]["name"]
if (released_at := album_obj.get("released_at")) and released_at != 0:
if track_obj.get("parental_warning"):
track.metadata.explicit = True
if img := self.__get_image(track_obj):
- track.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img)]
+ track.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
return track
or playlist_obj["is_collaborative"]
)
if img := self.__get_image(playlist_obj):
- playlist.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img)]
- playlist.metadata.checksum = str(playlist_obj["updated_at"])
+ playlist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ playlist.metadata.cache_checksum = str(playlist_obj["updated_at"])
return playlist
async def _auth_token(self):
name=country.name,
)
folder.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=country.favicon)
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=country.favicon,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
]
yield folder
return
radio.metadata.label = radio_obj.tags
radio.metadata.popularity = radio_obj.votes
radio.metadata.links = [MediaItemLink(type=LinkType.WEBSITE, url=radio_obj.homepage)]
- radio.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=radio_obj.favicon)]
+ radio.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=radio_obj.favicon,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
return radio
artist.metadata.description = artist_obj["description"]
if artist_obj.get("avatar_url"):
img_url = artist_obj["avatar_url"]
- artist.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img_url)]
+ artist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
return artist
async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
playlist.metadata.description = playlist_obj["description"]
if playlist_obj.get("artwork_url"):
playlist.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=playlist_obj["artwork_url"])
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=playlist_obj["artwork_url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
]
if playlist_obj.get("genre"):
playlist.metadata.genres = playlist_obj["genre"]
if track_obj.get("artwork_url"):
track.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=track_obj["artwork_url"])
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=track_obj["artwork_url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
]
if track_obj.get("description"):
track.metadata.description = track_obj["description"]
ContentType,
ImageType,
MediaItemImage,
+ MediaItemType,
MediaType,
Playlist,
PlaylistTrack,
liked_songs.metadata.images = [
MediaItemImage(
- type=ImageType.THUMB, path="https://misc.scdn.co/liked-songs/liked-songs-64.png"
+ type=ImageType.THUMB,
+ path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
+ provider=self.domain,
+ remotely_accessible=True,
)
]
- liked_songs.metadata.checksum = str(time.time())
+ liked_songs.metadata.cache_checksum = str(time.time())
return liked_songs
if (item and item["id"])
]
- async def library_add(self, prov_item_id, media_type: MediaType):
+ async def library_add(self, item: MediaItemType):
"""Add item to library."""
result = False
- if media_type == MediaType.ARTIST:
- result = await self._put_data("me/following", {"ids": [prov_item_id]}, type="artist")
- elif media_type == MediaType.ALBUM:
- result = await self._put_data("me/albums", {"ids": [prov_item_id]})
- elif media_type == MediaType.TRACK:
- result = await self._put_data("me/tracks", {"ids": [prov_item_id]})
- elif media_type == MediaType.PLAYLIST:
+ if item.media_type == MediaType.ARTIST:
+ result = await self._put_data("me/following", {"ids": [item.item_id]}, type="artist")
+ elif item.media_type == MediaType.ALBUM:
+ result = await self._put_data("me/albums", {"ids": [item.item_id]})
+ elif item.media_type == MediaType.TRACK:
+ result = await self._put_data("me/tracks", {"ids": [item.item_id]})
+ elif item.media_type == MediaType.PLAYLIST:
result = await self._put_data(
- f"playlists/{prov_item_id}/followers", data={"public": False}
+ f"playlists/{item.item_id}/followers", data={"public": False}
)
return result
for img in artist_obj["images"]:
img_url = img["url"]
if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url:
- artist.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img_url)]
+ artist.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
break
return artist
album.metadata.genre = set(album_obj["genres"])
if album_obj.get("images"):
album.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=album_obj["images"][0]["url"])
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=album_obj["images"][0]["url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
]
if "label" in album_obj:
album.metadata.label = album_obj["label"]
MediaItemImage(
type=ImageType.THUMB,
path=track_obj["album"]["images"][0]["url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
)
]
if track_obj.get("copyright"):
)
if playlist_obj.get("images"):
playlist.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=playlist_obj["images"][0]["url"])
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=playlist_obj["images"][0]["url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
]
- playlist.metadata.checksum = str(playlist_obj["snapshot_id"])
+ playlist.metadata.cache_checksum = str(playlist_obj["snapshot_id"])
return playlist
async def login(self) -> dict:
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
if img := artist_obj.get(f"{key}{postfix}"):
- metadata.images.append(MediaItemImage(type=img_type, path=img))
+ metadata.images.append(
+ MediaItemImage(
+ type=img_type,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
else:
break
return metadata
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
if img := album_obj.get(f"{key}{postfix}"):
- metadata.images.append(MediaItemImage(type=img_type, path=img))
+ metadata.images.append(
+ MediaItemImage(
+ type=img_type,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
else:
break
return metadata
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
if img := track_obj.get(f"{key}{postfix}"):
- metadata.images.append(MediaItemImage(type=img_type, path=img))
+ metadata.images.append(
+ MediaItemImage(
+ type=img_type,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
else:
break
return metadata
ContentType,
ItemMapping,
MediaItemImage,
+ MediaItemType,
Playlist,
PlaylistTrack,
ProviderMapping,
for track in await get_similar_tracks(tidal_session, prov_track_id, limit)
]
- async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
+ async def library_add(self, item: MediaItemType) -> bool:
"""Add item to library."""
tidal_session = await self._get_tidal_session()
return await library_items_add_remove(
tidal_session,
str(self._tidal_user_id),
- prov_item_id,
- media_type,
+ item.item_id,
+ item.media_type,
add=True,
)
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
)
]
return track
is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
playlist.is_editable = is_editable
# metadata
- playlist.metadata.checksum = str(playlist_obj.last_updated)
+ playlist.metadata.cache_checksum = str(playlist_obj.last_updated)
playlist.metadata.popularity = playlist_obj.popularity
if picture := (playlist_obj.square_picture or playlist_obj.picture):
picture_id = picture.replace("-", "/")
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
)
]
radio.metadata.description = details["text"]
# images
if img := details.get("image"):
- radio.metadata.images = [MediaItemImage(type=ImageType.THUMB, path=img)]
+ radio.metadata.images = [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
if img := details.get("logo"):
- radio.metadata.images = [MediaItemImage(type=ImageType.LOGO, path=img)]
+ radio.metadata.images = [
+ MediaItemImage(
+ type=ImageType.LOGO,
+ path=img,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
return radio
async def get_stream_details(self, item_id: str) -> StreamDetails:
+++ /dev/null
-"""Basic provider allowing for external URL's to be streamed."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from music_assistant.common.models.enums import ContentType, ImageType, MediaType, StreamType
-from music_assistant.common.models.errors import MediaNotFoundError
-from music_assistant.common.models.media_items import (
- Artist,
- AudioFormat,
- MediaItemImage,
- MediaItemType,
- ProviderMapping,
- Radio,
- Track,
-)
-from music_assistant.common.models.streamdetails import StreamDetails
-from music_assistant.server.helpers.tags import AudioTags, parse_tags
-from music_assistant.server.models.music_provider import MusicProvider
-
-if TYPE_CHECKING:
- from music_assistant.common.models.config_entries import (
- ConfigEntry,
- ConfigValueType,
- ProviderConfig,
- )
- from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.server import MusicAssistant
- from music_assistant.server.models import ProviderInstanceType
-
-
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- return URLProvider(mass, manifest, config)
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- # ruff: noqa: ARG001
- return () # we do not have any config entries (yet)
-
-
-class URLProvider(MusicProvider):
- """Music Provider for manual URL's/files added to the queue."""
-
- def __init__(
- self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
- ) -> None:
- """Initialize MusicProvider."""
- super().__init__(mass, manifest, config)
- self._full_url = {}
-
- async def get_track(self, prov_track_id: str) -> Track:
- """Get full track details by id."""
- # always prefer db item for existing items to not overwrite user customizations
- db_item = await self.mass.music.tracks.get_library_item_by_prov_id(
- prov_track_id, self.instance_id
- )
- if db_item is None and not prov_track_id.startswith("http"):
- msg = f"Track not found: {prov_track_id}"
- raise MediaNotFoundError(msg)
- return await self.parse_item(prov_track_id)
-
- async def get_radio(self, prov_radio_id: str) -> Radio:
- """Get full radio details by id."""
- # always prefer db item for existing items to not overwrite user customizations
- db_item = await self.mass.music.radio.get_library_item_by_prov_id(
- prov_radio_id, self.instance_id
- )
- if db_item is None and not prov_radio_id.startswith("http"):
- msg = f"Radio not found: {prov_radio_id}"
- raise MediaNotFoundError(msg)
- return await self.parse_item(prov_radio_id)
-
- async def get_artist(self, prov_artist_id: str) -> Track:
- """Get full artist details by id."""
- artist = prov_artist_id
- # this is here for compatibility reasons only
- return Artist(
- item_id=artist,
- provider=self.domain,
- name=artist,
- provider_mappings={
- ProviderMapping(
- item_id=artist,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- available=False,
- )
- },
- )
-
- async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
- """Get single MediaItem from provider."""
- if media_type == MediaType.ARTIST:
- return await self.get_artist(prov_item_id)
- if media_type == MediaType.TRACK:
- return await self.get_track(prov_item_id)
- if media_type == MediaType.RADIO:
- return await self.get_radio(prov_item_id)
- if media_type == MediaType.UNKNOWN:
- return await self.parse_item(prov_item_id)
- raise NotImplementedError
-
- async def parse_item(
- self,
- url: str,
- force_refresh: bool = False,
- force_radio: bool = False,
- ) -> Track | Radio:
- """Parse plain URL to MediaItem of type Radio or Track."""
- media_info = await self._get_media_info(url, force_refresh)
- is_radio = media_info.get("icy-name") or not media_info.duration
- provider_mappings = {
- ProviderMapping(
- item_id=url,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(media_info.format),
- sample_rate=media_info.sample_rate,
- bit_depth=media_info.bits_per_sample,
- bit_rate=media_info.bit_rate,
- ),
- )
- }
- if is_radio or force_radio:
- # treat as radio
- media_item = Radio(
- item_id=url,
- provider=self.domain,
- name=media_info.get("icy-name") or url,
- provider_mappings=provider_mappings,
- )
- else:
- media_item = Track(
- item_id=url,
- provider=self.domain,
- name=media_info.title or url,
- duration=int(media_info.duration or 0),
- artists=[await self.get_artist(artist) for artist in media_info.artists],
- provider_mappings=provider_mappings,
- )
-
- if media_info.has_cover_image:
- media_item.metadata.images = [
- MediaItemImage(type=ImageType.THUMB, path=url, provider="file")
- ]
- return media_item
-
- async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
- """Retrieve mediainfo for url."""
- # do we have some cached info for this url ?
- cache_key = f"{self.instance_id}.media_info.{url}"
- cached_info = await self.mass.cache.get(cache_key)
- if cached_info and not force_refresh:
- return AudioTags.parse(cached_info)
- # parse info with ffprobe (and store in cache)
- media_info = await parse_tags(url)
- if "authSig" in url:
- media_info.has_cover_image = False
- await self.mass.cache.set(cache_key, media_info.raw)
- return media_info
-
- async def get_stream_details(self, item_id: str) -> StreamDetails:
- """Get streamdetails for a track/radio."""
- media_info = await self._get_media_info(item_id)
- is_radio = media_info.get("icy-name") or not media_info.duration
- return StreamDetails(
- provider=self.instance_id,
- item_id=item_id,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(media_info.format),
- sample_rate=media_info.sample_rate,
- bit_depth=media_info.bits_per_sample,
- ),
- media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
- stream_type=StreamType.HTTP,
- path=item_id,
- can_seek=not is_radio,
- )
+++ /dev/null
-{
- "type": "music",
- "domain": "url",
- "name": "URL",
- "description": "Built-in/generic provider to play music (or playlists) from a remote URL.",
- "codeowners": ["@music-assistant"],
- "requirements": [],
- "documentation": "https://music-assistant.io/music-providers/url/",
- "multi_instance": false,
- "builtin": true,
- "hidden": true,
- "load_by_default": true,
- "icon": "web"
-}
ImageType,
ItemMapping,
MediaItemImage,
+ MediaItemType,
MediaType,
Playlist,
PlaylistTrack,
return playlist_tracks[:25]
return []
- async def library_add(self, prov_item_id, media_type: MediaType) -> None:
+ async def library_add(self, item: MediaItemType) -> bool:
"""Add an item to the library."""
await self._check_oauth_token()
result = False
- if media_type == MediaType.ARTIST:
+ if item.media_type == MediaType.ARTIST:
result = await library_add_remove_artist(
- headers=self._headers, prov_artist_id=prov_item_id, add=True
+ headers=self._headers, prov_artist_id=item.item_id, add=True
)
- elif media_type == MediaType.ALBUM:
+ elif item.media_type == MediaType.ALBUM:
result = await library_add_remove_album(
- headers=self._headers, prov_item_id=prov_item_id, add=True
+ headers=self._headers, prov_item_id=item.item_id, add=True
)
- elif media_type == MediaType.PLAYLIST:
+ elif item.media_type == MediaType.PLAYLIST:
result = await library_add_remove_playlist(
- headers=self._headers, prov_item_id=prov_item_id, add=True
+ headers=self._headers, prov_item_id=item.item_id, add=True
)
- elif media_type == MediaType.TRACK:
+ elif item.media_type == MediaType.TRACK:
raise NotImplementedError
return result
playlist.owner = authors["name"]
else:
playlist.owner = self.instance_id
- playlist.metadata.checksum = playlist_obj.get("checksum")
+ playlist.metadata.cache_checksum = playlist_obj.get("checksum")
return playlist
async def _parse_track(self, track_obj: dict) -> Track | AlbumTrack | PlaylistTrack:
artist_id = VARIOUS_ARTISTS_YTM_ID
return self._get_item_mapping(MediaType.ARTIST, artist_id, artist_obj.get("name"))
- @classmethod
- async def _parse_thumbnails(cls, thumbnails_obj: dict) -> list[MediaItemImage]:
+ async def _parse_thumbnails(self, thumbnails_obj: dict) -> list[MediaItemImage]:
"""Parse and sort a list of thumbnails and return the highest quality."""
thumb = sorted(thumbnails_obj, key=itemgetter("width"), reverse=True)[0]
- return [MediaItemImage(type=ImageType.THUMB, path=thumb["url"])]
+ return [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=thumb["url"],
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
@classmethod
async def _parse_stream_format(cls, track_obj: dict) -> dict:
assert media_type == media_items.MediaType.TRACK
assert provider == "filesystem"
assert item_id == "Artist/Album/Track.flac"
- # test regular url to URL provider
+ # test regular url to builtin provider
test_uri = "http://radiostream.io/stream.mp3"
media_type, provider, item_id = await uri.parse_uri(test_uri)
assert media_type == media_items.MediaType.UNKNOWN
- assert provider == "url"
+ assert provider == "builtin"
assert item_id == "http://radiostream.io/stream.mp3"
- # test local file to URL provider
+ # test local file to builtin provider
test_uri = SILENCE_FILE
media_type, provider, item_id = await uri.parse_uri(test_uri)
assert media_type == media_items.MediaType.UNKNOWN
- assert provider == "url"
+ assert provider == "builtin"
assert item_id == SILENCE_FILE
# test invalid uri
with pytest.raises(MusicAssistantError):