from music_assistant.common.models.media_items import (
Album,
Artist,
- BrowseFolder,
MediaItemType,
PagedItems,
Playlist,
async def browse(
self,
path: str | None = None,
- ) -> BrowseFolder:
+ ) -> list[MediaItemType]:
"""Browse Music providers."""
- return BrowseFolder.from_dict(
- await self.client.send_command("music/browse", path=path),
- )
+ return [
+ media_from_dict(item)
+ for item in await self.client.send_command("music/browse", path=path)
+ ]
async def search(
self, search_query: str, media_types: tuple[MediaType] = MediaType.ALL, limit: int = 25
path: str = ""
# label: a labelid that needs to be translated by the frontend
label: str = ""
- # subitems of this folder when expanding
- items: list[MediaItemType | BrowseFolder] | None = None
provider_mappings: set[ProviderMapping] = field(default_factory=set)
def __post_init__(self):
def media_from_dict(media_item: dict) -> MediaItemType:
"""Return MediaItem from dict."""
+ if "provider_mappings" not in media_item:
+ return ItemMapping.from_dict(media_item)
if media_item["media_type"] == "artist":
return Artist.from_dict(media_item)
if media_item["media_type"] == "album":
import os
import shutil
import statistics
+from collections.abc import AsyncGenerator
from contextlib import suppress
from itertools import zip_longest
from typing import TYPE_CHECKING
return result
@api_command("music/browse")
- async def browse(self, path: str | None = None) -> BrowseFolder:
+ async def browse(self, path: str | None = None) -> AsyncGenerator[MediaItemType, None]:
"""Browse Music providers."""
- # root level; folder per provider
if not path or path == "root":
- return BrowseFolder(
- item_id="root",
- provider="library",
- path="root",
- label="browse",
- name="",
- items=[
- BrowseFolder(
- item_id="root",
- provider=prov.domain,
- path=f"{prov.instance_id}://",
- uri=f"{prov.instance_id}://",
- name=prov.name,
- )
- for prov in self.providers
- if ProviderFeature.BROWSE in prov.supported_features
- ],
- )
+ # root level; folder per provider
+ for prov in self.providers:
+ if ProviderFeature.BROWSE not in prov.supported_features:
+ continue
+ yield BrowseFolder(
+ item_id="root",
+ provider=prov.domain,
+ path=f"{prov.instance_id}://",
+ uri=f"{prov.instance_id}://",
+ name=prov.name,
+ )
+ return
+
# provider level
- provider_instance = path.split("://", 1)[0]
+ provider_instance, sub_path = path.split("://", 1)
prov = self.mass.get_provider(provider_instance)
- return await prov.browse(path)
+ # handle regular provider listing, always add back folder first
+ if not prov or not sub_path:
+ yield BrowseFolder(item_id="root", provider="library", path="root", name="..")
+ else:
+ back_path = f"{provider_instance}://" + "/".join(sub_path.split("/")[:-1])
+ yield BrowseFolder(
+ item_id="back", provider=provider_instance, path=back_path, name=".."
+ )
+ async for item in prov.browse(path):
+ yield item
@api_command("music/recently_played_items")
async def recently_played(
return await self.get_radio(prov_item_id)
return await self.get_track(prov_item_id)
- async def browse(self, path: str) -> BrowseFolder:
+ async def browse(self, path: str) -> AsyncGenerator[MediaItemType, None]:
"""Browse this provider's items.
- :param path: The path to browse, (e.g. provid://artists).
+ :param path: The path to browse, (e.g. provider_id://artists).
"""
if ProviderFeature.BROWSE not in self.supported_features:
# we may NOT use the default implementation if the provider does not support browse
raise NotImplementedError
- _, subpath = path.split("://")
-
+ subpath = path.split("://", 1)[1]
# this reference implementation can be overridden with a provider specific approach
- if not subpath:
- # return main listing
- root_items: list[BrowseFolder] = []
- if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
- root_items.append(
- BrowseFolder(
- item_id="artists",
- provider=self.domain,
- path=path + "artists",
- name="",
- label="artists",
- )
- )
- if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
- root_items.append(
- BrowseFolder(
- item_id="albums",
- provider=self.domain,
- path=path + "albums",
- name="",
- label="albums",
- )
- )
- if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
- root_items.append(
- BrowseFolder(
- item_id="tracks",
- provider=self.domain,
- path=path + "tracks",
- name="",
- label="tracks",
- )
- )
- if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
- root_items.append(
- BrowseFolder(
- item_id="playlists",
- provider=self.domain,
- path=path + "playlists",
- name="",
- label="playlists",
- )
- )
- if ProviderFeature.LIBRARY_RADIOS in self.supported_features:
- root_items.append(
- BrowseFolder(
- item_id="radios",
- provider=self.domain,
- path=path + "radios",
- name="",
- label="radios",
- )
- )
- return BrowseFolder(
- item_id="root",
- provider=self.domain,
- path=path,
- name=self.name,
- items=root_items,
- )
- # sublevel
if subpath == "artists":
- return BrowseFolder(
+ async for artist in self.get_library_artists():
+ yield artist
+ return
+ if subpath == "albums":
+ async for album in self.get_library_albums():
+ yield album
+ return
+ if subpath == "tracks":
+ async for track in self.get_library_tracks():
+ yield track
+ return
+ if subpath == "radios":
+ async for radio in self.get_library_radios():
+ yield radio
+ return
+ if subpath == "playlists":
+ async for playlist in self.get_library_playlists():
+ yield playlist
+ return
+ if subpath:
+ # unknown path
+ raise KeyError("Invalid subpath")
+ # no subpath: return main listing
+ if ProviderFeature.LIBRARY_ARTISTS in self.supported_features:
+ yield BrowseFolder(
item_id="artists",
provider=self.domain,
- path=path,
+ path=path + "artists",
name="",
label="artists",
- items=[x async for x in self.get_library_artists()],
)
- if subpath == "albums":
- return BrowseFolder(
+ if ProviderFeature.LIBRARY_ALBUMS in self.supported_features:
+ yield BrowseFolder(
item_id="albums",
provider=self.domain,
- path=path,
+ path=path + "albums",
name="",
label="albums",
- items=[x async for x in self.get_library_albums()],
)
- if subpath == "tracks":
- return BrowseFolder(
+ if ProviderFeature.LIBRARY_TRACKS in self.supported_features:
+ yield BrowseFolder(
item_id="tracks",
provider=self.domain,
- path=path,
+ path=path + "tracks",
name="",
label="tracks",
- items=[x async for x in self.get_library_tracks()],
)
- if subpath == "radios":
- return BrowseFolder(
- item_id="radios",
+ if ProviderFeature.LIBRARY_PLAYLISTS in self.supported_features:
+ yield BrowseFolder(
+ item_id="playlists",
provider=self.domain,
- path=path,
+ path=path + "playlists",
name="",
- label="radios",
- items=[x async for x in self.get_library_radios()],
+ label="playlists",
)
- if subpath == "playlists":
- return BrowseFolder(
- item_id="playlists",
+ if ProviderFeature.LIBRARY_RADIOS in self.supported_features:
+ yield BrowseFolder(
+ item_id="radios",
provider=self.domain,
- path=path,
+ path=path + "radios",
name="",
- label="playlists",
- items=[x async for x in self.get_library_playlists()],
+ label="radios",
)
- raise KeyError("Invalid subpath")
- async def recommendations(self) -> list[BrowseFolder]:
+ async def recommendations(self) -> list[MediaItemType]:
"""Get this provider's recommendations.
- Returns a list of BrowseFolder items with (max 25) mediaitems in the items attribute.
+ Returns a actual and personalised list of Media items with recommendations
+ form this provider for the user/account. It may return nested levels with
+ BrowseFolder items.
"""
if ProviderFeature.RECOMMENDATIONS in self.supported_features:
raise NotImplementedError
BrowseFolder,
ContentType,
ImageType,
+ ItemMapping,
MediaItemImage,
+ MediaItemType,
MediaType,
Playlist,
PlaylistTrack,
).items
return result
- async def browse(self, path: str) -> BrowseFolder:
+ async def browse(self, path: str) -> AsyncGenerator[MediaItemType, None]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
- _, item_path = path.split("://")
+ item_path = path.split("://", 1)[1]
if not item_path:
item_path = ""
- subitems = []
async for item in self.listdir(item_path, recursive=False):
if item.is_dir:
- subitems.append(
- BrowseFolder(
- item_id=item.path,
- provider=self.instance_id,
- path=f"{self.instance_id}://{item.path}",
- name=item.name,
- )
+ yield BrowseFolder(
+ item_id=item.path,
+ provider=self.instance_id,
+ path=f"{self.instance_id}://{item.path}",
+ name=item.name,
)
continue
continue
if item.ext in TRACK_EXTENSIONS:
- if library_item := await self.mass.music.tracks.get_library_item_by_prov_id(
- item.path, self.instance_id
- ):
- subitems.append(library_item)
- elif track := await self.get_track(item.path):
- # make sure that the item exists
- # https://github.com/music-assistant/hass-music-assistant/issues/707
- library_item = await self.mass.music.tracks.add_item_to_library(
- track, metadata_lookup=False
- )
- subitems.append(library_item)
+ yield ItemMapping(
+ media_type=MediaType.TRACK,
+ item_id=item.path,
+ provider=self.instance_id,
+ name=item.name,
+ )
continue
if item.ext in PLAYLIST_EXTENSIONS:
- if library_item := await self.mass.music.playlists.get_library_item_by_prov_id(
- item.path, self.instance_id
- ):
- subitems.append(library_item)
- elif playlist := await self.get_playlist(item.path):
- # make sure that the item exists
- # https://github.com/music-assistant/hass-music-assistant/issues/707
- library_item = await self.mass.music.playlists.add_item_to_library(
- playlist, metadata_lookup=False
- )
- subitems.append(library_item)
- continue
-
- return BrowseFolder(
- item_id=item_path,
- provider=self.instance_id,
- path=path,
- name=item_path or self.name,
- # make sure to sort the resulting listing
- items=sorted(subitems, key=lambda x: (x.name.casefold(), x.name)),
- )
+ yield ItemMapping(
+ media_type=MediaType.PLAYLIST,
+ item_id=item.path,
+ provider=self.instance_id,
+ name=item.name,
+ )
async def sync_library(self, media_types: tuple[MediaType, ...]) -> None: # noqa: ARG002
"""Run library sync for this provider."""
ImageType,
MediaItemImage,
MediaItemLink,
+ MediaItemType,
MediaType,
ProviderMapping,
Radio,
return result
- async def browse(self, path: str) -> BrowseFolder:
+ async def browse(self, path: str) -> AsyncGenerator[MediaItemType, None]:
"""Browse this provider's items.
:param path: The path to browse, (e.g. provid://artists).
"""
- _, subpath = path.split("://")
+ subpath = path.split("://", 1)[1]
subsubpath = "" if "/" not in subpath else subpath.split("/")[-1]
if not subpath:
# return main listing
- root_items: list[BrowseFolder] = []
- root_items.append(
- BrowseFolder(
- item_id="popular",
- provider=self.domain,
- path=path + "popular",
- name="",
- label="By popularity",
- )
- )
- root_items.append(
- BrowseFolder(
- item_id="country",
- provider=self.domain,
- path=path + "country",
- name="",
- label="By country",
- )
- )
- root_items.append(
- BrowseFolder(
- item_id="tag",
- provider=self.domain,
- path=path + "tag",
- name="",
- label="By tag",
- )
+ yield BrowseFolder(
+ item_id="popular",
+ provider=self.domain,
+ path=path + "popular",
+ name="",
+ label="radiobrowser_by_popularity",
)
-
- return BrowseFolder(
- item_id="root",
+ yield BrowseFolder(
+ item_id="country",
provider=self.domain,
- path=path,
- name=self.name,
- items=root_items,
+ path=path + "country",
+ name="",
+ label="radiobrowser_by_country",
)
-
- if subpath == "popular":
- return BrowseFolder(
- item_id="radios",
+ yield BrowseFolder(
+ item_id="tag",
provider=self.domain,
- path=path,
+ path=path + "tag",
name="",
- label="radios",
- items=[x for x in await self.get_by_popularity()],
+ label="radiobrowser_by_tag",
)
+ return
+
+ if subpath == "popular":
+ for item in await self.get_by_popularity():
+ yield item
+ return
if subpath == "tag":
- sub_items: list[BrowseFolder] = []
tags = await self.radios.tags(
hide_broken=True,
limit=100,
)
tags.sort(key=lambda tag: tag.name)
for tag in tags:
- folder = BrowseFolder(
+ yield BrowseFolder(
item_id=tag.name.lower(),
provider=self.domain,
path=path + "/" + tag.name.lower(),
- name="",
- label=tag.name,
+ name=tag.name,
)
- sub_items.append(folder)
- return BrowseFolder(
- item_id="tag",
- provider=self.domain,
- path=path,
- name=self.name,
- items=sub_items,
- )
+ return
if subpath == "country":
- sub_items: list[BrowseFolder] = []
for country in await self.radios.countries(order=Order.NAME):
folder = BrowseFolder(
item_id=country.code.lower(),
provider=self.domain,
path=path + "/" + country.code.lower(),
- name="",
- label=country.name,
+ name=country.name,
)
folder.metadata.images = [
MediaItemImage(type=ImageType.THUMB, path=country.favicon)
]
- sub_items.append(folder)
- return BrowseFolder(
- item_id="country",
- provider=self.domain,
- path=path,
- name=self.name,
- items=sub_items,
- )
+ yield folder
+ return
if subsubpath in await self.get_tag_names():
- return BrowseFolder(
- item_id="radios",
- provider=self.domain,
- path=path,
- name="",
- label="radios",
- items=[x for x in await self.get_by_tag(subsubpath)],
- )
+ for item in await self.get_by_tag(subsubpath):
+ yield item
+ return
if subsubpath in await self.get_country_codes():
- return BrowseFolder(
- item_id="radios",
- provider=self.domain,
- path=path,
- name="",
- label="radios",
- items=[x for x in await self.get_by_country(subsubpath)],
- )
+ for item in await self.get_by_country(subsubpath):
+ yield item
async def get_tag_names(self):
"""Get a list of tag names."""