def parse_uri(uri: str) -> tuple[MediaType, str, str]:
"""Try to parse URI to Mass identifiers.
- Returns Tuple: MediaType, provider_domain, item_id
+ Returns Tuple: MediaType, provider_domain_or_instance_id, item_id
"""
try:
if uri.startswith("https://open."):
# public share URL (e.g. Spotify or Qobuz, not sure about others)
# https://open.spotify.com/playlist/5lH9NjOeJvctAO92ZrKQNB?si=04a63c8234ac413e
- provider_domain = uri.split(".")[1]
+ provider_domain_or_instance_id = uri.split(".")[1]
media_type_str = uri.split("/")[3]
media_type = MediaType(media_type_str)
item_id = uri.split("/")[4].split("?")[0]
elif uri.startswith("http://") or uri.startswith("https://"):
# Translate a plain URL to the URL provider
- provider_domain = "url"
+ provider_domain_or_instance_id = "url"
media_type = MediaType.UNKNOWN
item_id = uri
elif "://" in uri:
# music assistant-style uri
# provider://media_type/item_id
- provider_domain = uri.split("://")[0]
+ provider_domain_or_instance_id = uri.split("://")[0]
media_type_str = uri.split("/")[2]
media_type = MediaType(media_type_str)
item_id = uri.split(f"{media_type_str}/")[1]
elif ":" in uri:
# spotify new-style uri
- provider_domain, media_type_str, item_id = uri.split(":")
+ provider_domain_or_instance_id, media_type_str, item_id = uri.split(":")
media_type = MediaType(media_type_str)
elif os.path.isfile(uri):
# Translate a local file (which is not from file provider) to the URL provider
- provider_domain = "url"
+ provider_domain_or_instance_id = "url"
media_type = MediaType.TRACK
item_id = uri
else:
raise KeyError
except (TypeError, AttributeError, ValueError, KeyError) as err:
raise MusicAssistantError(f"Not a valid Music Assistant uri: {uri}") from err
- return (media_type, provider_domain, item_id)
+ return (media_type, provider_domain_or_instance_id, item_id)
-def create_uri(media_type: MediaType, provider_domain: str, item_id: str) -> str:
+def create_uri(media_type: MediaType, provider_domain_or_instance_id: str, item_id: str) -> str:
"""Create Music Assistant URI from MediaItem values."""
- return f"{provider_domain}://{media_type.value}/{item_id}"
+ return f"{provider_domain_or_instance_id}://{media_type.value}/{item_id}"
total: int | None = None
+@dataclass
+class SearchResults(DataClassDictMixin):
+ """Model for results from a search query."""
+
+ artists: list[Artist | ItemMapping] = field(default_factory=list)
+ albums: list[Album | ItemMapping] = field(default_factory=list)
+ tracks: list[Track | ItemMapping] = field(default_factory=list)
+ playlists: list[Playlist | ItemMapping] = field(default_factory=list)
+ radio: list[Radio | ItemMapping] = field(default_factory=list)
+
+
def media_from_dict(media_item: dict) -> MediaItemType:
"""Return MediaItem from dict."""
if media_item["media_type"] == "artist":
if cache := await self.mass.cache.get(cache_key):
return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
- items = await prov.search(
+ searchresult = await prov.search(
search_query,
[self.media_type],
limit,
)
+ if self.media_type == MediaType.ARTIST:
+ items = searchresult.artists
+ elif self.media_type == MediaType.ALBUM:
+ items = searchresult.albums
+ elif self.media_type == MediaType.TRACK:
+ items = searchresult.tracks
+ elif self.media_type == MediaType.PLAYLIST:
+ items = searchresult.playlists
+ else:
+ items = searchresult.radio
# store (serializable items) in cache
if not prov.domain.startswith("filesystem"): # do not cache filesystem results
self.mass.create_task(
# the file provider can handle uri's from all providers so simply add the uri
track_id_to_add = track_version.url or create_uri(
MediaType.TRACK,
- track_version.provider_domain,
+ track_version.provider_instance,
track_version.item_id,
)
break
from __future__ import annotations
import asyncio
-import itertools
import logging
import statistics
+from itertools import zip_longest
from typing import TYPE_CHECKING
from music_assistant.common.helpers.datetime import utc_timestamp
BrowseFolder,
MediaItem,
MediaItemType,
- media_from_dict,
+ SearchResults,
)
from music_assistant.common.models.provider import SyncTask
from music_assistant.constants import (
search_query: str,
media_types: list[MediaType] = MediaType.ALL,
limit: int = 10,
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform global search for media items on all providers.
:param search_query: Search query.
"""
# include results from all music providers
provider_instances = (item.instance_id for item in self.providers)
- # TODO: sort by name and filter out duplicates ?
- return list(
- itertools.chain.from_iterable(
- await asyncio.gather(
- *[
- self.search_provider(
- search_query,
- media_types,
- provider_instance=provider_instance,
- limit=limit,
- )
- for provider_instance in provider_instances
- ]
+ results_per_provider: list[SearchResults] = await asyncio.gather(
+ *[
+ self.search_provider(
+ search_query,
+ media_types,
+ provider_instance=provider_instance,
+ limit=limit,
)
- )
+ for provider_instance in provider_instances
+ ]
+ )
+ # return result from all providers while keeping index
+ # so the result is sorted as each provider delivered
+ return SearchResults(
+ artists=[
+ item
+ for sublist in zip_longest(*[x.artists for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ],
+ albums=[
+ item
+ for sublist in zip_longest(*[x.albums for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ],
+ tracks=[
+ item
+ for sublist in zip_longest(*[x.tracks for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ],
+ playlists=[
+ item
+ for sublist in zip_longest(*[x.playlists for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ],
+ radio=[
+ item
+ for sublist in zip_longest(*[x.radio for x in results_per_provider])
+ for item in sublist
+ if item is not None
+ ],
)
async def search_provider(
provider_domain: str | None = None,
provider_instance: str | None = None,
limit: int = 10,
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform search on given provider.
:param search_query: Search query
try:
prov = self.mass.get_provider(provider_instance or provider_domain)
except ProviderUnavailableError:
- return []
+ return SearchResults()
if ProviderFeature.SEARCH not in prov.supported_features:
- return []
+ return SearchResults()
# create safe search string
search_query = search_query.replace("/", " ").replace("'", "")
# prefer cache items (if any)
- cache_key = f"{prov.instance_id}.search.{search_query}.{limit}"
+ media_types_str = ",".join(media_types)
+ cache_key = f"{prov.instance_id}.search.{search_query}.{limit}.{media_types_str}"
cache_key += "".join(x for x in media_types)
if cache := await self.mass.cache.get(cache_key):
- return [media_from_dict(x) for x in cache]
+ return SearchResults.from_dict(cache)
# no items in cache - get listing from provider
- items = await prov.search(
+ result = await prov.search(
search_query,
media_types,
limit,
)
# store (serializable items) in cache
self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items], expiration=86400 * 7)
+ self.mass.cache.set(cache_key, result.to_dict(), expiration=86400 * 7)
)
- return items
+ return result
@api_command("music/browse")
async def browse(self, path: str | None = None) -> BrowseFolder:
item_id="root",
provider=prov.domain,
path=f"{prov.instance_id}://",
+ uri=f"{prov.instance_id}://",
name=prov.name,
)
for prov in self.providers
self, uri: str, force_refresh: bool = False, lazy: bool = True
) -> MediaItemType:
"""Fetch MediaItem by uri."""
- media_type, provider_domain, item_id = parse_uri(uri)
+ media_type, provider_domain_or_instance_id, item_id = parse_uri(uri)
+ for prov in self.providers:
+ if prov.instance_id == provider_domain_or_instance_id:
+ provider_instance = prov.instance_id
+ provider_domain = None
+ break
+ else:
+ provider_instance = None
+ provider_domain = provider_domain_or_instance_id
+
return await self.get_item(
media_type=media_type,
item_id=item_id,
provider_domain=provider_domain,
+ provider_instance=provider_instance,
force_refresh=force_refresh,
lazy=lazy,
)
except MusicAssistantError:
pass
- for item in await self.search(media_item.name, [media_item.media_type], 20):
+ searchresult = await self.search(media_item.name, [media_item.media_type], 20)
+ if media_item.media_type == MediaType.ARTIST:
+ result = searchresult.artists
+ elif media_item.media_type == MediaType.ALBUM:
+ result = searchresult.albums
+ elif media_item.media_type == MediaType.TRACK:
+ result = searchresult.tracks
+ elif media_item.media_type == MediaType.PLAYLIST:
+ result = searchresult.playlists
+ else:
+ result = searchresult.radio
+ for item in result:
if item.available:
await self.get_item(item.media_type, item.item_id, item.provider, lazy=False)
return None
MediaItemType,
Playlist,
Radio,
+ SearchResults,
StreamDetails,
Track,
)
search_query: str,
media_types: list[MediaType] | None = None,
limit: int = 5,
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
"""
if ProviderFeature.SEARCH in self.supported_features:
raise NotImplementedError
- return []
+ return SearchResults()
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve library artists from the provider."""
ContentType,
ImageType,
MediaItemImage,
- MediaItemType,
MediaType,
Playlist,
ProviderMapping,
Radio,
+ SearchResults,
StreamDetails,
Track,
)
async def search(
self, search_query: str, media_types=list[MediaType] | None, limit: int = 5 # noqa: ARG002
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform search on this file based musicprovider."""
- result: list[MediaItemType] = []
+ result = SearchResults()
# searching the filesystem is slow and unreliable,
# instead we make some (slow) freaking queries to the db ;-)
params = {
# ruff: noqa: E501
if media_types is None or MediaType.TRACK in media_types:
query = "SELECT * FROM tracks WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- tracks = await self.mass.music.tracks.get_db_items_by_query(query, params)
- result += tracks
+ result.tracks = await self.mass.music.tracks.get_db_items_by_query(query, params)
if media_types is None or MediaType.ALBUM in media_types:
query = "SELECT * FROM albums WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- albums = await self.mass.music.albums.get_db_items_by_query(query, params)
- result += albums
+ result.albums = await self.mass.music.albums.get_db_items_by_query(query, params)
if media_types is None or MediaType.ARTIST in media_types:
query = "SELECT * FROM artists WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- artists = await self.mass.music.artists.get_db_items_by_query(query, params)
- result += artists
+ result.artists = await self.mass.music.artists.get_db_items_by_query(query, params)
if media_types is None or MediaType.PLAYLIST in media_types:
query = "SELECT * FROM playlists WHERE name LIKE :name AND provider_mappings LIKE :provider_instance"
- playlists = await self.mass.music.playlists.get_db_items_by_query(query, params)
- result += playlists
+ result.playlists = await self.mass.music.playlists.get_db_items_by_query(query, params)
return result
async def browse(self, path: str) -> BrowseFolder:
else:
self.mass.create_task(self.mass.players.queues.seek, number)
+ def _handle_power(self, player_id: str, value: str | int) -> int | None:
+ """Handle player `time` command."""
+ # <playerid> power <0|1|?|>
+ # The "power" command turns the player on or off.
+ # Use 0 to turn off, 1 to turn on, ? to query and
+ # no parameter to toggle the power state of the player.
+ player = self.mass.players.get(player_id)
+ assert player is not None
+
+ if value == "?":
+ return int(player.powered)
+
+ self.mass.create_task(self.mass.players.cmd_power, player_id, bool(value))
+
def _handle_playlist(
self,
player_id: str,
ContentType,
ImageType,
MediaItemImage,
- MediaItemType,
MediaType,
Playlist,
ProviderMapping,
+ SearchResults,
StreamDetails,
Track,
)
async def search(
self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
- result = []
+ result = SearchResults()
params = {"query": search_query, "limit": limit}
if len(media_types) == 1:
# qobuz does not support multiple searchtypes, falls back to all if no type given
params["type"] = "playlists"
if searchresult := await self._get_data("catalog/search", **params):
if "artists" in searchresult:
- result += [
+ result.artists += [
await self._parse_artist(item)
for item in searchresult["artists"]["items"]
if (item and item["id"])
]
if "albums" in searchresult:
- result += [
+ result.albums += [
await self._parse_album(item)
for item in searchresult["albums"]["items"]
if (item and item["id"])
]
if "tracks" in searchresult:
- result += [
+ result.tracks += [
await self._parse_track(item)
for item in searchresult["tracks"]["items"]
if (item and item["id"])
]
if "playlists" in searchresult:
- result += [
+ result.playlists += [
await self._parse_playlist(item)
for item in searchresult["playlists"]["items"]
if (item and item["id"])
ContentType,
ImageType,
MediaItemImage,
- MediaItemType,
MediaType,
Playlist,
ProviderMapping,
+ SearchResults,
StreamDetails,
Track,
)
async def search(
self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
:param media_types: A list of media_types to include. All types if None.
:param limit: Number of items to return in the search (per type).
"""
- result = []
+ result = SearchResults()
searchtypes = []
if MediaType.ARTIST in media_types:
searchtypes.append("artist")
"search", q=search_query, type=searchtype, limit=limit
):
if "artists" in searchresult:
- result += [
+ result.artists += [
await self._parse_artist(item)
for item in searchresult["artists"]["items"]
if (item and item["id"])
]
if "albums" in searchresult:
- result += [
+ result.albums += [
await self._parse_album(item)
for item in searchresult["albums"]["items"]
if (item and item["id"])
]
if "tracks" in searchresult:
- result += [
+ result.tracks += [
await self._parse_track(item)
for item in searchresult["tracks"]["items"]
if (item and item["id"])
]
if "playlists" in searchresult:
- result += [
+ result.playlists += [
await self._parse_playlist(item)
for item in searchresult["playlists"]["items"]
if (item and item["id"])
ContentType,
ImageType,
MediaItemImage,
- MediaItemType,
MediaType,
Playlist,
ProviderMapping,
+ SearchResults,
StreamDetails,
Track,
)
async def search(
self, search_query: str, media_types=list[MediaType] | None, limit: int = 5
- ) -> list[MediaItemType]:
+ ) -> SearchResults:
"""Perform search on musicprovider.
:param search_query: Search query.
if media_types[0] == MediaType.PLAYLIST:
ytm_filter = "playlists"
results = await search(query=search_query, ytm_filter=ytm_filter, limit=limit)
- parsed_results = []
+ parsed_results = SearchResults()
for result in results:
try:
if result["resultType"] == "artist":
- parsed_results.append(await self._parse_artist(result))
+ parsed_results.artists.append(await self._parse_artist(result))
elif result["resultType"] == "album":
- parsed_results.append(await self._parse_album(result))
+ parsed_results.albums.append(await self._parse_album(result))
elif result["resultType"] == "playlist":
- parsed_results.append(await self._parse_playlist(result))
+ parsed_results.playlists.append(await self._parse_playlist(result))
elif result["resultType"] == "song" and (track := await self._parse_track(result)):
- parsed_results.append(track)
+ parsed_results.tracks.append(track)
except InvalidDataError:
pass # ignore invalid item
return parsed_results