from __future__ import annotations
import asyncio
+import hashlib
import itertools
import os
import random
from base64 import b64decode
+from collections import OrderedDict
from collections.abc import Iterable
from io import BytesIO
from typing import TYPE_CHECKING, cast
from music_assistant.mass import MusicAssistant
+# Thumbnail cache: on-disk (persistent) + small in-memory FIFO (hot path)
+_THUMB_CACHE_DIR = "thumbnails"
+_THUMB_MEMORY_CACHE_MAX = 50
+
+_thumb_memory_cache: OrderedDict[str, bytes] = OrderedDict()
+
+
+def _create_thumb_hash(provider: str, path_or_url: str) -> str:
+ """Create a safe filesystem hash from provider and image path."""
+ raw = f"{provider}/{path_or_url}"
+ return hashlib.sha256(raw.encode(), usedforsecurity=False).hexdigest()
+
+
+def _thumb_cache_filename(thumb_hash: str, size: int | None, image_format: str) -> str:
+ """Build the cache filename for a thumbnail."""
+ ext = image_format.lower()
+ if ext == "jpeg":
+ ext = "jpg"
+ return f"{thumb_hash}_{size or 0}.{ext}"
+
+
+def _get_from_memory_cache(key: str) -> bytes | None:
+ """Retrieve thumbnail from in-memory FIFO cache."""
+ if key in _thumb_memory_cache:
+ _thumb_memory_cache.move_to_end(key)
+ return _thumb_memory_cache[key]
+ return None
+
+
+def _put_in_memory_cache(key: str, data: bytes) -> None:
+ """Store thumbnail in in-memory FIFO cache."""
+ _thumb_memory_cache[key] = data
+ _thumb_memory_cache.move_to_end(key)
+ while len(_thumb_memory_cache) > _THUMB_MEMORY_CACHE_MAX:
+ _thumb_memory_cache.popitem(last=False)
+
+
async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str) -> bytes:
"""Create thumbnail from image url."""
# TODO: add local cache here !
provider: str,
image_format: str = "PNG",
) -> bytes:
- """Get (optimized) PNG thumbnail from image url."""
+ """Get (optimized) thumbnail from image url.
+
+ Uses a two-tier cache (in-memory FIFO + on-disk) keyed by a hash of
+ provider + path so that repeated requests never trigger ffmpeg or
+ PIL processing again. Concurrent requests for the same thumbnail
+ are de-duplicated via create_task.
+
+ :param mass: The MusicAssistant instance.
+ :param path_or_url: Path or URL to the source image.
+ :param size: Target thumbnail size (square), or None for original.
+ :param provider: Provider identifier for the image source.
+ :param image_format: Output format (PNG or JPEG/JPG).
+ """
+ image_format = image_format.upper()
+ if image_format == "JPG":
+ image_format = "JPEG"
+
+ thumb_hash = _create_thumb_hash(provider, path_or_url)
+ cache_filename = _thumb_cache_filename(thumb_hash, size, image_format)
+
+ # 1. Check in-memory FIFO cache
+ if cached := _get_from_memory_cache(cache_filename):
+ return cached
+
+ # 2. Check on-disk cache
+ thumb_dir = os.path.join(mass.cache_path, _THUMB_CACHE_DIR)
+ cache_filepath = os.path.join(thumb_dir, cache_filename)
+ if await asyncio.to_thread(os.path.isfile, cache_filepath):
+ async with aiofiles.open(cache_filepath, "rb") as f:
+ thumb_data = cast("bytes", await f.read())
+ _put_in_memory_cache(cache_filename, thumb_data)
+ return thumb_data
+
+ # 3. Generate thumbnail (de-duplicated across concurrent requests)
+ task: asyncio.Task[bytes] = mass.create_task(
+ _generate_and_cache_thumb,
+ mass,
+ path_or_url,
+ size,
+ provider,
+ image_format,
+ cache_filepath,
+ task_id=f"thumb.{cache_filename}",
+ abort_existing=False,
+ )
+ thumb_data = await asyncio.shield(task)
+ _put_in_memory_cache(cache_filename, thumb_data)
+ return thumb_data
+
+
+async def _generate_and_cache_thumb(
+ mass: MusicAssistant,
+ path_or_url: str,
+ size: int | None,
+ provider: str,
+ image_format: str,
+ cache_filepath: str,
+) -> bytes:
+ """Generate a thumbnail, persist it on disk, and return the bytes.
+
+ :param mass: The MusicAssistant instance.
+ :param path_or_url: Path or URL to the source image.
+ :param size: Target thumbnail size (square), or None for original.
+ :param provider: Provider identifier for the image source.
+ :param image_format: Normalized output format (PNG or JPEG).
+ :param cache_filepath: Absolute path where the thumbnail will be stored.
+ """
img_data = await get_image_data(mass, path_or_url, provider)
if not img_data or not isinstance(img_data, bytes):
raise FileNotFoundError(f"Image not found: {path_or_url}")
if not size and image_format.encode() in img_data:
- return img_data
+ thumb_data = img_data
+ else:
- image_format = image_format.upper()
- if image_format == "JPG":
- image_format = "JPEG"
+ def _create_image() -> bytes:
+ data = BytesIO()
+ try:
+ img = Image.open(BytesIO(img_data))
+ except UnidentifiedImageError:
+ raise FileNotFoundError(f"Invalid image: {path_or_url}")
+ if size:
+ img.thumbnail((size, size), Image.Resampling.LANCZOS)
+ mode = "RGBA" if image_format == "PNG" else "RGB"
+ if image_format == "JPEG":
+ img.convert(mode).save(data, image_format, quality=95, optimize=False)
+ else:
+ img.convert(mode).save(data, image_format, optimize=False)
+ return data.getvalue()
- def _create_image() -> bytes:
- data = BytesIO()
- try:
- img = Image.open(BytesIO(img_data))
- except UnidentifiedImageError:
- raise FileNotFoundError(f"Invalid image: {path_or_url}")
- if size:
- # Use LANCZOS for high quality downsampling
- img.thumbnail((size, size), Image.Resampling.LANCZOS)
-
- mode = "RGBA" if image_format == "PNG" else "RGB"
-
- # Save with high quality settings
- if image_format == "JPEG":
- # For JPEG, use quality=95 for better quality
- img.convert(mode).save(data, image_format, quality=95, optimize=False)
- else:
- # For PNG, disable optimize to preserve quality
- img.convert(mode).save(data, image_format, optimize=False)
- return data.getvalue()
+ thumb_data = await asyncio.to_thread(_create_image)
- image_format = image_format.upper()
- return await asyncio.to_thread(_create_image)
+ # Persist to disk cache (best-effort, don't fail on I/O errors)
+ try:
+ await asyncio.to_thread(os.makedirs, os.path.dirname(cache_filepath), exist_ok=True)
+ async with aiofiles.open(cache_filepath, "wb") as f:
+ await f.write(thumb_data)
+ except OSError:
+ pass
+
+ return thumb_data
async def create_collage(