# so the result is sorted as each provider delivered
return [item for sublist in zip_longest(*results_per_provider) for item in sublist if item]
- async def _get_default_recommendations(self) -> list[RecommendationFolder]:
- """Return default recommendations."""
- return [
- RecommendationFolder(
- item_id="in_progress",
- provider="library",
- name="In progress",
- translation_key="in_progress_items",
- icon="mdi-motion-play",
- items=await self.in_progress_items(limit=10),
- ),
- RecommendationFolder(
- item_id="recently_played",
- provider="library",
- name="Recently played",
- translation_key="recently_played",
- icon="mdi-motion-play",
- items=await self.recently_played(limit=10),
- ),
- RecommendationFolder(
- item_id="random_artists",
- provider="library",
- name="Random artists",
- translation_key="random_artists",
- icon="mdi-account-music",
- items=await self.artists.library_items(limit=10, order_by="random"),
- ),
- RecommendationFolder(
- item_id="random_albums",
- provider="library",
- name="Random albums",
- translation_key="random_albums",
- icon="mdi-album",
- items=await self.albums.library_items(limit=10, order_by="random"),
- ),
- RecommendationFolder(
- item_id="random_tracks",
- provider="library",
- name="Random tracks",
- translation_key="random_tracks",
- icon="mdi-file-music",
- items=await self.tracks.library_items(limit=10, order_by="random"),
- ),
- RecommendationFolder(
- item_id="random_playlists",
- provider="library",
- name="Random playlists",
- translation_key="random_playlists",
- icon="mdi-playlist-music",
- items=await self.playlists.library_items(limit=10, order_by="random"),
- ),
- ]
-
@api_command("music/item")
async def get_item(
self,
domains.add(provider.domain)
return instances
- def _start_provider_sync(self, provider: MusicProvider, media_type: MediaType) -> None:
- """Start sync task on provider and track progress."""
- # check if we're not already running a sync task for this provider/mediatype
- for sync_task in self.in_progress_syncs:
- if sync_task.provider_instance != provider.instance_id:
- continue
- if media_type in sync_task.media_types:
- self.logger.debug(
- "Skip sync task for %s because another task is already in progress",
- provider.name,
- )
- return
-
- async def run_sync() -> None:
- # Wrap the provider sync into a lock to prevent
- # race conditions when multiple providers are syncing at the same time.
- async with self._sync_lock:
- await provider.sync_library(media_type)
- # precache playlist tracks
- if media_type == MediaType.PLAYLIST:
- for playlist in await self.playlists.library_items(provider=provider.instance_id):
- async for _ in self.playlists.tracks(playlist.item_id, playlist.provider):
- pass
-
- # we keep track of running sync tasks
- task = self.mass.create_task(run_sync())
- sync_spec = SyncTask(
- provider_domain=provider.domain,
- provider_instance=provider.instance_id,
- media_types=(media_type,),
- task=task,
- )
- self.in_progress_syncs.append(sync_spec)
-
- self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
-
- def on_sync_task_done(task: asyncio.Task) -> None:
- self.in_progress_syncs.remove(sync_spec)
- if task.cancelled():
- return
- if task_err := task.exception():
- self.logger.warning(
- "Sync task for %s completed with errors",
- provider.name,
- exc_info=task_err if self.logger.isEnabledFor(10) else None,
- )
- else:
- self.logger.info("Sync task for %s completed", provider.name)
- self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
- # schedule db cleanup after sync
- if not self.in_progress_syncs:
- self.mass.create_task(self._cleanup_database())
-
- task.add_done_callback(on_sync_task_done)
-
async def cleanup_provider(self, provider_instance: str) -> None:
"""Cleanup provider records from the database."""
if provider_instance.startswith(("filesystem", "jellyfin", "plex", "opensubsonic")):
"Provider %s was not not fully removed from library", provider_instance
)
+ async def _get_default_recommendations(self) -> list[RecommendationFolder]:
+ """Return default recommendations."""
+ return [
+ RecommendationFolder(
+ item_id="in_progress",
+ provider="library",
+ name="In progress",
+ translation_key="in_progress_items",
+ icon="mdi-motion-play",
+ items=await self.in_progress_items(limit=10),
+ ),
+ RecommendationFolder(
+ item_id="recently_played",
+ provider="library",
+ name="Recently played",
+ translation_key="recently_played",
+ icon="mdi-motion-play",
+ items=await self.recently_played(limit=10),
+ ),
+ RecommendationFolder(
+ item_id="random_artists",
+ provider="library",
+ name="Random artists",
+ translation_key="random_artists",
+ icon="mdi-account-music",
+ items=await self.artists.library_items(limit=10, order_by="random"),
+ ),
+ RecommendationFolder(
+ item_id="random_albums",
+ provider="library",
+ name="Random albums",
+ translation_key="random_albums",
+ icon="mdi-album",
+ items=await self.albums.library_items(limit=10, order_by="random"),
+ ),
+ RecommendationFolder(
+ item_id="random_tracks",
+ provider="library",
+ name="Random tracks",
+ translation_key="random_tracks",
+ icon="mdi-file-music",
+ items=await self.tracks.library_items(limit=10, order_by="random"),
+ ),
+ RecommendationFolder(
+ item_id="random_playlists",
+ provider="library",
+ name="Random playlists",
+ translation_key="random_playlists",
+ icon="mdi-playlist-music",
+ items=await self.playlists.library_items(limit=10, order_by="random"),
+ ),
+ ]
+
+ def _start_provider_sync(self, provider: MusicProvider, media_type: MediaType) -> None:
+ """Start sync task on provider and track progress."""
+ # check if we're not already running a sync task for this provider/mediatype
+ for sync_task in self.in_progress_syncs:
+ if sync_task.provider_instance != provider.instance_id:
+ continue
+ if media_type in sync_task.media_types:
+ self.logger.debug(
+ "Skip sync task for %s because another task is already in progress",
+ provider.name,
+ )
+ return
+
+ async def run_sync() -> None:
+ # Wrap the provider sync into a lock to prevent
+ # race conditions when multiple providers are syncing at the same time.
+ async with self._sync_lock:
+ await provider.sync_library(media_type)
+ # precache playlist tracks
+ if media_type == MediaType.PLAYLIST:
+ for playlist in await self.playlists.library_items(provider=provider.instance_id):
+ async for _ in self.playlists.tracks(playlist.item_id, playlist.provider):
+ pass
+
+ # we keep track of running sync tasks
+ task = self.mass.create_task(run_sync())
+ sync_spec = SyncTask(
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ media_types=(media_type,),
+ task=task,
+ )
+ self.in_progress_syncs.append(sync_spec)
+
+ self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
+
+ def on_sync_task_done(task: asyncio.Task) -> None:
+ self.in_progress_syncs.remove(sync_spec)
+ if task.cancelled():
+ return
+ if task_err := task.exception():
+ self.logger.warning(
+ "Sync task for %s completed with errors",
+ provider.name,
+ exc_info=task_err if self.logger.isEnabledFor(10) else None,
+ )
+ else:
+ self.logger.info("Sync task for %s completed", provider.name)
+ self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
+ # schedule db cleanup after sync
+ if not self.in_progress_syncs:
+ self.mass.create_task(self._cleanup_database())
+
+ task.add_done_callback(on_sync_task_done)
+
def _schedule_sync(self) -> None:
"""Schedule the periodic sync."""
sync_interval = self.config.get_value(CONF_SYNC_INTERVAL) * 60
# schedule the first sync run
self.mass.loop.call_later(sync_interval, run_scheduled_sync)
+ def _sort_search_result(
+ self,
+ search_query: str,
+ items: Sequence[MediaItemTypeOrItemMapping],
+ ) -> UniqueList[MediaItemTypeOrItemMapping]:
+ """Sort search results on priority/preference."""
+ scored_items: list[tuple[int, MediaItemTypeOrItemMapping]] = []
+ # search results are already sorted by (streaming) providers on relevance
+ # but we prefer exact name matches and library items so we simply put those
+ # on top of the list.
+ safe_title_str = create_safe_string(search_query)
+ if " - " in search_query:
+ artist, title_alt = search_query.split(" - ", 1)
+ safe_title_alt = create_safe_string(title_alt)
+ safe_artist_str = create_safe_string(artist)
+ else:
+ safe_artist_str = None
+ safe_title_alt = None
+ for item in items:
+ score = 0
+ if create_safe_string(item.name) not in (safe_title_str, safe_title_alt):
+ # literal name match is mandatory to get a score at all
+ continue
+ # bonus point if artist provided and exact match
+ if safe_artist_str:
+ artist: Artist | ItemMapping
+ for artist in getattr(item, "artists", []):
+ if create_safe_string(artist.name) == safe_artist_str:
+ score += 1
+ # bonus point for library items
+ if item.provider == "library":
+ score += 1
+ scored_items.append((score, item))
+ scored_items.sort(key=lambda x: x[0], reverse=True)
+ # combine it all with uniquelist, so this will deduplicated by default
+ # note that streaming provider results are already (most likely) sorted on relevance
+ # so we add all remaining items in their original order. We just prioritize
+ # exact name matches and library items.
+ return UniqueList([*[x[1] for x in scored_items], *items])
+
async def _cleanup_database(self) -> None:
"""Perform database cleanup/maintenance."""
self.logger.debug("Performing database cleanup...")
"""
)
await self.database.commit()
-
- def _sort_search_result(
- self,
- search_query: str,
- items: Sequence[MediaItemTypeOrItemMapping],
- ) -> UniqueList[MediaItemTypeOrItemMapping]:
- """Sort search results on priority/preference."""
- scored_items: list[tuple[int, MediaItemTypeOrItemMapping]] = []
- # search results are already sorted by (streaming) providers on relevance
- # but we prefer exact name matches and library items so we simply put those
- # on top of the list.
- safe_title_str = create_safe_string(search_query)
- if " - " in search_query:
- artist, title_alt = search_query.split(" - ", 1)
- safe_title_alt = create_safe_string(title_alt)
- safe_artist_str = create_safe_string(artist)
- else:
- safe_artist_str = None
- safe_title_alt = None
- for item in items:
- score = 0
- if create_safe_string(item.name) not in (safe_title_str, safe_title_alt):
- # literal name match is mandatory to get a score at all
- continue
- # bonus point if artist provided and exact match
- if safe_artist_str:
- artist: Artist | ItemMapping
- for artist in getattr(item, "artists", []):
- if create_safe_string(artist.name) == safe_artist_str:
- score += 1
- # bonus point for library items
- if item.provider == "library":
- score += 1
- scored_items.append((score, item))
- scored_items.sort(key=lambda x: x[0], reverse=True)
- # combine it all with uniquelist, so this will deduplicated by default
- # note that streaming provider results are already (most likely) sorted on relevance
- # so we add all remaining items in their original order. We just prioritize
- # exact name matches and library items.
- return UniqueList([*[x[1] for x in scored_items], *items])