from time import time
from typing import TYPE_CHECKING
-from radios import RadioBrowser, RadioBrowserError
+from radios import FilterBy, Order, RadioBrowser, RadioBrowserError
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import LinkType, ProviderFeature
from music_assistant.common.models.media_items import (
+ BrowseFolder,
ContentType,
ImageType,
MediaItemImage,
from music_assistant.server.helpers.audio import get_radio_stream
from music_assistant.server.models.music_provider import MusicProvider
-SUPPORTED_FEATURES = (ProviderFeature.SEARCH,)
+SUPPORTED_FEATURES = (ProviderFeature.SEARCH, ProviderFeature.BROWSE)
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ProviderConfig
return result
+ async def browse(self, path: str) -> BrowseFolder:
+ """Browse this provider's items.
+
+ :param path: The path to browse, (e.g. provid://artists).
+ """
+ _, subpath = path.split("://")
+ subsubpath = "" if "/" not in subpath else subpath.split("/")[-1]
+
+ if not subpath:
+ # return main listing
+ root_items: list[BrowseFolder] = []
+ root_items.append(
+ BrowseFolder(
+ item_id="popular",
+ provider=self.domain,
+ path=path + "popular",
+ name="",
+ label="By popularity",
+ )
+ )
+ root_items.append(
+ BrowseFolder(
+ item_id="country",
+ provider=self.domain,
+ path=path + "country",
+ name="",
+ label="By country",
+ )
+ )
+ root_items.append(
+ BrowseFolder(
+ item_id="tag",
+ provider=self.domain,
+ path=path + "tag",
+ name="",
+ label="By tag",
+ )
+ )
+
+ return BrowseFolder(
+ item_id="root",
+ provider=self.domain,
+ path=path,
+ name=self.name,
+ items=root_items,
+ )
+
+ if subpath == "popular":
+ return BrowseFolder(
+ item_id="radios",
+ provider=self.domain,
+ path=path,
+ name="",
+ label="radios",
+ items=[x for x in await self.get_by_popularity()],
+ )
+
+ if subpath == "tag":
+ sub_items: list[BrowseFolder] = []
+ tags = await self.radios.tags(
+ hide_broken=True,
+ limit=100,
+ order=Order.STATION_COUNT,
+ reverse=True,
+ )
+ tags.sort(key=lambda tag: tag.name)
+ for tag in tags:
+ folder = BrowseFolder(
+ item_id=tag.name.lower(),
+ provider=self.domain,
+ path=path + "/" + tag.name.lower(),
+ name="",
+ label=tag.name,
+ )
+ sub_items.append(folder)
+ return BrowseFolder(
+ item_id="tag",
+ provider=self.domain,
+ path=path,
+ name=self.name,
+ items=sub_items,
+ )
+
+ if subpath == "country":
+ sub_items: list[BrowseFolder] = []
+ for country in await self.radios.countries(order=Order.NAME):
+ folder = BrowseFolder(
+ item_id=country.name.lower(),
+ provider=self.domain,
+ path=path + "/" + country.name.lower(),
+ name="",
+ label=country.name,
+ )
+ # When folder icons become available.
+ # folder.metadata.images = [MediaItemImage(ImageType.THUMB, country.favicon)]
+ sub_items.append(folder)
+ return BrowseFolder(
+ item_id="country",
+ provider=self.domain,
+ path=path,
+ name=self.name,
+ items=sub_items,
+ )
+
+ if subsubpath in await self.get_tag_names():
+ return BrowseFolder(
+ item_id="radios",
+ provider=self.domain,
+ path=path,
+ name="",
+ label="radios",
+ items=[x for x in await self.get_by_tag(subsubpath)],
+ )
+
+ if subsubpath in await self.get_country_names():
+ return BrowseFolder(
+ item_id="radios",
+ provider=self.domain,
+ path=path,
+ name="",
+ label="radios",
+ items=[x for x in await self.get_by_country(subsubpath)],
+ )
+
+ async def get_tag_names(self):
+ """Get a list of tag names."""
+ tags = await self.radios.tags(
+ hide_broken=True,
+ limit=100,
+ order=Order.STATION_COUNT,
+ reverse=True,
+ )
+ tags.sort(key=lambda tag: tag.name)
+ tag_names = []
+ for tag in tags:
+ tag_names.append(tag.name.lower())
+ return tag_names
+
+ async def get_country_names(self):
+ """Get a list of country names."""
+ countries = await self.radios.countries(order=Order.NAME)
+ country_names = []
+ for country in countries:
+ country_names.append(country.name.lower())
+ return country_names
+
+ async def get_by_popularity(self):
+ """Get radio stations by popularity."""
+ stations = await self.radios.stations(
+ hide_broken=True,
+ limit=250,
+ order=Order.CLICK_COUNT,
+ reverse=True,
+ )
+ items = []
+ for station in stations:
+ items.append(await self._parse_radio(station))
+ return items
+
+ async def get_by_tag(self, tag: str):
+ """Get radio stations by tag."""
+ items = []
+ stations = await self.radios.stations(
+ filter_by=FilterBy.TAG_EXACT,
+ filter_term=tag,
+ hide_broken=True,
+ order=Order.NAME,
+ reverse=False,
+ )
+ for station in stations:
+ items.append(await self._parse_radio(station))
+ return items
+
+ async def get_by_country(self, country: str):
+ """Get radio stations by country."""
+ items = []
+ stations = await self.radios.stations(
+ filter_by=FilterBy.COUNTRY_EXACT,
+ filter_term=country,
+ hide_broken=True,
+ order=Order.NAME,
+ reverse=False,
+ )
+ for station in stations:
+ items.append(await self._parse_radio(station))
+ return items
+
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
radio = await self.radios.station(uuid=prov_radio_id)