"""Model for a image."""
type: ImageType
- url: str
- source: str = "http" # set to instance_id of file provider if path is local
+ path: str
+ # set to instance_id of provider if the path needs to be resolved
+ # if the path is just a plain (remotely accessible) URL, set it to 'url'
+ provider: str = "url"
def __hash__(self):
"""Return custom hash."""
- return hash(self.url)
+ return hash(self.type.value, self.path)
@dataclass(frozen=True)
def __hash__(self):
"""Return custom hash."""
- return hash(self.number)
+ return hash(self.chapter_id)
@dataclass
"""Return single configentry value for a player."""
conf = self.get(f"{CONF_PLAYERS}/{player_id}")
if not conf:
- player = self.mass.players.get(player_id)
- if not player:
- raise PlayerUnavailableError(f"Player {player_id} is not available")
+ player = self.mass.players.get(player_id, True)
conf = {"provider": player.provider, "player_id": player_id, "values": {}}
prov = self.mass.get_provider(conf["provider"])
entries = DEFAULT_PLAYER_CONFIG_ENTRIES + prov.get_player_config_entries(player_id)
self,
media_item: MediaItemType,
img_type: ImageType = ImageType.THUMB,
- resolve_local: bool = True,
- allow_local: bool = True,
+ resolve: bool = True,
) -> str | None:
"""Get url to image for given media media_item."""
if not media_item:
for img in media_item.metadata.images:
if img.type != img_type:
continue
- if img.source != "http" and not allow_local:
+ if img.provider != "url" and not resolve:
continue
- if img.source != "http" and resolve_local:
- # return imageproxy url for local filesystem items
+ if img.provider != "url" and resolve:
+ # return imageproxy url for images that need to be resolved
# the original path is double encoded
encoded_url = urllib.parse.quote(urllib.parse.quote(img.url))
return f"{self.mass.webserver.base_url}/imageproxy?path={encoded_url}"
# retry with track's album
if media_item.media_type == MediaType.TRACK and media_item.album:
- return await self.get_image_url_for_item(media_item.album, img_type, resolve_local)
+ return await self.get_image_url_for_item(media_item.album, img_type, resolve)
# try artist instead for albums
if media_item.media_type == MediaType.ALBUM and media_item.artist:
- return await self.get_image_url_for_item(media_item.artist, img_type, resolve_local)
+ return await self.get_image_url_for_item(media_item.artist, img_type, resolve)
# last resort: track artist(s)
if media_item.media_type == MediaType.TRACK and media_item.artists:
for artist in media_item.artists:
- return await self.get_image_url_for_item(artist, img_type, resolve_local)
+ return await self.get_image_url_for_item(artist, img_type, resolve)
return None
async def get_thumbnail(
- self, path_or_url: str, size: int | None = None, source: str = "http", base64: bool = False
+ self, path: str, size: int | None = None, provider: str = "url", base64: bool = False
) -> bytes | str:
"""Get/create thumbnail image for path (image url or local path)."""
- thumbnail = await get_image_thumb(self.mass, path_or_url, size=size, source=source)
+ thumbnail = await get_image_thumb(self.mass, path, size=size, provider=provider)
if base64:
enc_image = b64encode(thumbnail).decode()
thumbnail = f"data:image/png;base64,{enc_image}"
async def _handle_imageproxy(self, request: web.Request) -> web.Response:
"""Handle request for image proxy."""
path = request.query["path"]
- source = request.query.get("source", "http")
+ provider = request.query.get("provider", "url")
size = int(request.query.get("size", "0"))
if "%" in path:
# assume (double) encoded url, decode it
path = urllib.parse.unquote(path)
with suppress(FileNotFoundError):
- image_data = await self.get_thumbnail(path, size=size, source=source)
+ image_data = await self.get_thumbnail(path, size=size, provider=provider)
# we set the cache header to 1 year (forever)
# the client can use the checksum value to refresh when content changes
return web.Response(
from __future__ import annotations
import asyncio
-import contextlib
import logging
from collections.abc import Iterator
from typing import TYPE_CHECKING, cast
self,
player_id: str,
raise_unavailable: bool = False,
- ) -> Player:
+ ) -> Player | None:
"""Return Player by player_id."""
if player := self._players.get(player_id):
if (not player.available or not player.enabled) and raise_unavailable:
raise PlayerUnavailableError(f"Player {player_id} is not available")
return player
- raise PlayerUnavailableError(f"Player {player_id} is not available")
+ if raise_unavailable:
+ raise PlayerUnavailableError(f"Player {player_id} is not available")
+ return None
@api_command("players/get_by_name")
def get_by_name(self, name: str) -> Player | None:
await player_provider.cmd_pause(player_id)
async def _watch_pause(_player_id: str) -> None:
- player = self.get(_player_id)
+ player = self.get(_player_id, True)
count = 0
# wait for pause
while count < 5 and player.state == PlayerState.PLAYING:
# it is the master in a sync group and thus always present as child player
child_players.append(player)
for child_id in player.group_childs:
- with contextlib.suppress(PlayerUnavailableError):
- if child_player := self.get(child_id):
- if not (not only_powered or child_player.powered):
- continue
- if not (
- not only_playing
- or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
- ):
- continue
- child_players.append(child_player)
+ if child_player := self.get(child_id, False):
+ if not (not only_powered or child_player.powered):
+ continue
+ if not (
+ not only_playing
+ or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
+ ):
+ continue
+ child_players.append(child_player)
return child_players
async def _poll_players(self) -> None:
import asyncio
import random
-from base64 import b64decode, b64encode
+from base64 import b64encode
from io import BytesIO
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from music_assistant.server import MusicAssistant
- from music_assistant.server.providers.filesystem_local.base import FileSystemProviderBase
+ from music_assistant.server.models.music_provider import MusicProvider
-async def get_image_data(mass: MusicAssistant, path_or_url: str, source: str = "http") -> bytes:
+async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str = "url") -> bytes:
"""Create thumbnail from image url."""
- if source != "http" and (prov := mass.get_provider(source)):
- prov: FileSystemProviderBase
- file_item = await prov.resolve(path_or_url)
- # store images in cache db if file larger than 5mb and we have no direct access to the file
- use_cache = not file_item.local_path and (
- not file_item.file_size or file_item.file_size > 5000000
- )
- cache_key = f"embedded_image.{path_or_url}.{source}"
- if use_cache and (
- cache_data := await mass.cache.get(cache_key, checksum=file_item.checksum)
- ):
- return b64decode(cache_data)
- # read from file
- input_file = file_item.local_path or prov.read_file_content(file_item.absolute_path)
- if img_data := await get_embedded_image(input_file):
- if use_cache:
- await mass.cache.set(
- cache_key, b64encode(img_data).decode(), checksum=file_item.checksum
- )
- return img_data
+ if provider != "url" and (prov := mass.get_provider(provider)):
+ prov: MusicProvider
+ if resolved_data := await prov.resolve_image(path_or_url):
+ if isinstance(resolved_data, bytes):
+ return resolved_data
+ return await get_embedded_image(resolved_data)
# always use ffmpeg to get the image because it supports
# both online and offline image files as well as embedded images in media files
if img_data := await get_embedded_image(path_or_url):
async def get_image_thumb(
- mass: MusicAssistant, path_or_url: str, size: int | None, source: str = "http"
+ mass: MusicAssistant, path_or_url: str, size: int | None, provider: str = "url"
) -> bytes:
"""Get (optimized) PNG thumbnail from image url."""
- img_data = await get_image_data(mass, path_or_url, source)
+ img_data = await get_image_data(mass, path_or_url, provider)
def _create_image():
data = BytesIO()
if streamdetails.direct is None:
raise NotImplementedError
+ async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:
+ """
+ Resolve an image from an image path.
+
+ This either returns (a generator to get) raw bytes of the image or
+ a string with an http(s) URL or local path that is accessible from the server.
+ """
+ raise NotImplementedError
+
async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
"""Get single MediaItem from provider."""
if media_type == MediaType.ARTIST:
async for chunk in self.read_file_content(streamdetails.item_id, seek_bytes):
yield chunk
+ async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:
+ """
+ Resolve an image from an image path.
+
+ This either returns (a generator to get) raw bytes of the image or
+ a string with an http(s) URL or local path that is accessible from the server.
+ """
+ file_item = await self.resolve(path)
+ return file_item.local_path or self.read_file_content(file_item.absolute_path)
+
async def _parse_track(self, file_item: FileSystemItem) -> Track:
"""Get full track details by id."""
# ruff: noqa: PLR0915, PLR0912