await self.update_db_item(db_album.item_id, prov_album)
match_found = True
# while we're here, also match the artist
- if db_album.artist.provider == "database":
+ if db_album.artist.provider == ProviderType.DATABASE:
await self.mass.music.artists.update_db_item(
db_album.artist.item_id, prov_album.artist
)
if isinstance(album.artist, ItemMapping):
return album.artist
- if album.artist.provider == "database":
+ if album.artist.provider == ProviderType.DATABASE:
return ItemMapping.from_item(album.artist)
if album.artist.musicbrainz_id:
"""Filesystem musicprovider support for MusicAssistant."""
from __future__ import annotations
-import asyncio
import os
import urllib.parse
from contextlib import asynccontextmanager
MediaType.ALBUM,
]
- def __init__(self, *args, **kwargs) -> None:
- """Initialize MusicProvider."""
- super().__init__(*args, **kwargs)
- self._cache_built = asyncio.Event()
-
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
return True
- @staticmethod
- async def search(*args, **kwargs) -> List[MediaItemType]:
+ async def search(
+ self, search_query: str, media_types=Optional[List[MediaType]], limit: int = 5
+ ) -> List[MediaItemType]:
"""Perform search on musicprovider."""
- # items for the filesystem provider are already returned by the database
- return []
+ result = []
+ # searching the filesystem is slow and unreliable,
+ # instead we make some (slow) freaking queries to the db ;-)
+ if " - " in search_query:
+ artist, title = search_query.split(" - ", 1)
+ else:
+ artist = None
+ title = search_query
+ if media_types is None or MediaType.TRACK in media_types:
+ query = f"SELECT * FROM tracks WHERE name LIKE '%{title}%'"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ if artist:
+ query += f" AND artists LIKE '%{artist}%')"
+ tracks = await self.mass.music.tracks.get_db_items(query)
+ result.append(tracks)
+ if media_types is None or MediaType.ALBUM in media_types:
+ query = f"SELECT * FROM albums WHERE name LIKE '%{title}%'"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ if artist:
+ query += f" AND artist LIKE '%{artist}%')"
+ albums = await self.mass.music.albums.get_db_items(query)
+ result.append(albums)
+ if media_types is None or MediaType.ARTIST in media_types:
+ query = f"SELECT * FROM artists WHERE name LIKE '%{title}%'"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ artists = await self.mass.music.artists.get_db_items(query)
+ result.append(artists)
+ if media_types is None or MediaType.PLAYLIST in media_types:
+ query = f"SELECT * FROM playlists WHERE name LIKE '%{title}%'"
+ query += f" AND provider_ids LIKE '%\"{self.type.value}\"%')"
+ playlists = await self.mass.music.playlists.get_db_items(query)
+ result.append(playlists)
+ return result
async def sync_library(self) -> None:
"""Run library sync for this provider."""
async def library_remove(self, *args, **kwargs) -> bool:
"""Remove item from provider's library. Return true on succes."""
# already handled by database
- # TODO: do we want to process deletions here ?
+ # TODO: do we want to process/offer deletions here ?
async def add_playlist_tracks(
self, prov_playlist_id: str, prov_track_ids: List[str]