From: Kieran Hogg Date: Tue, 25 Nov 2025 17:45:29 +0000 (+0000) Subject: Add bbc sounds provider (#2567) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=4202bbb086093e9fe5968384a2e43ec2de68df77;p=music-assistant-server.git Add bbc sounds provider (#2567) * Initial commit of BBC Sounds provider * Remove unused SUPPORTED_FEATURES * Remove commented code; tidy stream and provider variables; accept ruff formatting * Add search * Initial commit of BBC Sounds provider * Remove unused SUPPORTED_FEATURES * Remove commented code; tidy stream and provider variables; accept ruff formatting * Add search * Initial commit of BBC Sounds provider * First version of (almost) the full Sounds functionality * Remove accidental leading _ in get_podcast_episode() * Use network logo and not programme image for schedule listing * Remove commented out code * Tidy up object titles and update to latest auntie-sounds API * Remove unused supported features * Add feature flag for blank image check * Cancel now playing task and tidy up * Update icons * Return SUPPORTED_FEATURES with instance in setup() * Remove the enable UK content toggle * Remove boiler comments and tweak wording * Check image_url is set after library change * Update library version * Fix typo in comment Co-authored-by: OzGav * Remove the check to provide the seekable stream version. It's not yet used and has the potential to affect international users, so remove it be safe. * Move fetching the menu to prevent it not being available if accessed by another route * Add an advanced option to choose a stream format * Update dependency version * Remove unnecessary config options * Remove a couple of incorrectly-set StreamDetails variables on Radio streams * Tidy up after removing config options * Add some sensible caching * Increase cache expiry * Add cache to get_podcast() * Disable 'now playing' until supported in core --------- Co-authored-by: OzGav --- diff --git a/music_assistant/providers/bbc_sounds/__init__.py b/music_assistant/providers/bbc_sounds/__init__.py new file mode 100644 index 00000000..2f91511b --- /dev/null +++ b/music_assistant/providers/bbc_sounds/__init__.py @@ -0,0 +1,802 @@ +""" +BBC Sounds music provider support for MusicAssistant. + +TODO implement seeking of live stream +TODO watch for settings change +TODO add podcast menu to non-UK menu +FIXME skipping in non-live radio shows restarts the stream but keeps the seek time +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncGenerator +from datetime import timedelta +from typing import TYPE_CHECKING, Literal + +from music_assistant_models.config_entries import ( + ConfigEntry, + ConfigValueOption, + ConfigValueType, + ProviderConfig, +) +from music_assistant_models.enums import ConfigEntryType, ImageType, MediaType, ProviderFeature +from music_assistant_models.errors import LoginFailed, MusicAssistantError +from music_assistant_models.media_items import ( + BrowseFolder, + ItemMapping, + MediaItemImage, + MediaItemMetadata, + MediaItemType, + Podcast, + PodcastEpisode, + ProviderMapping, + Radio, + RecommendationFolder, + SearchResults, + Track, +) +from music_assistant_models.unique_list import UniqueList + +import music_assistant.helpers.datetime as dt +from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME +from music_assistant.controllers.cache import use_cache +from music_assistant.helpers.datetime import LOCAL_TIMEZONE +from music_assistant.models.music_provider import MusicProvider +from music_assistant.providers.bbc_sounds.adaptor import Adaptor + +if TYPE_CHECKING: + from collections.abc import Sequence + + from music_assistant_models.provider import ProviderManifest + from music_assistant_models.streamdetails import StreamDetails + from sounds.models import SoundsTypes + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +from sounds import ( + Container, + LiveStation, + Menu, + MenuRecommendationOptions, + PlayStatus, + SoundsClient, + exceptions, +) + +SUPPORTED_FEATURES = { + ProviderFeature.BROWSE, + ProviderFeature.RECOMMENDATIONS, + ProviderFeature.SEARCH, +} + +FEATURES = {"now_playing": False, "catchup_segments": False, "check_blank_image": False} + +type _StreamTypes = Literal["hls", "dash"] + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Create new provider instance.""" + instance = BBCSoundsProvider(mass, manifest, config, SUPPORTED_FEATURES) + await instance.handle_async_init() + return instance + + +async def get_config_entries( + mass: MusicAssistant, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + # ruff: noqa: ARG001 + + return ( + ConfigEntry( + key=_Constants.CONF_INTRO, + type=ConfigEntryType.LABEL, + label="A BBC Sounds account is optional, but some UK-only content may not work without" + " it", + ), + ConfigEntry( + key=CONF_USERNAME, + type=ConfigEntryType.STRING, + label="Email or username", + required=False, + ), + ConfigEntry( + key=CONF_PASSWORD, + type=ConfigEntryType.SECURE_STRING, + label="Password", + required=False, + ), + ConfigEntry( + key=_Constants.CONF_SHOW_LOCAL, + category="advanced", + type=ConfigEntryType.BOOLEAN, + label="Show local radio stations?", + default_value=False, + ), + ConfigEntry( + key=_Constants.CONF_STREAM_FORMAT, + category="advanced", + label="Preferred stream format", + type=ConfigEntryType.STRING, + options=[ + ConfigValueOption( + "HLS", + _Constants.CONF_STREAM_FORMAT_HLS, + ), + ConfigValueOption( + "MPEG-DASH", + _Constants.CONF_STREAM_FORMAT_DASH, + ), + ], + default_value=_Constants.CONF_STREAM_FORMAT_HLS, + ), + ) + + +class _Constants: + # This is the image id that is shown when there's no track image + BLANK_IMAGE_NAME: str = "p0bqcdzf" + DEFAULT_IMAGE_SIZE = 1280 + TRACK_DURATION_THRESHOLD: int = 300 # 5 minutes + NOW_PLAYING_REFRESH_TIME: int = 5 + HLS: Literal["hls"] = "hls" + DASH: Literal["dash"] = "dash" + CONF_SHOW_LOCAL: str = "show_local" + CONF_INTRO: str = "intro" + CONF_STREAM_FORMAT: str = "stream_format" + CONF_STREAM_FORMAT_HLS: str = HLS + CONF_STREAM_FORMAT_DASH: str = DASH + DEFAULT_EXPIRATION = 60 * 60 * 24 * 30 # 30 days + SHORT_EXPIRATION = 60 * 60 * 3 # 3 hours + + +class BBCSoundsProvider(MusicProvider): + """A MusicProvider class to interact with the BBC Sounds API via auntie-sounds.""" + + client: SoundsClient + menu: Menu | None = None + current_task: asyncio.Task[None] | None = None + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + self.client = SoundsClient( + session=self.mass.http_session, + logger=self.logger, + timezone=LOCAL_TIMEZONE, + ) + + self.show_local_stations: bool = bool( + self.config.get_value(_Constants.CONF_SHOW_LOCAL, False) + ) + self.stream_format: _StreamTypes = ( + _Constants.DASH + if self.config.get_value(_Constants.CONF_STREAM_FORMAT) == _Constants.DASH + else _Constants.HLS + ) + self.adaptor = Adaptor(self) + + # If we have an account, authenticate. Testing shows all features work without auth + # but BBC will be disabling BBC Sounds from outside the UK at some point + if self.config.get_value(CONF_USERNAME) and self.config.get_value(CONF_PASSWORD): + if self.client.auth.is_logged_in: + # Check if we need to reauth + try: + await self.client.personal.get_experience_menu() + return + except (exceptions.UnauthorisedError, exceptions.APIResponseError): + await self.client.auth.renew_session() + + try: + await self.client.auth.authenticate( + username=str(self.config.get_value(CONF_USERNAME)), + password=str(self.config.get_value(CONF_PASSWORD)), + ) + except exceptions.LoginFailedError as e: + raise LoginFailed(e) + + async def loaded_in_mass(self) -> None: + """Do post-loaded actions.""" + if not self.menu or ( + isinstance(self.menu, Menu) and self.menu.sub_items and len(self.menu.sub_items) == 0 + ): + await self._fetch_menu() + + def _get_provider_mapping(self, item_id: str) -> ProviderMapping: + return ProviderMapping( + item_id=item_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + + async def _fetch_menu(self) -> None: + self.logger.debug("No cached menu, fetching from API") + self.menu = await self.client.personal.get_experience_menu( + recommendations=MenuRecommendationOptions.EXCLUDE + ) + + def _stream_error(self, item_id: str, media_type: MediaType) -> MusicAssistantError: + return MusicAssistantError(f"Couldn't get stream details for {item_id} ({media_type})") + + @property + def is_streaming_provider(self) -> bool: + """Return True as the provider is a streaming provider.""" + return True + + @use_cache(expiration=_Constants.DEFAULT_EXPIRATION) + async def get_track(self, prov_track_id: str) -> Track: + """Get full track details by id.""" + episode_info = await self.client.streaming.get_by_pid( + pid=prov_track_id, stream_format=self.stream_format + ) + track = await self.adaptor.new_object(episode_info, force_type=Track) + if not isinstance(track, Track): + raise MusicAssistantError(f"Incorrect track returned for {prov_track_id}") + return track + + @use_cache(expiration=_Constants.DEFAULT_EXPIRATION) + async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode: + # If we are requesting a previously-aired radio show, we lose access to the + # schedule time. The best we can find out from the API is original release + # date, so the stream title loses access to the air date + """Get full podcast episode details by id.""" + self.logger.debug(f"Getting podcast episode for {prov_episode_id}") + episode = await self.client.streaming.get_podcast_episode(prov_episode_id) + ma_episode = await self.adaptor.new_object(episode, force_type=PodcastEpisode) + if not isinstance(ma_episode, PodcastEpisode): + raise MusicAssistantError(f"Incorrect format for podcast episode {prov_episode_id}") + return ma_episode + + async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: + """Get streamdetails for a track/radio.""" + self.logger.debug(f"Getting stream details for {item_id} ({media_type})") + if media_type == MediaType.PODCAST_EPISODE: + episode_info = await self.client.streaming.get_by_pid( + item_id, include_stream=True, stream_format=self.stream_format + ) + stream_details = await self.adaptor.new_streamable_object(episode_info) + if not stream_details: + raise self._stream_error(item_id, media_type) + + # Hide behind a feature flag until it can be better tested + if episode_info and FEATURES["catchup_segments"]: + # .item_id is the VPID + self.current_task = self.mass.create_task( + self._check_for_segments(item_id, stream_details) + ) + return stream_details + elif media_type is MediaType.TRACK: + track = await self.client.streaming.get_by_pid( + item_id, include_stream=True, stream_format=self.stream_format + ) + stream_details = await self.adaptor.new_streamable_object(track) + if not stream_details: + raise self._stream_error(item_id, media_type) + self.current_task = self.mass.create_task( + self._check_for_segments(item_id, stream_details) + ) + return stream_details + else: + self.logger.debug(f"Getting stream details for station {item_id}") + station = await self.client.stations.get_station( + item_id, include_stream=True, stream_format=self.stream_format + ) + if not station: + raise MusicAssistantError(f"Couldn't get stream details for station {item_id}") + + self.logger.debug(f"Found station: {station}") + if not station.stream: + raise MusicAssistantError(f"No stream found for {item_id}") + + stream_details = await self.adaptor.new_streamable_object(station) + + if not stream_details: + raise self._stream_error(item_id, media_type) + + # Start a background task to keep these details updated + if FEATURES["now_playing"]: + self.current_task = self.mass.create_task( + self._watch_stream_details(stream_details) + ) + return stream_details + + async def _check_for_segments(self, vpid: str, stream_details: StreamDetails) -> None: + # seeking past the current segment needs fixing + segments = await self.client.streaming.get_show_segments(vpid) + offset = stream_details.seek_position + (stream_details.seconds_streamed or 0) + if segments: + seconds = 0 + offset + segments_iter = iter(segments) + segment = next(segments_iter) + if seconds > 0: + # Skip to the correct segment + prev = None + while seconds > segment.offset["start"]: + self.logger.info("Advancing to next segment") + prev = segment + segment = next(segments_iter) + self.logger.warning("Starting with first segment") + if prev and seconds > prev.offset["start"] and seconds < prev.offset["end"]: + if stream_details.stream_metadata: + stream_details.stream_metadata.artist = prev.titles["primary"] + stream_details.stream_metadata.title = prev.titles["secondary"] + if prev.image_url: + stream_details.stream_metadata.image_url = prev.image_url + while True: + if seconds == segment.offset["start"] and stream_details.stream_metadata: + self.logger.warning("Updating segment") + stream_details.stream_metadata.artist = segment.titles["primary"] + stream_details.stream_metadata.title = segment.titles["secondary"] + if segment.image_url: + stream_details.stream_metadata.image_url = segment.image_url + segment = next(segments_iter) + await asyncio.sleep(1) + seconds += 1 + else: + self.logger.warning("No segments found") + + async def _watch_stream_details(self, stream_details: StreamDetails) -> None: + station_id = stream_details.data["station"] + + while True: + if not stream_details.stream_metadata: + await asyncio.sleep(_Constants.NOW_PLAYING_REFRESH_TIME) + continue + + now_playing = await self.client.schedules.currently_playing_song( + station_id, image_size=_Constants.DEFAULT_IMAGE_SIZE + ) + + if now_playing and stream_details.stream_metadata: + self.logger.debug(f"Now playing for {station_id}: {now_playing}") + + # removed check temporarily as images not working + if not FEATURES["check_blank_image"] or ( + now_playing.image_url + and _Constants.BLANK_IMAGE_NAME not in now_playing.image_url + ): + stream_details.stream_metadata.image_url = now_playing.image_url + song = now_playing.titles["secondary"] + artist = now_playing.titles["primary"] + stream_details.stream_metadata.title = song + stream_details.stream_metadata.artist = artist + elif stream_details.stream_metadata: + station = await self.client.stations.get_station(station_id=station_id) + if station: + self.logger.debug(f"Station details: {station}") + display = self._station_programme_display(station) + if display: + stream_details.stream_metadata.title = display + stream_details.stream_metadata.artist = None + stream_details.stream_metadata.image_url = station.image_url + await asyncio.sleep(_Constants.NOW_PLAYING_REFRESH_TIME) + + def _station_programme_display(self, station: LiveStation) -> str | None: + if station and station.titles: + return f"{station.titles.get('secondary')} • {station.titles.get('primary')}" + return None + + async def _station_list(self, include_local: bool = False) -> list[Radio]: + return [ + Radio( + item_id=station.item_id, + name=( + station.network.short_title + if station.network and station.network.short_title + else "Unknown station" + ), + provider=self.domain, + metadata=MediaItemMetadata( + description=self._station_programme_display(station=station), + images=( + UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + provider=self.domain, + path=station.network.logo_url, + remotely_accessible=True, + ), + ] + ) + if station.network and station.network.logo_url + else None + ), + ), + provider_mappings={ + ProviderMapping( + item_id=station.item_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + for station in await self.client.stations.get_stations(include_local=include_local) + if station and station.item_id + ] + + async def _get_category( + self, category_name: str + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + category = await self.client.streaming.get_category(category=category_name) + + if category is not None and category.sub_items: + return [ + obj + for obj in [await self._render_browse_item(item) for item in category.sub_items] + if obj is not None + ] + else: + return [] + + async def _get_collection( + self, pid: str + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + collection = await self.client.streaming.get_collection(pid=pid) + if collection and collection.sub_items: + return [ + obj + for obj in [ + await self._render_browse_item(item) for item in collection.sub_items if item + ] + if obj + ] + else: + return [] + + async def _get_menu( + self, path_parts: list[str] | None = None + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + if self.client.auth.is_logged_in and await self.client.auth.is_uk_listener: + return await self._get_full_menu(path_parts=path_parts) + else: + return await self._get_slim_menu(path_parts=path_parts) + + async def _get_full_menu( + self, path_parts: list[str] | None = None + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + if not self.menu or not self.menu.sub_items: + raise MusicAssistantError("Menu API response is empty or invalid") + menu_items = [] + for item in self.menu.sub_items: + new_item = await self._render_browse_item(item, path_parts) + if isinstance(new_item, (MediaItemType | ItemMapping | BrowseFolder)): + menu_items.append(new_item) + + # The Sounds default menu doesn't include listings as they are linked elsewhere + menu_items.insert( + 1, + BrowseFolder( + item_id="stations", + provider=self.domain, + name="Schedule and Programmes", + path=f"{self.domain}://stations", + image=MediaItemImage( + path="https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid/latest.png", + remotely_accessible=True, + provider=self.domain, + type=ImageType.THUMB, + ), + ), + ) + return menu_items + + async def _get_slim_menu( + self, path_parts: list[str] | None + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + return [ + BrowseFolder( + item_id="listen_live", + provider=self.domain, + name="Listen Live", + path=f"{self.domain}://listen_live", + image=MediaItemImage( + path="https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid/listen_live.png", + remotely_accessible=True, + provider=self.domain, + type=ImageType.THUMB, + ), + ), + BrowseFolder( + item_id="stations", + provider=self.domain, + name="Schedules and Programmes", + path=f"{self.domain}://stations", + image=MediaItemImage( + path="https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid/latest.png", + remotely_accessible=True, + provider=self.domain, + type=ImageType.THUMB, + ), + ), + ] + + async def _render_browse_item( + self, + item: SoundsTypes, + path_parts: list[str] | None = None, + ) -> BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio | None: + new_item = await self.adaptor.new_object(item, path_parts=path_parts) + if isinstance( + new_item, + (BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio), + ): + return new_item + else: + return None + + async def _get_subpath_menu( + self, sub_path: str + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + if not self.menu: + return [] + sub_menu = self.menu.get(sub_path) + item_list = [] + + if sub_menu and isinstance(sub_menu, Container): + if sub_menu.sub_items: + # We have some sub-items, so let's show those + for item in sub_menu.sub_items: + new_item = await self._render_browse_item(item) + if new_item: + item_list.append(new_item) + else: + new_item = await self._render_browse_item(sub_menu) + if new_item: + item_list.append(new_item) + + if sub_path == "listen_live": + for item in await self.client.stations.get_stations(): + new_item = await self._render_browse_item(item) + if new_item: + item_list.append(new_item) + # Check if we need to append local stations + if self.show_local_stations: + for item in await self.client.stations.get_local_stations(): + new_item = await self._render_browse_item(item) + if new_item is not None: + item_list.append(new_item) + return item_list + + async def _get_station_schedule_menu( + self, + show_local: bool, + path_parts: list[str], + sub_sub_path: str, + sub_sub_sub_path: str, + ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + if sub_sub_sub_path: + # Lookup a date schedule + self.logger.debug( + await self.client.schedules.get_schedule( + station_id=sub_sub_path, + date=sub_sub_sub_path, + ) + ) + schedule = await self.client.schedules.get_schedule( + station_id=sub_sub_path, + date=sub_sub_sub_path, + ) + items = [] + if schedule and schedule.sub_items: + for folder in schedule.sub_items: + new_folder = await self._render_browse_item(folder, path_parts=path_parts) + if new_folder: + items.append(new_folder) + return items + elif sub_sub_path: + # Date listings for a station + date_folders = [ + BrowseFolder( + item_id="today", + name="Today", + provider=self.domain, + path="/".join([*path_parts, dt.now().strftime("%Y-%m-%d")]), + ), + BrowseFolder( + item_id="yesterday", + name="Yesterday", + provider=self.domain, + path="/".join( + [ + *path_parts, + (dt.now() - timedelta(days=1)).strftime("%Y-%m-%d"), + ] + ), + ), + ] + # Maximum is 30 days prior + for diff in range(28): + this_date = dt.now() - timedelta(days=2 + diff) + date_string = this_date.strftime("%Y-%m-%d") + date_folders.extend( + [ + BrowseFolder( + item_id=date_string, + name=date_string, + provider=self.domain, + path="/".join([*path_parts, date_string]), + ) + ] + ) + return date_folders + else: + return [ + BrowseFolder( + item_id=station.item_id, + provider=self.domain, + name=station.name, + path="/".join([*path_parts, station.item_id]), + image=( + MediaItemImage( + type=ImageType.THUMB, + path=station.metadata.images[0].path, + provider=self.domain, + ) + if station.metadata.images + else None + ), + ) + for station in await self._station_list(include_local=show_local) + ] + + async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + """Browse this provider's items. + + :param path: The path to browse, (e.g. provider_id://artists). + """ + self.logger.debug(f"Browsing path: {path}") + if not path.startswith(f"{self.domain}://"): + raise MusicAssistantError(f"Invalid path for {self.domain} provider: {path}") + path_parts = path.split("://", 1)[1].split("/") + self.logger.debug(f"Path parts: {path_parts}") + sub_path = path_parts[0] if path_parts else "" + sub_sub_path = path_parts[1] if len(path_parts) > 1 else "" + sub_sub_sub_path = path_parts[2] if len(path_parts) > 2 else "" + path_parts = [ + f"{self.domain}:/", + *[part for part in path_parts if len(part) > 0], + ] + + if sub_path == "": + return await self._get_menu() + elif sub_path == "categories" and sub_sub_path: + return await self._get_category(sub_sub_path) + elif sub_path == "collections" and sub_sub_path: + return await self._get_collection(sub_sub_path) + elif sub_path != "stations": + return await self._get_subpath_menu(sub_path) + elif sub_path == "stations": + return await self._get_station_schedule_menu( + self.show_local_stations, path_parts, sub_sub_path, sub_sub_sub_path + ) + else: + return [] + + async def search( + self, search_query: str, media_types: list[MediaType] | None, limit: int = 5 + ) -> SearchResults: + """Perform search for BBC Sounds stations.""" + results = SearchResults() + search_result = await self.client.streaming.search(search_query) + self.logger.debug(search_result) + if media_types is None or MediaType.RADIO in media_types: + radios = [await self.adaptor.new_object(radio) for radio in search_result.stations] + results.radio = [radio for radio in radios if isinstance(radio, Radio)] + if ( + media_types is None + or MediaType.TRACK in media_types + or MediaType.PODCAST_EPISODE in media_types + ): + episodes = [await self.adaptor.new_object(track) for track in search_result.episodes] + results.tracks = [track for track in episodes if type(track) is Track] + + if media_types is None or MediaType.PODCAST in media_types: + podcasts = [await self.adaptor.new_object(show) for show in search_result.shows] + results.podcasts = [podcast for podcast in podcasts if isinstance(podcast, Podcast)] + + return results + + @use_cache(expiration=_Constants.DEFAULT_EXPIRATION) + async def get_podcast(self, prov_podcast_id: str) -> Podcast: + """Get full podcast details by id.""" + self.logger.debug(f"Getting podcast for {prov_podcast_id}") + podcast = await self.client.streaming.get_podcast(pid=prov_podcast_id) + ma_podcast = await self.adaptor.new_object(source_obj=podcast, force_type=Podcast) + + if isinstance(ma_podcast, Podcast): + return ma_podcast + raise MusicAssistantError("Incorrect format for podcast") + + @use_cache(expiration=_Constants.SHORT_EXPIRATION) # type: ignore[arg-type] + async def get_podcast_episodes( # type: ignore[override] + self, + prov_podcast_id: str, + ) -> AsyncGenerator[PodcastEpisode, None]: + """Get all PodcastEpisodes for given podcast id.""" + podcast_episodes = await self.client.streaming.get_podcast_episodes(prov_podcast_id) + + if podcast_episodes: + for episode in podcast_episodes: + this_episode = await self.adaptor.new_object( + source_obj=episode, force_type=PodcastEpisode + ) + if this_episode and isinstance(this_episode, PodcastEpisode): + yield this_episode + + @use_cache(expiration=_Constants.SHORT_EXPIRATION) + async def recommendations(self) -> list[RecommendationFolder]: + """Get available recommendations.""" + folders = [] + + if self.client.auth.is_logged_in: + recommendations = await self.client.personal.get_experience_menu( + recommendations=MenuRecommendationOptions.ONLY + ) + self.logger.debug("Getting recommendations from API") + if recommendations.sub_items: + for recommendation in recommendations.sub_items: + # recommendation is a RecommendedMenuItem + folder = await self.adaptor.new_object( + recommendation, force_type=RecommendationFolder + ) + if isinstance(folder, RecommendationFolder): + folders.append(folder) + return folders + return [] + + async def get_radio(self, prov_radio_id: str) -> Radio: + """Get full radio details by id.""" + self.logger.debug(f"Getting radio for {prov_radio_id}") + station = await self.client.stations.get_station(prov_radio_id, include_stream=True) + if station: + ma_radio = await self.adaptor.new_object(station, force_type=Radio) + if ma_radio and isinstance(ma_radio, Radio): + return ma_radio + else: + raise MusicAssistantError(f"No station found: {prov_radio_id}") + + self.logger.debug(f"{station} {ma_radio} {type(ma_radio)}") + raise MusicAssistantError("No valid radio stream found") + + async def on_played( + self, + media_type: MediaType, + prov_item_id: str, + fully_played: bool, + position: int, + media_item: MediaItemType, + is_playing: bool = False, + ) -> None: + """Handle callback when a (playable) media item has been played.""" + if media_type != MediaType.RADIO: + # Handle Sounds API play status updates + action = None + + if is_playing: + action = PlayStatus.STARTED if position < 30 else PlayStatus.HEARTBEAT + elif fully_played: + action = PlayStatus.ENDED + else: + action = PlayStatus.PAUSED + + if action: + success = await self.client.streaming.update_play_status( + pid=media_item.item_id, elapsed_time=position, action=action + ) + self.logger.info(f"Updated play status: {success}") + # Cancel now playing task + if FEATURES["now_playing"] and not is_playing and self.current_task: + self.current_task.cancel() diff --git a/music_assistant/providers/bbc_sounds/adaptor.py b/music_assistant/providers/bbc_sounds/adaptor.py new file mode 100644 index 00000000..1365a8f6 --- /dev/null +++ b/music_assistant/providers/bbc_sounds/adaptor.py @@ -0,0 +1,875 @@ +"""Adaptor for converting BBC Sounds objects to Music Assistant media items. + +Many Sounds API endpoints return containers of "PlayableObjects" which can be a +range of different types. The auntie-sounds library detects these differing +types and provides a sensible set of objects to work with, e.g. RadioShow. + +This adaptor maps those objects to the most sensible type for MA. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime, tzinfo +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType +from music_assistant_models.media_items import ( + AudioFormat, + BrowseFolder, + MediaItemChapter, + MediaItemImage, + MediaItemMetadata, + ProviderMapping, + Radio, + RecommendationFolder, + Track, +) +from music_assistant_models.media_items import Podcast as MAPodcast +from music_assistant_models.media_items import PodcastEpisode as MAPodcastEpisode +from music_assistant_models.streamdetails import StreamDetails, StreamMetadata +from music_assistant_models.unique_list import UniqueList +from sounds.models import ( + Category, + Collection, + LiveStation, + MenuItem, + Podcast, + PodcastEpisode, + RadioClip, + RadioSeries, + RadioShow, + RecommendedMenuItem, + Schedule, + SoundsTypes, + Station, + StationSearchResult, +) + +import music_assistant.helpers.datetime as dt +from music_assistant.helpers.datetime import LOCAL_TIMEZONE + +if TYPE_CHECKING: + from music_assistant.providers.bbc_sounds import BBCSoundsProvider + + +def _date_convertor( + timestamp: str | datetime, + date_format: str, + timezone: tzinfo | None = LOCAL_TIMEZONE, +) -> str: + if isinstance(timestamp, str): + timestamp = dt.from_iso_string(timestamp) + else: + timestamp = timestamp.astimezone(timezone) + return timestamp.strftime(date_format) + + +def _to_time(timestamp: str | datetime) -> str: + return _date_convertor(timestamp, "%H:%M") + + +def _to_date_and_time(timestamp: str | datetime) -> str: + return _date_convertor(timestamp, "%a %d %B %H:%M") + + +def _to_date(timestamp: str | datetime) -> str: + return _date_convertor(timestamp, "%d/%m/%y") + + +class ConversionError(Exception): + """Raised when object conversion fails.""" + + +class ImageProvider: + """Handles image URL resolution and MediaItemImage creation.""" + + # TODO: keeping this in for demo purposes + ICON_BASE_URL = ( + "https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid" + ) + + ICON_MAPPING = { + "listen_live": "listen_live", + "continue_listening": "continue", + "editorial_collection": "editorial", + "local_rail": "my_location", + "single_item_promo": "featured", + "collections": "collections", + "categories": "categories", + "recommendations": "my_sounds", + "unmissable_speech": "speech", + "unmissable_music": "music", + } + + @classmethod + def get_icon_url(cls, icon_id: str) -> str | None: + """Get icon URL for a given icon ID.""" + if icon_id is not None: + if icon_id in cls.ICON_MAPPING: + return f"{cls.ICON_BASE_URL}/{cls.ICON_MAPPING[icon_id]}.png" + if "latest_playables_for_curation" in icon_id: + return f"{cls.ICON_BASE_URL}/news.png" + return None + + @classmethod + def create_image( + cls, url: str, provider: str, image_type: ImageType = ImageType.THUMB + ) -> MediaItemImage: + """Create a MediaItemImage from a URL.""" + return MediaItemImage( + path=url, + provider=provider, + type=image_type, + remotely_accessible=True, + ) + + @classmethod + def create_metadata_with_image( + cls, + url: str | None, + provider: str, + description: str | None = None, + chapters: list[MediaItemChapter] | None = None, + ) -> MediaItemMetadata: + """Create metadata with optional image and description.""" + metadata = MediaItemMetadata() + if url: + metadata.add_image(cls.create_image(url, provider)) + if description: + metadata.description = description + if chapters: + metadata.chapters = chapters + return metadata + + +@dataclass +class Context: + """Context information for object conversion.""" + + provider: "BBCSoundsProvider" + provider_domain: str + path_parts: list[str] | None = None + force_type: ( + type[Track] + | type[LiveStation] + | type[Radio] + | type[MAPodcast] + | type[MAPodcastEpisode] + | type[BrowseFolder] + | type[RecommendationFolder] + | type[RecommendedMenuItem] + | None + ) = None + + +class BaseConverter(ABC): + """Base model.""" + + def __init__(self, context: Context): + """Create a new instance.""" + self.context = context + self.logger = self.context.provider.logger + + @abstractmethod + def can_convert(self, source_obj: Any) -> bool: + """Check if this converter can handle the source object.""" + + @abstractmethod + async def get_stream_details(self, source_obj: Any) -> StreamDetails | None: + """Convert the source object to a stream.""" + + @abstractmethod + async def convert( + self, source_obj: Any + ) -> ( + Track + | LiveStation + | Radio + | MAPodcast + | MAPodcastEpisode + | BrowseFolder + | RecommendationFolder + | RecommendedMenuItem + ): + """Convert the source object to target type.""" + + def _create_provider_mapping(self, item_id: str) -> ProviderMapping: + """Create provider mapping for the item.""" + return self.context.provider._get_provider_mapping(item_id) + + def _get_attr(self, obj: Any, attr_path: str, default: Any = None) -> Any: + """Get (optionally-nested) attribute from object. + + Supports e.g. _get_attr(object, "thing.other_thing") + """ + # TODO: I'm fairly sure there is existing code/libs for this? + try: + current = obj + for part in attr_path.split("."): + if hasattr(current, part): + current = getattr(current, part) + elif isinstance(current, dict) and part in current: + current = current[part] + else: + return default + return current + except (AttributeError, KeyError, TypeError): + return default + + +class StationConverter(BaseConverter): + """Converts Station-related objects.""" + + type ConvertableTypes = Station | LiveStation | StationSearchResult + convertable_types = (Station, LiveStation, StationSearchResult) + + def can_convert(self, source_obj: ConvertableTypes) -> bool: + """Check if this converter can convert to a Station object.""" + return isinstance(source_obj, self.convertable_types) + + async def get_stream_details(self, source_obj: Station | LiveStation) -> StreamDetails | None: + """Convert the source object to a stream.""" + from music_assistant.providers.bbc_sounds import ( # noqa: PLC0415 + FEATURES, + _Constants, + ) + + # TODO: can't seek this stream + station = await self.convert(source_obj) + if not station or not source_obj.stream: + return None + show_time = self._get_attr(source_obj, "titles.secondary") + show_title = self._get_attr(source_obj, "titles.primary") + programme_name = f"{show_time} • {show_title}" + stream_details = None + if station and source_obj.stream: + if FEATURES["now_playing"]: + stream_metadata = StreamMetadata( + title=programme_name, + ) + + if station.image is not None: + stream_metadata.image_url = station.image.path + else: + stream_metadata = None + + stream_details = StreamDetails( + stream_metadata=stream_metadata, + media_type=MediaType.RADIO, + stream_type=StreamType.HLS + if self.context.provider.stream_format == _Constants.HLS + else StreamType.HTTP, + path=str(source_obj.stream), + item_id=station.item_id, + provider=station.provider, + audio_format=AudioFormat( + content_type=ContentType.try_parse(str(source_obj.stream)) + ), + data={ + "provider": self.context.provider_domain, + "station": station.item_id, + }, + ) + return stream_details + + async def convert(self, source_obj: ConvertableTypes) -> Radio: + """Convert the source object to target type.""" + if isinstance(source_obj, Station): + return self._convert_station(source_obj) + elif isinstance(source_obj, LiveStation): + return self._convert_live_station(source_obj) + elif isinstance(source_obj, StationSearchResult): + return self._convert_station_search_result(source_obj) + self.logger.error(f"Failed to convert station {type(source_obj)}: {source_obj}") + raise ConversionError(f"Failed to convert station {type(source_obj)}: {source_obj}") + + def _convert_station(self, station: Station) -> Radio: + """Convert Station object.""" + image_url = self._get_attr(station, "image_url") + + radio = Radio( + item_id=station.item_id, + # Add BBC prefix back to station to help identify station within MA + name=f"BBC {self._get_attr(station, 'title', 'Unknown')}", + provider=self.context.provider_domain, + metadata=ImageProvider.create_metadata_with_image( + image_url, self.context.provider_domain + ), + provider_mappings={self._create_provider_mapping(station.item_id)}, + ) + if station.stream: + radio.uri = station.stream.uri + return radio + + def _convert_live_station(self, station: LiveStation) -> Radio: + """Convert LiveStation object.""" + network_id = self._get_attr(station, "network.id") + name = self._get_attr(station, "network.short_title", "Unknown") + image_url = self._get_attr(station, "network.logo_url") + + return Radio( + item_id=network_id, + name=f"BBC {name}", + provider=self.context.provider_domain, + metadata=ImageProvider.create_metadata_with_image( + image_url, self.context.provider_domain + ), + provider_mappings={self._create_provider_mapping(network_id)}, + ) + + def _convert_station_search_result(self, station: StationSearchResult) -> Radio: + """Convert StationSearchResult object.""" + return Radio( + item_id=station.service_id, + name=f"BBC {station.station_name}", + provider=self.context.provider_domain, + metadata=ImageProvider.create_metadata_with_image( + station.station_image_url, self.context.provider_domain + ), + provider_mappings={self._create_provider_mapping(station.service_id)}, + ) + + +class PodcastConverter(BaseConverter): + """Converts podcast-related objects.""" + + type ConvertableTypes = Podcast | PodcastEpisode | RadioShow | RadioClip | RadioSeries + convertable_types = (Podcast, PodcastEpisode, RadioShow, RadioClip, RadioSeries) + type OutputTypes = MAPodcast | MAPodcastEpisode | Track + output_types = MAPodcast | MAPodcastEpisode | Track + SCHEDULE_ITEM_FORMAT = "{start} {show_name} • {show_title} ({date})" + SCHEDULE_ITEM_DEFAULT_FORMAT = "{show_name} • {show_title}" + PODCAST_EPISODE_DEFAULT_FORMAT = "{episode_title} ({date})" + PODCAST_EPISODE_DETAILED_FORMAT = "{episode_title} • {detail} ({date})" + + def _format_show_title(self, show: RadioShow) -> str: + if show is None: + return "Unknown show" + if show.start and show.titles: + return self.SCHEDULE_ITEM_FORMAT.format( + start=_to_time(show.start), + show_name=show.titles["primary"], + show_title=show.titles["secondary"], + date=_to_date(show.start), + ) + elif show.titles: + # TODO: when getting a schedule listing, we have a broadcast time + # when we fetch the streaming details later we lose that from the new API call + title = self.SCHEDULE_ITEM_DEFAULT_FORMAT.format( + show_name=show.titles["primary"], + show_title=show.titles["secondary"], + ) + date = show.release.get("date") if show.release else None + if date and isinstance(date, (str, datetime)): + title += f" ({_to_date(date)})" + return title + return "Unknown" + + def _format_podcast_episode_title(self, episode: PodcastEpisode) -> str: + # Similar to show, but not quite: we expect to see this in the context of a podcast detail + # page + if episode is None: + return "Unknown episode" + + if episode.release: + date = episode.release.get("date") + elif episode.availability: + date = episode.availability.get("from") + else: + date = None + if isinstance(date, (str, datetime)) and episode.titles: + datestamp = _to_date(date) + title = self.PODCAST_EPISODE_DEFAULT_FORMAT.format( + episode_title=episode.titles.get("secondary"), + date=datestamp, + ) + else: + title = str(episode.titles.get("secondary")) if episode.titles else "Unknown episode" + return title + + def can_convert(self, source_obj: ConvertableTypes) -> bool: + """Check if this converter can convert to a Podcast object.""" + # Can't use type alias here https://github.com/python/mypy/issues/11673 + if self.context.force_type: + return issubclass(self.context.force_type, self.output_types) + return isinstance(source_obj, self.convertable_types) + + async def get_stream_details(self, source_obj: ConvertableTypes) -> StreamDetails | None: + """Convert the source object to a stream.""" + from music_assistant.providers.bbc_sounds import _Constants # noqa: PLC0415 + + if isinstance(source_obj, (Podcast, RadioSeries)): + return None + stream_details = None + episode = await self.convert(source_obj) + if ( + episode + and isinstance(episode, MAPodcastEpisode) + and (episode.metadata.description or episode.name) + and source_obj.stream + ): + stream_details = StreamDetails( + stream_metadata=StreamMetadata( + title=episode.metadata.description or episode.name, + uri=source_obj.stream, + ), + media_type=MediaType.PODCAST_EPISODE, + stream_type=StreamType.HLS + if self.context.provider.stream_format == _Constants.HLS + else StreamType.HTTP, + path=source_obj.stream, + item_id=source_obj.id, + provider=self.context.provider_domain, + audio_format=AudioFormat(content_type=ContentType.try_parse(source_obj.stream)), + allow_seek=True, + can_seek=True, + duration=(episode.duration if episode.duration else None), + seek_position=(int(episode.position) if episode.position else 0), + seconds_streamed=(int(episode.position) if episode.position else 0), + ) + elif episode and isinstance(episode, Track) and source_obj.stream: + metadata = StreamMetadata( + title=f"BBC {episode.metadata.description}", uri=source_obj.stream + ) + if episode.metadata.images: + metadata.image_url = episode.metadata.images[0].path + + stream_details = StreamDetails( + stream_metadata=metadata, + media_type=MediaType.TRACK, + stream_type=StreamType.HLS + if self.context.provider.stream_format == _Constants.HLS + else StreamType.HTTP, + path=source_obj.stream, + item_id=episode.item_id, + provider=self.context.provider_domain, + audio_format=AudioFormat(content_type=ContentType.try_parse(source_obj.stream)), + can_seek=True, + duration=episode.duration, + ) + return stream_details + + async def convert(self, source_obj: ConvertableTypes) -> OutputTypes: + """Convert podcast objects.""" + if isinstance(source_obj, (Podcast, RadioSeries)) or self.context.force_type is Podcast: + return await self._convert_podcast(source_obj) + elif isinstance(source_obj, PodcastEpisode): + return await self._convert_podcast_episode(source_obj) + elif isinstance(source_obj, RadioShow): + return await self._convert_radio_show(source_obj) + elif isinstance(source_obj, RadioClip) or self.context.force_type is Track: + return await self._convert_radio_clip(source_obj) + return source_obj + + async def _convert_podcast(self, podcast: Podcast | RadioSeries) -> MAPodcast: + name = self._get_attr(podcast, "titles.primary") or self._get_attr(podcast, "title") + description = self._get_attr(podcast, "synopses.long") or self._get_attr( + podcast, "synopses.short" + ) + image_url = self._get_attr(podcast, "image_url") or self._get_attr( + podcast, "sub_items.image_url" + ) + + return MAPodcast( + item_id=podcast.id, + name=name, + provider=self.context.provider_domain, + metadata=ImageProvider.create_metadata_with_image( + image_url, self.context.provider_domain, description + ), + provider_mappings={self._create_provider_mapping(podcast.item_id)}, + ) + + async def _convert_podcast_episode(self, episode: PodcastEpisode) -> MAPodcastEpisode: + duration = self._get_attr(episode, "duration.value") + progress_ms = self._get_attr(episode, "progress.value") + resume_position = (progress_ms * 1000) if progress_ms else None + description = self._get_attr(episode, "synopses.short") + + # Handle parent podcast + podcast = None + if hasattr(episode, "container") and episode.container: + podcast = await PodcastConverter(self.context).convert(episode.container) + + if not podcast or not isinstance(podcast, MAPodcast): + raise ConversionError(f"No podcast for episode {episode}") + if not episode or not episode.pid: + raise ConversionError(f"No podcast episode for {episode}") + + return MAPodcastEpisode( + item_id=episode.pid, + name=self._format_podcast_episode_title(episode), + provider=self.context.provider_domain, + duration=duration, + position=0, + resume_position_ms=resume_position, + metadata=ImageProvider.create_metadata_with_image( + episode.image_url, + self.context.provider_domain, + description, + ), + podcast=podcast, + provider_mappings={self._create_provider_mapping(episode.pid)}, + uri=episode.stream, + ) + + async def _convert_radio_show(self, show: RadioShow) -> MAPodcastEpisode | Track: + from music_assistant.providers.bbc_sounds import _Constants # noqa: PLC0415 + + duration = self._get_attr(show, "duration.value") + progress_ms = self._get_attr(show, "progress.value") + resume_position = (progress_ms * 1000) if progress_ms else None + + if not show or not show.pid: + raise ConversionError(f"No radio show for {show}") + + # Determine if this should be an episode or track based on duration/context + # TODO: picked a sensible default but need to investigate if this makes sense + # Track example: latest BBC News, PodcastEpisode: latest episode of a radio show + if ( + self.context.force_type == Track + or ( + not self.context.force_type + and duration + and duration < _Constants.TRACK_DURATION_THRESHOLD + ) + or (not hasattr(show, "container") or not show.container) + ): + return Track( + item_id=show.pid, + name=self._format_show_title(show), + provider=self.context.provider_domain, + duration=duration, + metadata=ImageProvider.create_metadata_with_image( + url=show.image_url, + provider=self.context.provider_domain, + description=show.synopses.get("long") if show.synopses else None, + ), + provider_mappings={self._create_provider_mapping(show.pid)}, + ) + else: + # Handle as episode + podcast = None + if hasattr(show, "container") and show.container: + podcast = await PodcastConverter(self.context).convert(show.container) + + if not podcast or not isinstance(podcast, MAPodcast): + raise ConversionError(f"No podcast for episode for {show}") + + return MAPodcastEpisode( + item_id=show.pid, + name=self._format_show_title(show), + provider=self.context.provider_domain, + duration=duration, + resume_position_ms=resume_position, + metadata=ImageProvider.create_metadata_with_image( + show.image_url, self.context.provider_domain + ), + podcast=podcast, + provider_mappings={self._create_provider_mapping(show.pid)}, + position=1, + ) + + async def _convert_radio_clip(self, clip: RadioClip) -> Track | MAPodcastEpisode: + duration = self._get_attr(clip, "duration.value") + description = self._get_attr(clip, "network.short_title") + + if not clip or not clip.pid: + raise ConversionError(f"No clip for {clip}") + + if self.context.force_type is MAPodcastEpisode: + podcast = None + if hasattr(clip, "container") and clip.container: + podcast = await PodcastConverter(self.context).convert(clip.container) + + if not podcast or not isinstance(podcast, MAPodcast): + raise ConversionError(f"No podcast for episode for {clip}") + return MAPodcastEpisode( + item_id=clip.pid, + name=self._get_attr(clip, "titles.entity_title", "Unknown title"), + provider=self.context.provider_domain, + duration=duration, + metadata=ImageProvider.create_metadata_with_image( + clip.image_url, self.context.provider_domain, description + ), + provider_mappings={self._create_provider_mapping(clip.pid)}, + podcast=podcast, + position=0, + ) + else: + return Track( + item_id=clip.pid, + name=self._get_attr(clip, "titles.entity_title", "Unknown Track"), + provider=self.context.provider_domain, + duration=duration, + metadata=ImageProvider.create_metadata_with_image( + clip.image_url, self.context.provider_domain, description + ), + provider_mappings={self._create_provider_mapping(clip.pid)}, + ) + + +class BrowseConverter(BaseConverter): + """Converts browsable objects like menus, categories, collections.""" + + type ConvertableTypes = MenuItem | Category | Collection | Schedule | RecommendedMenuItem + convertable_types = (MenuItem, Category, Collection, Schedule, RecommendedMenuItem) + type OutputTypes = BrowseFolder | RecommendationFolder + output_types = (BrowseFolder, RecommendationFolder) + + def can_convert(self, source_obj: ConvertableTypes) -> bool: + """Check if this converter can convert to a Browsable object.""" + can_convert = False + if self.context.force_type: + can_convert = issubclass(self.context.force_type, self.output_types) + else: + can_convert = isinstance(source_obj, self.convertable_types) + return can_convert + + async def get_stream_details(self, source_obj: ConvertableTypes) -> StreamDetails | None: + """Convert the source object to a stream.""" + return None + + async def convert(self, source_obj: ConvertableTypes) -> OutputTypes: + """Convert browsable objects.""" + if isinstance(source_obj, MenuItem) and self.context.force_type is not RecommendationFolder: + return self._convert_menu_item(source_obj) + elif isinstance(source_obj, (Category, Collection)): + return self._convert_category_or_collection(source_obj) + elif isinstance(source_obj, Schedule): + return self._convert_schedule(source_obj) + elif isinstance(source_obj, RecommendedMenuItem): + return await self._convert_recommended_item(source_obj) + self.logger.error(f"Failed to convert browse object {type(source_obj)}: {source_obj}") + raise ConversionError(f"Browse conversion failed: {source_obj}") + + def _convert_menu_item(self, item: MenuItem) -> BrowseFolder | RecommendationFolder: + """Convert MenuItem to BrowseFolder or RecommendationFolder.""" + image_url = ImageProvider.get_icon_url(item.item_id) + image = ( + ImageProvider.create_image(image_url, self.context.provider_domain) + if image_url + else None + ) + if not item or not item.title: + raise ConversionError(f"No menu item {item}") + path = self._build_path(item.item_id) + + return_type = BrowseFolder + + if self.context.force_type is RecommendationFolder: + return_type = RecommendationFolder + + return return_type( + item_id=item.item_id, + name=item.title, + provider=self.context.provider_domain, + path=path, + image=image, + ) + + def _convert_category_or_collection(self, item: Category | Collection) -> BrowseFolder: + """Convert Category or Collection to BrowseFolder.""" + path_prefix = "categories" if isinstance(item, Category) else "collections" + path = f"{self.context.provider_domain}://{path_prefix}/{item.item_id}" + + return BrowseFolder( + item_id=item.item_id, + name=self._get_attr(item, "titles.primary", "Untitled folder"), + provider=self.context.provider_domain, + path=path, + image=( + ImageProvider.create_image(item.image_url, self.context.provider_domain) + if item.image_url + else None + ), + ) + + def _convert_schedule(self, schedule: Schedule) -> BrowseFolder: + """Convert Schedule to BrowseFolder.""" + return BrowseFolder( + item_id="schedule", + name="Schedule", + provider=self.context.provider_domain, + path=self._build_path("schedule"), + ) + + async def _convert_recommended_item(self, item: RecommendedMenuItem) -> RecommendationFolder: + """Convert RecommendedMenuItem to RecommendationFolder.""" + if not item or not item.sub_items or not item.title: + raise ConversionError(f"Incorrect format for item {item}") + + # TODO this is messy + new_adaptor = Adaptor(provider=self.context.provider) + items: list[Track | Radio | MAPodcast | MAPodcastEpisode | BrowseFolder] = [] + for sub_item in item.sub_items: + new_item = await new_adaptor.new_object(sub_item) + if ( + new_item is not None + and not isinstance(new_item, RecommendationFolder) + and not isinstance(new_item, RecommendedMenuItem) + ): + items.append(new_item) + + return RecommendationFolder( + item_id=item.item_id, + name=item.title, + provider=self.context.provider_domain, + items=UniqueList(items), + ) + + def _build_path(self, item_id: str) -> str: + """Build path for browse items.""" + if self.context.path_parts: + return "/".join([*self.context.path_parts, item_id]) + return f"{self.context.provider_domain}://{item_id}" + + +class Adaptor: + """An adaptor object to convert Sounds API objects into MA ones.""" + + def __init__(self, provider: "BBCSoundsProvider"): + """Create new adaptor.""" + self.provider = provider + self.logger = self.provider.logger + self._converters: list[BaseConverter] = [] + + def _create_context( + self, + path_parts: list[str] | None = None, + force_type: ( + type[Track] + | type[Any] + | type[Radio] + | type[Podcast] + | type[PodcastEpisode] + | type[BrowseFolder] + | type[RecommendationFolder] + | None + ) = None, + ) -> Context: + return Context( + provider=self.provider, + provider_domain=self.provider.domain, + path_parts=path_parts, + force_type=force_type, + ) + + async def new_streamable_object( + self, + source_obj: SoundsTypes, + force_type: type[Track] | type[Radio] | type[MAPodcastEpisode] | None = None, + path_parts: list[str] | None = None, + ) -> StreamDetails | None: + """ + Convert an auntie-sounds object to appropriate Music Assistant object. + + Args: + source_obj: The source object from Sounds API via auntie-sounds + force_type: Force conversion to specific type if the expected target type is known + path_parts: Path parts for browse items to construct the object's path + + Returns: + Converted Music Assistant media item or None if no converter found + """ + if source_obj is None: + return None + + context = self._create_context(path_parts, force_type) + + converters = [ + StationConverter(context), + PodcastConverter(context), + BrowseConverter(context), + ] + + for converter in converters: + if converter.can_convert(source_obj): + try: + stream_details = await converter.get_stream_details(source_obj) + self.provider.logger.debug( + f"Successfully converted {type(source_obj).__name__}" + f" to {type(stream_details).__name__}" + ) + return stream_details + except Exception as e: + self.provider.logger.error( + f"Unexpected error in converter {type(converter).__name__}: {e}" + ) + raise + self.provider.logger.warning( + f"No stream converter found for type {type(source_obj).__name__}" + ) + return None + + async def new_object( + self, + source_obj: SoundsTypes, + force_type: ( + type[ + Track + | Radio + | MAPodcast + | MAPodcastEpisode + | BrowseFolder + | RecommendationFolder + | RecommendedMenuItem + ] + | None + ) = None, + path_parts: list[str] | None = None, + ) -> ( + Track + | Radio + | MAPodcast + | MAPodcastEpisode + | BrowseFolder + | RecommendationFolder + | RecommendedMenuItem + | None + ): + """ + Convert an auntie-sounds object to appropriate Music Assistant object. + + Args: + source_obj: The source object from Sounds API via auntie-sounds + force_type: Force conversion to specific type if the expected target type is known + path_parts: Path parts for browse items to construct the object's path + + Returns: + Converted Music Assistant media item or None if no converter found + """ + if source_obj is None: + return None + + context = self._create_context(path_parts, force_type) + + converters = [ + StationConverter(context), + PodcastConverter(context), + BrowseConverter(context), + ] + for converter in converters: + self.logger.debug(f"Checking if converter {converter} can convert {type(source_obj)}") + if converter.can_convert(source_obj): + try: + result = await converter.convert(source_obj) + if context.force_type: + assert type(result) is context.force_type, ( + f"Forced type to {context.force_type} but received {type(result)} " + f"using {type(converter)}" + ) + self.provider.logger.debug( + f"Successfully converted {type(source_obj).__name__}" + f" to {type(result).__name__} {result}" + ) + return result + except Exception as e: + self.provider.logger.error( + f"Unexpected error in converter {type(converter).__name__}: {e}" + ) + raise + self.logger.debug(f"Converter {converter} could not convert {type(source_obj)}") + + self.logger.warning(f"No converter found for type {type(source_obj).__name__}") + return None diff --git a/music_assistant/providers/bbc_sounds/icon.svg b/music_assistant/providers/bbc_sounds/icon.svg new file mode 100644 index 00000000..52799557 --- /dev/null +++ b/music_assistant/providers/bbc_sounds/icon.svg @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/music_assistant/providers/bbc_sounds/manifest.json b/music_assistant/providers/bbc_sounds/manifest.json new file mode 100644 index 00000000..d96de02f --- /dev/null +++ b/music_assistant/providers/bbc_sounds/manifest.json @@ -0,0 +1,14 @@ +{ + "type": "music", + "domain": "bbc_sounds", + "stage": "beta", + "name": "BBC Sounds", + "description": "Stream live BBC radio shows, podcast series and on-demand audio.", + "codeowners": [ + "@kieranhogg" + ], + "requirements": [ + "auntie-sounds==1.1.1" + ], + "multi_instance": false +} diff --git a/requirements_all.txt b/requirements_all.txt index 96085f7d..d5d850a7 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -18,6 +18,7 @@ aiovban>=0.6.3 alexapy==1.29.10 async-upnp-client==0.45.0 audible==0.10.0 +auntie-sounds==1.1.1 bidict==0.23.1 certifi==2025.11.12 chardet>=5.2.0