from __future__ import annotations
import asyncio
+import collections
import logging
import os
import random
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.images import create_collage, get_image_thumb
+from music_assistant.server.helpers.throttle_retry import Throttler
from music_assistant.server.models.core_controller import CoreController
if TYPE_CHECKING:
REFRESH_INTERVAL_ALBUMS = 60 * 60 * 24 * 90 # 90 days
REFRESH_INTERVAL_TRACKS = 60 * 60 * 24 * 90 # 90 days
REFRESH_INTERVAL_PLAYLISTS = 60 * 60 * 24 * 7 # 7 days
+PERIODIC_SCAN_INTERVAL = 60 * 60 * 24 # 1 day
CONF_ENABLE_ONLINE_METADATA = "enable_online_metadata"
"Music Assistant's core controller which handles all metadata for music."
)
self.manifest.icon = "book-information-variant"
- self._scanner_task: asyncio.Task | None = None
+ self._lookup_jobs: MetadataLookupQueue = MetadataLookupQueue()
+ self._lookup_task: asyncio.Task | None = None
+ self._throttler = Throttler(1, 30)
+ self._missing_metadata_scan_task: asyncio.Task | None = None
async def get_config_entries(
self,
self._collage_images_dir = os.path.join(self.mass.storage_path, "collage_images")
if not await asyncio.to_thread(os.path.exists, self._collage_images_dir):
await asyncio.to_thread(os.mkdir, self._collage_images_dir)
-
self.mass.streams.register_dynamic_route("/imageproxy", self.handle_imageproxy)
+ # the lookup task is used to process metadata lookup jobs
+ self._lookup_task = self.mass.create_task(self._process_metadata_lookup_jobs())
+ # just tun the scan for missing metadata once at startup
+ # TODO: allows to enable/disable this in the UI and configure interval/time
+ self._missing_metadata_scan_task = self.mass.create_task(self._scan_missing_metadata())
async def close(self) -> None:
"""Handle logic on server stop."""
- self.stop_metadata_scanner()
+ if self._lookup_task and not self._lookup_task.done():
+ self._lookup_task.cancel()
+ if self._missing_metadata_scan_task and not self._missing_metadata_scan_task.done():
+ self._missing_metadata_scan_task.cancel()
self.mass.streams.unregister_dynamic_route("/imageproxy")
@property
if item.provider != "library":
# this shouldn't happen but just in case.
raise RuntimeError("Metadata can only be updated for library items")
- if item.media_type == MediaType.ARTIST:
- await self._update_artist_metadata(item, force_refresh=force_refresh)
- if item.media_type == MediaType.ALBUM:
- await self._update_album_metadata(item, force_refresh=force_refresh)
- if item.media_type == MediaType.TRACK:
- await self._update_track_metadata(item, force_refresh=force_refresh)
- if item.media_type == MediaType.PLAYLIST:
- await self._update_playlist_metadata(item, force_refresh=force_refresh)
+ # just in case it was in the queue, prevent duplicate lookups
+ self._lookup_jobs.pop(item.uri)
+ async with self._throttler:
+ if item.media_type == MediaType.ARTIST:
+ await self._update_artist_metadata(item, force_refresh=force_refresh)
+ if item.media_type == MediaType.ALBUM:
+ await self._update_album_metadata(item, force_refresh=force_refresh)
+ if item.media_type == MediaType.TRACK:
+ await self._update_track_metadata(item, force_refresh=force_refresh)
+ if item.media_type == MediaType.PLAYLIST:
+ await self._update_playlist_metadata(item, force_refresh=force_refresh)
return item
- def schedule_update_metadata(self, item: MediaItemType) -> None:
- """Schedule metadata update for given item."""
- task_id = f"metadata_update_{item.uri}"
- self.mass.call_later(5, self.update_metadata, item, task_id=task_id)
-
- @api_command("metadata/start_scan")
- def start_metadata_scanner(self) -> None:
- """
- Start scanner for (missing) metadata.
-
- Usually this is triggered by the music controller after finishing a library sync.
- """
- if self._scanner_task and not self._scanner_task.done():
- # already running
+ def schedule_update_metadata(self, uri: str) -> None:
+ """Schedule metadata update for given MediaItem uri."""
+ if "library" not in uri:
return
- self._scanner_task = self.mass.create_task(self._metadata_scanner())
-
- @api_command("metadata/stop_scan")
- def stop_metadata_scanner(self) -> None:
- """Stop scanner for (missing) metadata."""
- if self._scanner_task and not self._scanner_task.done():
- self._scanner_task.cancel()
- self._scanner_task = None
+ with suppress(asyncio.QueueFull):
+ self._lookup_jobs.put_nowait(uri)
async def get_image_data_for_item(
self,
)
return None
- async def _metadata_scanner(self) -> None:
- """Scanner for (missing) metadata."""
+ async def _process_metadata_lookup_jobs(self) -> None:
+ """Task to process metadata lookup jobs."""
+ while True:
+ item_uri = await self._lookup_jobs.get()
+ try:
+ item = await self.mass.music.get_item_by_uri(item_uri)
+ await self.update_metadata(item)
+ except Exception as err:
+ self.logger.error(
+ "Error while updating metadata for %s: %s",
+ item_uri,
+ str(err),
+ exc_info=err if self.logger.isEnabledFor(10) else None,
+ )
+
+ async def _scan_missing_metadata(self) -> None:
+ """Scanner for (missing) metadata, periodically in the background."""
+ self._periodic_scan = None
# Scan for missing artist images
self.logger.debug("Start lookup for missing artist images...")
query = (
f"OR json_extract({DB_TABLE_ARTISTS}.metadata,'$.images') = '[]')"
)
for artist in await self.mass.music.artists.library_items(extra_query=query):
- await self._update_artist_metadata(artist)
- # sleep a bit to not overload the providers
- await asyncio.sleep(10)
+ self.schedule_update_metadata(artist.uri)
# Scan for missing album images
self.logger.debug("Start lookup for missing album images...")
for album in await self.mass.music.albums.library_items(
limit=50, order_by="random", extra_query=query
):
- await self._update_album_metadata(album)
- # sleep a bit to not overload the providers
- await asyncio.sleep(10)
+ self.schedule_update_metadata(album.uri)
# Force refresh playlist metadata every refresh interval
# this will e.g. update the playlist image and genres if the tracks have changed
for playlist in await self.mass.music.playlists.library_items(
limit=10, order_by="random", extra_query=query
):
- await self._update_playlist_metadata(playlist, True)
+ self.schedule_update_metadata(playlist.uri)
+
+
+class MetadataLookupQueue(asyncio.Queue):
+ """Representation of a queue for metadata lookups."""
+
+ def _init(self, maxlen: int = 100):
+ self._queue: collections.deque[str] = collections.deque(maxlen=maxlen)
+
+ def _put(self, item: str) -> None:
+ if item not in self._queue:
+ self._queue.append(item)
+
+ def pop(self, item: str) -> None:
+ """Remove item from queue."""
+ self._queue.remove(item)