print(f"Got {track_count} tracks ({track_count_lib} in library)")
radio_count = await mass.music.radio.count(True)
print(f"Got {radio_count} radio stations in library")
- playlist_count = await mass.music.playlists.library(True)
+ playlist_count = await mass.music.playlists.db_items(True)
print(f"Got {len(playlist_count)} playlists in library")
# register a player
test_player1 = TestPlayer("test1")
await mass.music.start_sync(schedule=3)
# get some data
- await mass.music.artists.library()
- await mass.music.tracks.library()
- await mass.music.radio.library()
+ await mass.music.artists.db_items()
+ await mass.music.tracks.db_items()
+ await mass.music.radio.db_items()
# run for an hour until someone hits CTRL+C
await asyncio.sleep(3600)
ProviderUnavailableError,
SetupFailedError,
)
-from music_assistant.models.media_items import MediaItem, MediaItemType, media_from_dict
+from music_assistant.models.media_items import (
+ BrowseFolder,
+ MediaItem,
+ MediaItemType,
+ media_from_dict,
+)
from music_assistant.models.music_provider import MusicProvider
from music_assistant.music_providers.filesystem import FileSystemProvider
from music_assistant.music_providers.qobuz import QobuzProvider
)
return items
+ async def browse(self, uri: Optional[str] = None) -> List[BrowseFolder]:
+ """Browse Music providers."""
+ # root level; folder per provider
+ if not uri:
+ return [
+ BrowseFolder(prov.id, prov.type, prov.name, uri=f"{prov.id}://")
+ for prov in self.providers
+ if prov.supports_browse
+ ]
+ # provider level
+ provider_id, path = uri.split("://", 1)
+ prov = self.get_provider(provider_id)
+ return await prov.browse(path)
+
async def get_item_by_uri(
self, uri: str, force_refresh: bool = False, lazy: bool = True
) -> MediaItemType:
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.tags import FALLBACK_ARTIST
-from music_assistant.models.enums import EventType, ProviderType
-from music_assistant.models.event import MassEvent
+from music_assistant.models.enums import ProviderType
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
# insert new item
album_artists = await self._get_album_artists(item, cur_item)
+ if album_artists:
+ sort_artist = album_artists[0].sort_name
+ else:
+ sort_artist = ""
new_item = await self.mass.database.insert(
self.db_table,
{
**item.to_db_row(),
"artists": json_serializer(album_artists) or None,
+ "sort_artist": sort_artist,
},
)
item_id = new_item["item_id"]
self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def update_db_item(
else:
album_type = cur_item.album_type
+ if album_artists:
+ sort_artist = album_artists[0].sort_name
+ else:
+ sort_artist = ""
+
await self.mass.database.update(
self.db_table,
{"item_id": item_id},
{
"name": item.name if overwrite else cur_item.name,
"sort_name": item.sort_name if overwrite else cur_item.sort_name,
+ "sort_artist": sort_artist,
"version": item.version if overwrite else cur_item.version,
"year": item.year or cur_item.year,
"upc": item.upc or cur_item.upc,
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def delete_db_item(self, item_id: int) -> None:
import asyncio
import itertools
+from time import time
from typing import Any, Dict, List, Optional
from music_assistant.helpers.database import TABLE_ALBUMS, TABLE_ARTISTS, TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import EventType, ProviderType
-from music_assistant.models.event import MassEvent
+from music_assistant.models.enums import ProviderType
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
)
# insert item
+ if item.in_library and not item.timestamp:
+ item.timestamp = int(time())
new_item = await self.mass.database.insert(self.db_table, item.to_db_row())
item_id = new_item["item_id"]
self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def update_db_item(
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def delete_db_item(self, item_id: int) -> None:
from music_assistant.helpers.database import TABLE_PLAYLISTS
from music_assistant.helpers.json import json_serializer
from music_assistant.helpers.uri import create_uri
-from music_assistant.models.enums import EventType, MediaType, ProviderType
+from music_assistant.models.enums import MediaType, ProviderType
from music_assistant.models.errors import InvalidDataError, MediaNotFoundError
-from music_assistant.models.event import MassEvent
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import Playlist, Track
async def add(self, item: Playlist, overwrite_existing: bool = False) -> Playlist:
"""Add playlist to local db and return the new database item."""
item.metadata.last_refresh = int(time())
- await self.mass.metadata.get_playlist_metadata(item, overwrite_existing)
- return await self.add_db_item(item)
+ await self.mass.metadata.get_playlist_metadata(item)
+ return await self.add_db_item(item, overwrite_existing)
async def add_playlist_tracks(self, db_playlist_id: str, uris: List[str]) -> None:
"""Add multiple tracks to playlist. Creates background tasks to process the action."""
# actually add the tracks to the playlist on the provider
provider = self.mass.music.get_provider(playlist_prov.prov_id)
await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
- # update local db entry
- self.mass.signal_event(
- MassEvent(
- type=EventType.MEDIA_ITEM_UPDATED,
- object_id=db_playlist_id,
- data=playlist,
- )
- )
async def remove_playlist_tracks(
self, db_playlist_id: str, positions: List[int]
if track_ids_to_remove:
provider = self.mass.music.get_provider(prov.prov_id)
await provider.remove_playlist_tracks(prov.item_id, track_ids_to_remove)
- self.mass.signal_event(
- MassEvent(
- type=EventType.MEDIA_ITEM_UPDATED,
- object_id=db_playlist_id,
- data=playlist,
- )
- )
async def add_db_item(
self, item: Playlist, overwrite_existing: bool = False
self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def update_db_item(
},
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
- db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
- )
- return db_item
+ return await self.get_db_item(item_id)
from music_assistant.helpers.database import TABLE_RADIOS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import EventType, MediaType
-from music_assistant.models.event import MassEvent
+from music_assistant.models.enums import MediaType
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import Radio
self.logger.debug("added %s to database", item.name)
# return created object
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def update_db_item(
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
- )
return db_item
from music_assistant.helpers.compare import compare_artists, compare_track
from music_assistant.helpers.database import TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
-from music_assistant.models.enums import EventType, MediaType, ProviderType
-from music_assistant.models.event import MassEvent
+from music_assistant.models.enums import MediaType, ProviderType
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
# no existing match found: insert new item
track_artists = await self._get_track_artists(item)
track_albums = await self._get_track_albums(item, overwrite=overwrite_existing)
+ if track_artists:
+ sort_artist = track_artists[0].sort_name
+ else:
+ sort_artist = ""
+ if track_albums:
+ sort_album = track_albums[0].sort_name
+ else:
+ sort_album = ""
new_item = await self.mass.database.insert(
self.db_table,
{
**item.to_db_row(),
"artists": json_serializer(track_artists),
"albums": json_serializer(track_albums),
+ "sort_artist": sort_artist,
+ "sort_album": sort_album,
},
)
item_id = new_item["item_id"]
# return created object
self.logger.debug("added %s to database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_ADDED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def update_db_item(
)
self.logger.debug("updated %s in database: %s", item.name, item_id)
db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item)
- )
return db_item
async def _get_track_artists(
params = {"search": f"%{search}%"}
return await self._db.fetch_all(sql_query, params)
- async def get_row(self, table: str, match: Dict[str, Any] = None) -> Mapping | None:
+ async def get_row(self, table: str, match: Dict[str, Any]) -> Mapping | None:
"""Get single row for given table where column matches keys/values."""
- # async with Db(self.url, timeout=360) as db:
sql_query = f"SELECT * FROM {table} WHERE "
sql_query += " AND ".join((f"{x} = :{x}" for x in match))
return await self._db.fetch_one(sql_query, match)
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
sort_name TEXT NOT NULL,
+ sort_artist TEXT,
album_type TEXT,
year INTEGER,
version TEXT,
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
sort_name TEXT NOT NULL,
+ sort_artist TEXT,
+ sort_album TEXT,
version TEXT,
duration INTEGER,
in_library BOOLEAN DEFAULT 0,
async def get_embedded_image(file_path: str) -> bytes | None:
"""Return embedded image data."""
- args = ("ffmpeg", "-i", file_path, "-map", "0:v", "-c", "copy", "-f", "mjpeg", "-")
+ args = (
+ "ffmpeg",
+ "-hide_banner",
+ "-loglevel",
+ "fatal",
+ "-i",
+ file_path,
+ "-map",
+ "0:v",
+ "-c",
+ "copy",
+ "-f",
+ "mjpeg",
+ "-",
+ )
async with AsyncProcess(
args, enable_stdin=False, enable_stdout=True, enable_stderr=False
PLAYLIST = "playlist"
RADIO = "radio"
URL = "url"
+ FOLDER = "folder"
UNKNOWN = "unknown"
QUEUE_ITEMS_UPDATED = "queue_items_updated"
QUEUE_TIME_UPDATED = "queue_time_updated"
SHUTDOWN = "application_shutdown"
- MEDIA_ITEM_ADDED = "media_item_added"
- MEDIA_ITEM_UPDATED = "media_item_updated"
BACKGROUND_JOB_UPDATED = "background_job_updated"
from typing import TYPE_CHECKING, Generic, List, Optional, Tuple, TypeVar
from music_assistant.models.errors import MediaNotFoundError
-from music_assistant.models.event import MassEvent
-from .enums import EventType, MediaType, ProviderType
+from .enums import MediaType, ProviderType
from .media_items import MediaItemType, media_from_dict
if TYPE_CHECKING:
"""Update record in the database, merging data."""
raise NotImplementedError
- async def library(self, limit: int = 500, offset: int = 0) -> List[ItemCls]:
- """Get all in-library items."""
- match = {"in_library": True}
- return await self.get_db_items(match=match, limit=limit, offset=offset)
+ async def db_items(
+ self,
+ in_library: Optional[bool] = None,
+ search: Optional[str] = None,
+ limit: int = 500,
+ offset: int = 0,
+ order_by: str = "sort_name",
+ ) -> List[ItemCls]:
+ """Get in-database items."""
+ sql_query = f"SELECT * FROM {self.db_table}"
+ params = {}
+ query_parts = []
+ if search:
+ params["search"] = f"%{search}%"
+ query_parts.append("name LIKE :search")
+ if in_library is not None:
+ query_parts.append("in_library = :in_library")
+ params["in_library"] = in_library
+ if query_parts:
+ sql_query += " WHERE " + " AND ".join(query_parts)
+ sql_query += f" ORDER BY {order_by}"
+ return await self.get_db_items_by_query(
+ sql_query, params, limit=limit, offset=offset
+ )
- async def count(self, in_library: bool = False) -> int:
+ async def count(self, in_library: Optional[bool] = None) -> int:
"""Return number of in-library items for this MediaType."""
- return await self.mass.database.get_count(
- self.db_table, {"in_library": in_library}
- )
+ if in_library is not None:
+ match = {"in_library": in_library}
+ else:
+ match = None
+ return await self.mass.database.get_count(self.db_table, match)
async def get(
self,
if not db_item.in_library:
db_item.in_library = True
await self.set_db_library(db_item.item_id, True)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
async def remove_from_library(
self,
if db_item.in_library:
db_item.in_library = False
await self.set_db_library(db_item.item_id, False)
- self.mass.signal_event(
- MassEvent(
- EventType.MEDIA_ITEM_UPDATED, object_id=db_item.uri, data=db_item
- )
- )
async def get_provider_id(self, item: ItemCls) -> Tuple[str, str]:
"""Return provider and item id."""
return (prov.prov_id, prov.item_id)
return None, None
- async def get_db_items(
+ async def get_db_items_by_query(
self,
- query: Optional[str] = None,
+ custom_query: Optional[str] = None,
query_params: Optional[dict] = None,
- match: Optional[dict] = None,
limit: int = 500,
offset: int = 0,
) -> List[ItemCls]:
- """Fetch all records from database."""
- assert not (query and match), "query and match are mutually exclusive"
- if query is not None:
- func = self.mass.database.get_rows_from_query(
- query, query_params, limit=limit, offset=offset
- )
- else:
- func = self.mass.database.get_rows(
- self.db_table, match, limit=limit, offset=offset
+ """Fetch MediaItem records from database given a custom query."""
+ return [
+ self.item_cls.from_db_row(db_row)
+ for db_row in await self.mass.database.get_rows_from_query(
+ custom_query, query_params, limit=limit, offset=offset
)
- return [self.item_cls.from_db_row(db_row) for db_row in await func]
+ ]
async def get_db_item(self, item_id: int) -> ItemCls:
"""Get record by id."""
if isinstance(provider, str):
provider = ProviderType(provider)
if provider == ProviderType.DATABASE or provider_id == "database":
- return await self.get_db_items(limit=limit, offset=offset)
+ return await self.get_db_items_by_query(limit=limit, offset=offset)
query = f"SELECT * FROM {self.db_table}, json_each(provider_ids)"
if provider_id is not None:
prov_ids = prov_ids.replace(",)", ")")
query += f" AND json_extract(json_each.value, '$.item_id') in {prov_ids}"
- return await self.get_db_items(query, limit=limit, offset=offset)
+ return await self.get_db_items_by_query(query, limit=limit, offset=offset)
async def set_db_library(self, item_id: int, in_library: bool) -> None:
"""Set the in-library bool on a database item."""
match = {"item_id": item_id}
+ timestamp = int(time()) if in_library else 0
await self.mass.database.update(
- self.db_table, match, {"in_library": in_library}
+ self.db_table, match, {"in_library": in_library, "timestamp": timestamp}
)
async def get_provider_item(
return hash((self.provider, self.item_id))
-MediaItemType = Union[Artist, Album, Track, Radio, Playlist]
+@dataclass
+class BrowseFolder(MediaItem):
+ """Representation of a Folder used in Browse (which contains media items)."""
+
+ media_type: MediaType = MediaType.FOLDER
+ # label: a labelid that needs to be translated by the frontend
+ label: str = ""
+ # items (max 25) to provide in recommendation listings
+ items: Optional[List[MediaItemType]] = None
+
+
+MediaItemType = Union[Artist, Album, Track, Radio, Playlist, BrowseFolder]
def media_from_dict(media_item: dict) -> MediaItemType:
from music_assistant.models.media_items import (
Album,
Artist,
+ BrowseFolder,
MediaItemType,
Playlist,
Radio,
_attr_name: str = None
_attr_type: ProviderType = None
_attr_available: bool = True
+ _attr_supports_browse: bool = True
_attr_supported_mediatypes: List[MediaType] = []
def __init__(self, mass: MusicAssistant, config: MusicProviderConfig) -> None:
"""Return boolean if this provider is available/initialized."""
return self._attr_available
+ @property
+ def supports_browse(self) -> bool:
+ """Return boolean if this provider supports browsing."""
+ return self._attr_supports_browse
+
@property
def supported_mediatypes(self) -> List[MediaType]:
"""Return MediaTypes the provider supports."""
if media_type == MediaType.RADIO:
return await self.get_radio(prov_item_id)
+ async def browse(self, path: Optional[str] = None) -> List[MediaItemType]:
+ """
+ Browse this provider's items.
+
+ :param path: The path to browse, (e.g. artists) or None for root level.
+ """
+ # this reference implementation can be overridden with provider specific approach
+ if not path:
+ # return main listing
+ root_items = []
+ if MediaType.ARTIST in self.supported_mediatypes:
+ root_items.append(
+ BrowseFolder(
+ item_id="artists",
+ provider=self.type,
+ name="",
+ label="artists",
+ uri=f"{self.type.value}://artists",
+ )
+ )
+ if MediaType.ALBUM in self.supported_mediatypes:
+ root_items.append(
+ BrowseFolder(
+ item_id="albums",
+ provider=self.type,
+ name="",
+ label="albums",
+ uri=f"{self.type.value}://albums",
+ )
+ )
+ if MediaType.TRACK in self.supported_mediatypes:
+ root_items.append(
+ BrowseFolder(
+ item_id="tracks",
+ provider=self.type,
+ name="",
+ label="tracks",
+ uri=f"{self.type.value}://tracks",
+ )
+ )
+ if MediaType.PLAYLIST in self.supported_mediatypes:
+ root_items.append(
+ BrowseFolder(
+ item_id="playlists",
+ provider=self.type,
+ name="",
+ label="playlists",
+ uri=f"{self.type.value}://playlists",
+ )
+ )
+ if MediaType.RADIO in self.supported_mediatypes:
+ root_items.append(
+ BrowseFolder(
+ item_id="radios",
+ provider=self.type,
+ name="",
+ label="radios",
+ uri=f"{self.type.value}://radios",
+ )
+ )
+ return root_items
+ # sublevel
+ if path == "artists":
+ return [x async for x in self.get_library_artists()]
+ if path == "albums":
+ return [x async for x in self.get_library_albums()]
+ if path == "tracks":
+ return [x async for x in self.get_library_tracks()]
+ if path == "radios":
+ return [x async for x in self.get_library_radios()]
+ if path == "playlists":
+ return [x async for x in self.get_library_playlists()]
+
+ @abstractmethod
+ async def recommendations(self) -> List[BrowseFolder]:
+ """
+ Get this provider's recommendations.
+
+ Returns a list of BrowseFolder items with (max 25) mediaitems in the items attribute.
+ """
+ return []
+
async def sync_library(
self, media_types: Optional[Tuple[MediaType]] = None
) -> None:
# Bottomline this means that we don't do a full 2 way sync if multiple
# providers are attached to the same media item.
prev_ids = set()
- for db_item in await controller.library():
+ for db_item in await controller.db_items(True):
prov_types = {x.prov_type for x in db_item.provider_ids}
if len(prov_types) > 1:
continue
Album,
AlbumType,
Artist,
+ BrowseFolder,
ContentType,
ImageType,
ItemMapping,
SCHEMA_VERSION = 17
LOGGER = logging.getLogger(__name__)
+listdir = wrap(os.listdir)
+isdir = wrap(os.path.isdir)
+isfile = wrap(os.path.isfile)
+
async def scantree(path: str) -> AsyncGenerator[os.DirEntry, None]:
"""Recursively yield DirEntry objects for given directory."""
async def setup(self) -> bool:
"""Handle async initialization of the provider."""
- isdir = wrap(os.path.exists)
-
if not await isdir(self.config.path):
raise MediaNotFoundError(
f"Music Directory {self.config.path} does not exist"
params = {"name": f"%{search_query}%", "prov_type": f"%{self.type.value}%"}
if media_types is None or MediaType.TRACK in media_types:
query = "SELECT * FROM tracks WHERE name LIKE :name AND provider_ids LIKE :prov_type"
- tracks = await self.mass.music.tracks.get_db_items(query, params)
+ tracks = await self.mass.music.tracks.get_db_items_by_query(query, params)
result += tracks
if media_types is None or MediaType.ALBUM in media_types:
query = "SELECT * FROM albums WHERE name LIKE :name AND provider_ids LIKE :prov_type"
- albums = await self.mass.music.albums.get_db_items(query, params)
+ albums = await self.mass.music.albums.get_db_items_by_query(query, params)
result += albums
if media_types is None or MediaType.ARTIST in media_types:
query = "SELECT * FROM artists WHERE name LIKE :name AND provider_ids LIKE :prov_type"
- artists = await self.mass.music.artists.get_db_items(query, params)
+ artists = await self.mass.music.artists.get_db_items_by_query(query, params)
result += artists
if media_types is None or MediaType.PLAYLIST in media_types:
query = "SELECT * FROM playlists WHERE name LIKE :name AND provider_ids LIKE :prov_type"
- playlists = await self.mass.music.playlists.get_db_items(query, params)
+ playlists = await self.mass.music.playlists.get_db_items_by_query(
+ query, params
+ )
result += playlists
return result
+ async def browse(self, path: Optional[str] = None) -> List[MediaItemType]:
+ """
+ Browse this provider's items.
+
+ :param path: The path to browse, (e.g. artists) or None for root level.
+ """
+ if not path:
+ path = self.config.path
+ else:
+ path = os.path.join(self.config.path, path)
+ result = []
+ for filename in await listdir(path):
+ full_path: str = os.path.join(path, filename)
+ rel_path = full_path.replace(self.config.path + os.sep, "")
+ if await isdir(full_path):
+ result.append(
+ BrowseFolder(
+ item_id=rel_path,
+ provider=self.type,
+ name=filename,
+ uri=f"{self.type.value}://{rel_path}",
+ )
+ )
+ elif track := await self._parse_track(full_path):
+ result.append(track)
+ return result
+
async def sync_library(
self, media_types: Optional[Tuple[MediaType]] = None
) -> None:
query = f"SELECT * FROM tracks WHERE albums LIKE '%\"{db_album.item_id}\"%'"
query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
result = []
- for track in await self.mass.music.tracks.get_db_items(query):
+ for track in await self.mass.music.tracks.get_db_items_by_query(query):
track.album = db_album
album_mapping = next(
(x for x in track.albums if x.item_id == db_album.item_id), None
# TODO: adjust to json query instead of text search
query = f"SELECT * FROM albums WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
- return await self.mass.music.albums.get_db_items(query)
+ return await self.mass.music.albums.get_db_items_by_query(query)
async def get_artist_toptracks(self, prov_artist_id: str) -> List[Track]:
"""Get a list of all tracks as we have no clue about preference."""
# TODO: adjust to json query instead of text search
query = f"SELECT * FROM tracks WHERE artists LIKE '%\"{db_artist.item_id}\"%'"
query += f" AND provider_ids LIKE '%\"{self.type.value}\"%'"
- return await self.mass.music.tracks.get_db_items(query)
+ return await self.mass.music.tracks.get_db_items_by_query(query)
async def library_add(self, *args, **kwargs) -> bool:
"""Add item to provider's library. Return true on succes."""
_attr_type = ProviderType.TUNEIN
_attr_name = "Tune-in Radio"
+ _attr_supports_browse: bool = False
_attr_supported_mediatypes = [MediaType.RADIO]
_throttler = Throttler(rate_limit=1, period=1)
_attr_name: str = "URL"
_attr_type: ProviderType = ProviderType.URL
_attr_available: bool = True
+ _attr_supports_browse: bool = False
_attr_supported_mediatypes: List[MediaType] = []
async def setup(self) -> bool: