ProviderFeature,
StreamType,
)
-from music_assistant_models.errors import MediaNotFoundError
+from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
from music_assistant_models.media_items import (
AudioFormat,
BrowseFolder,
SUPPORTED_FEATURES = {
ProviderFeature.SEARCH,
ProviderFeature.BROWSE,
- # RadioBrowser doesn't support a library feature at all
- # but MA users like to favorite their radio stations and
- # have that included in backups so we store it in the config.
ProviderFeature.LIBRARY_RADIOS,
ProviderFeature.LIBRARY_RADIOS_EDIT,
}
session=self.mass.http_session, user_agent=f"MusicAssistant/{self.mass.version}"
)
try:
- # Try to get some stats to check connection to RadioBrowser API
await self.radios.stats()
except RadioBrowserError as err:
- self.logger.exception("%s", err)
+ raise ProviderUnavailableError(f"RadioBrowser API unavailable: {err}") from err
# copy the radiobrowser items that were added to the library
# TODO: remove this logic after version 2.3.0 or later
async def search(
self, search_query: str, media_types: list[MediaType], limit: int = 10
) -> SearchResults:
- """Perform search on musicprovider.
-
- :param search_query: Search query.
- :param media_types: A list of media_types to include. All types if None.
- :param limit: Number of items to return in the search (per type).
- """
+ """Perform search on musicprovider."""
result = SearchResults()
if MediaType.RADIO not in media_types:
return result
- searchresult = await self.radios.search(name=search_query, limit=limit)
- result.radio = [await self._parse_radio(item) for item in searchresult]
+ try:
+ searchresult = await self.radios.search(name=search_query, limit=limit)
+ result.radio = [await self._parse_radio(item) for item in searchresult]
+ except RadioBrowserError as err:
+ self.logger.warning("RadioBrowser search failed for query '%s': %s", search_query, err)
return result
async def browse(self, path: str) -> Sequence[MediaItemType | BrowseFolder]:
- """Browse this provider's items.
+ """Browse this provider's items."""
+ path_parts = [] if "://" not in path else path.split("://")[1].split("/")
- :param path: The path to browse, (e.g. provid://artists).
- """
- part_parts = path.split("://")[1].split("/")
- subpath = part_parts[0] if part_parts else ""
- subsubpath = part_parts[1] if len(part_parts) > 1 else ""
+ subpath = path_parts[0] if len(path_parts) > 0 else ""
+ subsubpath = path_parts[1] if len(path_parts) > 1 else ""
+ subsubsubpath = path_parts[2] if len(path_parts) > 2 else ""
if not subpath:
- # return main listing
return [
BrowseFolder(
- item_id="popular",
+ item_id="popularity",
provider=self.domain,
- path=path + "popular",
+ path=path + "popularity",
name="",
translation_key="radiobrowser_by_popularity",
),
BrowseFolder(
- item_id="country",
- provider=self.domain,
- path=path + "country",
- name="",
- translation_key="radiobrowser_by_country",
- ),
- BrowseFolder(
- item_id="tag",
+ item_id="category",
provider=self.domain,
- path=path + "tag",
+ path=path + "category",
name="",
- translation_key="radiobrowser_by_tag",
+ translation_key="radiobrowser_by_category",
),
]
- if subpath == "popular":
- return await self.get_by_popularity()
-
- if subpath == "tag" and subsubpath:
- return await self.get_by_tag(subsubpath)
-
- if subpath == "tag":
- return await self.get_tag_folders(path)
-
- if subpath == "country" and subsubpath:
- return await self.get_by_country(subsubpath)
-
- if subpath == "country":
- return await self.get_country_folders(path)
+ if subpath == "popularity":
+ if not subsubpath:
+ return [
+ BrowseFolder(
+ item_id="popular",
+ provider=self.domain,
+ path=path + "/popular",
+ name="",
+ translation_key="radiobrowser_by_clicks",
+ ),
+ BrowseFolder(
+ item_id="votes",
+ provider=self.domain,
+ path=path + "/votes",
+ name="",
+ translation_key="radiobrowser_by_votes",
+ ),
+ ]
+
+ if subsubpath == "popular":
+ return await self.get_by_popularity()
+
+ if subsubpath == "votes":
+ return await self.get_by_votes()
+
+ if subpath == "category":
+ if not subsubpath:
+ return [
+ BrowseFolder(
+ item_id="country",
+ provider=self.domain,
+ path=path + "/country",
+ name="",
+ translation_key="radiobrowser_by_country",
+ ),
+ BrowseFolder(
+ item_id="language",
+ provider=self.domain,
+ path=path + "/language",
+ name="",
+ translation_key="radiobrowser_by_language",
+ ),
+ BrowseFolder(
+ item_id="tag",
+ provider=self.domain,
+ path=path + "/tag",
+ name="",
+ translation_key="radiobrowser_by_tag",
+ ),
+ ]
+
+ if subsubpath == "country":
+ if subsubsubpath:
+ return await self.get_by_country(subsubsubpath)
+ return await self.get_country_folders(path)
+
+ if subsubpath == "language":
+ if subsubsubpath:
+ return await self.get_by_language(subsubsubpath)
+ return await self.get_language_folders(path)
+
+ if subsubpath == "tag":
+ if subsubsubpath:
+ return await self.get_by_tag(subsubsubpath)
+ return await self.get_tag_folders(path)
return []
for item in stored_radios:
try:
yield await self.get_radio(item)
- except MediaNotFoundError as err:
- self.logger.warning("Radio station %s not found: %s", item, err)
+ except MediaNotFoundError:
+ self.logger.warning("Radio station %s no longer exists", item)
async def library_add(self, item: MediaItemType) -> bool:
"""Add item to provider's library. Return true on success."""
self.update_config_value(CONF_STORED_RADIOS, stored_radios)
return True
- @use_cache(3600 * 24)
- async def get_tag_folders(self, base_path: str) -> list[BrowseFolder]:
- """Get a list of tag names as BrowseFolder."""
- tags = await self.radios.tags(
- hide_broken=True,
- order=Order.STATION_COUNT,
- reverse=True,
- )
- tags.sort(key=lambda tag: tag.name)
- return [
- BrowseFolder(
- item_id=tag.name.lower(),
- provider=self.domain,
- path=base_path + "/" + tag.name.lower(),
- name=tag.name,
+ @use_cache(3600)
+ async def get_by_popularity(self) -> Sequence[Radio]:
+ """Get radio stations by popularity."""
+ try:
+ stations = await self.radios.stations(
+ hide_broken=True,
+ limit=1000,
+ order=Order.CLICK_COUNT,
+ reverse=True,
)
- for tag in tags
- ]
+ return [await self._parse_radio(station) for station in stations]
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(f"Failed to fetch popular stations: {err}") from err
+
+ @use_cache(3600)
+ async def get_by_votes(self) -> Sequence[Radio]:
+ """Get radio stations by votes."""
+ try:
+ stations = await self.radios.stations(
+ hide_broken=True,
+ limit=1000,
+ order=Order.VOTES,
+ reverse=True,
+ )
+ return [await self._parse_radio(station) for station in stations]
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(f"Failed to fetch stations by votes: {err}") from err
@use_cache(3600 * 24)
async def get_country_folders(self, base_path: str) -> list[BrowseFolder]:
"""Get a list of country names as BrowseFolder."""
+ try:
+ countries = await self.radios.countries(order=Order.NAME, hide_broken=True, limit=1000)
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(f"Failed to fetch countries: {err}") from err
+
items: list[BrowseFolder] = []
- for country in await self.radios.countries(order=Order.NAME, hide_broken=True, limit=1000):
+ for country in countries:
folder = BrowseFolder(
item_id=country.code.lower(),
provider=self.domain,
path=base_path + "/" + country.code.lower(),
name=country.name,
)
- folder.image = MediaItemImage(
- type=ImageType.THUMB,
- path=country.favicon,
- provider=self.lookup_key,
- remotely_accessible=True,
- )
+ if country.favicon and country.favicon.strip():
+ folder.image = MediaItemImage(
+ type=ImageType.THUMB,
+ path=country.favicon,
+ provider=self.lookup_key,
+ remotely_accessible=True,
+ )
items.append(folder)
return items
- @use_cache(3600)
- async def get_by_popularity(self) -> Sequence[Radio]:
- """Get radio stations by popularity."""
- stations = await self.radios.stations(
- hide_broken=True,
- limit=1000,
- order=Order.CLICK_COUNT,
- reverse=True,
- )
- items = []
- for station in stations:
- items.append(await self._parse_radio(station))
- return items
+ @use_cache(3600 * 24)
+ async def get_language_folders(self, base_path: str) -> list[BrowseFolder]:
+ """Get a list of language names as BrowseFolder."""
+ try:
+ languages = await self.radios.languages(
+ order=Order.STATION_COUNT, reverse=True, hide_broken=True, limit=1000
+ )
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(f"Failed to fetch languages: {err}") from err
- @use_cache(3600)
- async def get_by_tag(self, tag: str) -> Sequence[Radio]:
- """Get radio stations by tag."""
- items = []
- stations = await self.radios.stations(
- filter_by=FilterBy.TAG_EXACT,
- filter_term=tag,
- hide_broken=True,
- limit=1000,
- order=Order.CLICK_COUNT,
- reverse=False,
- )
- for station in stations:
- items.append(await self._parse_radio(station))
- return items
+ return [
+ BrowseFolder(
+ item_id=language.name,
+ provider=self.domain,
+ path=base_path + "/" + language.name,
+ name=language.name,
+ )
+ for language in languages
+ ]
+
+ @use_cache(3600 * 24)
+ async def get_tag_folders(self, base_path: str) -> list[BrowseFolder]:
+ """Get a list of tag names as BrowseFolder."""
+ try:
+ tags = await self.radios.tags(
+ hide_broken=True,
+ order=Order.STATION_COUNT,
+ reverse=True,
+ limit=100,
+ )
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(f"Failed to fetch tags: {err}") from err
+
+ tags.sort(key=lambda tag: tag.name)
+ return [
+ BrowseFolder(
+ item_id=tag.name,
+ provider=self.domain,
+ path=base_path + "/" + tag.name,
+ name=tag.name.title(),
+ )
+ for tag in tags
+ ]
@use_cache(3600)
async def get_by_country(self, country_code: str) -> list[Radio]:
"""Get radio stations by country."""
- items = []
- stations = await self.radios.stations(
- filter_by=FilterBy.COUNTRY_CODE_EXACT,
- filter_term=country_code,
- hide_broken=True,
- limit=1000,
- order=Order.CLICK_COUNT,
- reverse=False,
- )
- for station in stations:
- items.append(await self._parse_radio(station))
- return items
+ try:
+ stations = await self.radios.stations(
+ filter_by=FilterBy.COUNTRY_CODE_EXACT,
+ filter_term=country_code,
+ hide_broken=True,
+ limit=1000,
+ order=Order.CLICK_COUNT,
+ reverse=True,
+ )
+ return [await self._parse_radio(station) for station in stations]
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(
+ f"Failed to fetch stations for country {country_code}: {err}"
+ ) from err
+
+ @use_cache(3600)
+ async def get_by_language(self, language: str) -> list[Radio]:
+ """Get radio stations by language."""
+ try:
+ stations = await self.radios.stations(
+ filter_by=FilterBy.LANGUAGE_EXACT,
+ filter_term=language,
+ hide_broken=True,
+ limit=1000,
+ order=Order.CLICK_COUNT,
+ reverse=True,
+ )
+ return [await self._parse_radio(station) for station in stations]
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(
+ f"Failed to fetch stations for language {language}: {err}"
+ ) from err
+
+ @use_cache(3600)
+ async def get_by_tag(self, tag: str) -> list[Radio]:
+ """Get radio stations by tag."""
+ try:
+ stations = await self.radios.stations(
+ filter_by=FilterBy.TAG_EXACT,
+ filter_term=tag,
+ hide_broken=True,
+ limit=1000,
+ order=Order.CLICK_COUNT,
+ reverse=True,
+ )
+ return [await self._parse_radio(station) for station in stations]
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(
+ f"Failed to fetch stations for tag {tag}: {err}"
+ ) from err
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
- radio = await self.radios.station(uuid=prov_radio_id)
- if not radio:
- raise MediaNotFoundError(f"Radio station {prov_radio_id} not found")
- return await self._parse_radio(radio)
+ try:
+ radio = await self.radios.station(uuid=prov_radio_id)
+ if not radio:
+ raise MediaNotFoundError(f"Radio station {prov_radio_id} not found")
+ return await self._parse_radio(radio)
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(
+ f"Failed to fetch radio station {prov_radio_id}: {err}"
+ ) from err
async def _parse_radio(self, radio_obj: Station) -> Radio:
"""Parse Radio object from json obj returned from api."""
)
},
)
- radio.metadata.popularity = radio_obj.votes
+ radio.metadata.popularity = radio_obj.click_count
radio.metadata.links = {MediaItemLink(type=LinkType.WEBSITE, url=radio_obj.homepage)}
radio.metadata.images = UniqueList(
[
)
]
)
-
return radio
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a radio station."""
- stream = await self.radios.station(uuid=item_id)
- if not stream:
- raise MediaNotFoundError(f"Radio station {item_id} not found")
- await self.radios.station_click(uuid=item_id)
- return StreamDetails(
- provider=self.domain,
- item_id=item_id,
- audio_format=AudioFormat(
- content_type=ContentType.try_parse(stream.codec),
- ),
- media_type=MediaType.RADIO,
- stream_type=StreamType.HTTP,
- path=stream.url_resolved,
- can_seek=False,
- allow_seek=False,
- )
+ try:
+ stream = await self.radios.station(uuid=item_id)
+ if not stream:
+ raise MediaNotFoundError(f"Radio station {item_id} not found")
+
+ await self.radios.station_click(uuid=item_id)
+
+ return StreamDetails(
+ provider=self.domain,
+ item_id=item_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(stream.codec),
+ ),
+ media_type=MediaType.RADIO,
+ stream_type=StreamType.HTTP,
+ path=stream.url_resolved,
+ can_seek=False,
+ allow_seek=False,
+ )
+ except RadioBrowserError as err:
+ raise ProviderUnavailableError(
+ f"Failed to get stream details for {item_id}: {err}"
+ ) from err