import asyncio
import itertools
+import os
import random
from collections.abc import Iterable
from io import BytesIO
from music_assistant.server.models.music_provider import MusicProvider
-async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str = "url") -> bytes:
+async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str) -> bytes:
"""Create thumbnail from image url."""
+ # TODO: add local cache here !
if prov := mass.get_provider(provider):
prov: MusicProvider | MetadataProvider
- if resolved_data := await prov.resolve_image(path_or_url):
- if isinstance(resolved_data, bytes):
- return resolved_data
- return await get_embedded_image(resolved_data)
- # always use ffmpeg to get the image because it supports
- # both online and offline image files as well as embedded images in media files
+ if resolved_image := await prov.resolve_image(path_or_url):
+ if isinstance(resolved_image, bytes):
+ return resolved_image
+ if isinstance(resolved_image, str):
+ path_or_url = resolved_image
+ # handle HTTP location
+ if path_or_url.startswith("http"):
+ async with mass.http_session.get(path_or_url) as resp:
+ return await resp.read()
+ # handle FILE location (of type image)
+ if path_or_url.endswith(("jpg", "JPG", "png", "PNG", "jpeg")):
+ if await asyncio.to_thread(os.path.isfile, path_or_url):
+ async with aiofiles.open(path_or_url, "rb") as _file:
+ return await _file.read()
+ # use ffmpeg for embedded images
if img_data := await get_embedded_image(path_or_url):
return img_data
msg = f"Image not found: {path_or_url}"
mass: MusicAssistant,
path_or_url: str,
size: int | None,
- provider: str = "url",
+ provider: str,
image_format: str = "PNG",
) -> bytes:
"""Get (optimized) PNG thumbnail from image url."""
img_data = await get_image_data(mass, path_or_url, provider)
- if not img_data:
+ if not img_data or not isinstance(img_data, bytes):
raise FileNotFoundError(f"Image not found: {path_or_url}")
+ if not size and image_format.encode() in img_data:
+ return img_data
+
def _create_image():
data = BytesIO()
img = Image.open(BytesIO(img_data))
if size:
img.thumbnail((size, size), Image.LANCZOS) # pylint: disable=no-member
- img.convert("RGB").save(data, image_format, optimize=True)
+ mode = "RGBA" if image_format == "PNG" else "RGB"
+ img.convert(mode).save(data, image_format, optimize=True)
return data.getvalue()
return await asyncio.to_thread(_create_image)
image_size = 250
def _new_collage():
- return Image.new("RGBA", (dimensions[0], dimensions[1]), color=(255, 255, 255, 255))
+ return Image.new("RGB", (dimensions[0], dimensions[1]), color=(255, 255, 255, 255))
collage = await asyncio.to_thread(_new_collage)
def _add_to_collage(img_data: bytes, coord_x: int, coord_y: int) -> None:
data = BytesIO(img_data)
- photo = Image.open(data).convert("RGBA")
+ photo = Image.open(data).convert("RGB")
photo = photo.resize((image_size, image_size))
collage.paste(photo, (coord_x, coord_y))
async for chunk in self.read_file_content(streamdetails.item_id, seek_bytes):
yield chunk
- async def resolve_image(self, path: str) -> str | bytes | AsyncGenerator[bytes, None]:
+ async def resolve_image(self, path: str) -> str | bytes:
"""
Resolve an image from an image path.
a string with an http(s) URL or local path that is accessible from the server.
"""
file_item = await self.resolve(path)
- return file_item.local_path or self.read_file_content(file_item.absolute_path)
+ if file_item.local_path:
+ return file_item.local_path
+ return file_item.absolute_path
async def _parse_track(self, file_item: FileSystemItem) -> Track:
"""Get full track details by id."""