--- /dev/null
+"""
+BBC Sounds music provider support for MusicAssistant.
+
+TODO implement seeking of live stream
+TODO watch for settings change
+TODO add podcast menu to non-UK menu
+FIXME skipping in non-live radio shows restarts the stream but keeps the seek time
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncGenerator
+from datetime import timedelta
+from typing import TYPE_CHECKING, Literal
+
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+ ProviderConfig,
+)
+from music_assistant_models.enums import ConfigEntryType, ImageType, MediaType, ProviderFeature
+from music_assistant_models.errors import LoginFailed, MusicAssistantError
+from music_assistant_models.media_items import (
+ BrowseFolder,
+ ItemMapping,
+ MediaItemImage,
+ MediaItemMetadata,
+ MediaItemType,
+ Podcast,
+ PodcastEpisode,
+ ProviderMapping,
+ Radio,
+ RecommendationFolder,
+ SearchResults,
+ Track,
+)
+from music_assistant_models.unique_list import UniqueList
+
+import music_assistant.helpers.datetime as dt
+from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
+from music_assistant.controllers.cache import use_cache
+from music_assistant.helpers.datetime import LOCAL_TIMEZONE
+from music_assistant.models.music_provider import MusicProvider
+from music_assistant.providers.bbc_sounds.adaptor import Adaptor
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+ from music_assistant_models.provider import ProviderManifest
+ from music_assistant_models.streamdetails import StreamDetails
+ from sounds.models import SoundsTypes
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+from sounds import (
+ Container,
+ LiveStation,
+ Menu,
+ MenuRecommendationOptions,
+ PlayStatus,
+ SoundsClient,
+ exceptions,
+)
+
+SUPPORTED_FEATURES = {
+ ProviderFeature.BROWSE,
+ ProviderFeature.RECOMMENDATIONS,
+ ProviderFeature.SEARCH,
+}
+
+FEATURES = {"now_playing": False, "catchup_segments": False, "check_blank_image": False}
+
+type _StreamTypes = Literal["hls", "dash"]
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Create new provider instance."""
+ instance = BBCSoundsProvider(mass, manifest, config, SUPPORTED_FEATURES)
+ await instance.handle_async_init()
+ return instance
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+
+ return (
+ ConfigEntry(
+ key=_Constants.CONF_INTRO,
+ type=ConfigEntryType.LABEL,
+ label="A BBC Sounds account is optional, but some UK-only content may not work without"
+ " it",
+ ),
+ ConfigEntry(
+ key=CONF_USERNAME,
+ type=ConfigEntryType.STRING,
+ label="Email or username",
+ required=False,
+ ),
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Password",
+ required=False,
+ ),
+ ConfigEntry(
+ key=_Constants.CONF_SHOW_LOCAL,
+ category="advanced",
+ type=ConfigEntryType.BOOLEAN,
+ label="Show local radio stations?",
+ default_value=False,
+ ),
+ ConfigEntry(
+ key=_Constants.CONF_STREAM_FORMAT,
+ category="advanced",
+ label="Preferred stream format",
+ type=ConfigEntryType.STRING,
+ options=[
+ ConfigValueOption(
+ "HLS",
+ _Constants.CONF_STREAM_FORMAT_HLS,
+ ),
+ ConfigValueOption(
+ "MPEG-DASH",
+ _Constants.CONF_STREAM_FORMAT_DASH,
+ ),
+ ],
+ default_value=_Constants.CONF_STREAM_FORMAT_HLS,
+ ),
+ )
+
+
+class _Constants:
+ # This is the image id that is shown when there's no track image
+ BLANK_IMAGE_NAME: str = "p0bqcdzf"
+ DEFAULT_IMAGE_SIZE = 1280
+ TRACK_DURATION_THRESHOLD: int = 300 # 5 minutes
+ NOW_PLAYING_REFRESH_TIME: int = 5
+ HLS: Literal["hls"] = "hls"
+ DASH: Literal["dash"] = "dash"
+ CONF_SHOW_LOCAL: str = "show_local"
+ CONF_INTRO: str = "intro"
+ CONF_STREAM_FORMAT: str = "stream_format"
+ CONF_STREAM_FORMAT_HLS: str = HLS
+ CONF_STREAM_FORMAT_DASH: str = DASH
+ DEFAULT_EXPIRATION = 60 * 60 * 24 * 30 # 30 days
+ SHORT_EXPIRATION = 60 * 60 * 3 # 3 hours
+
+
+class BBCSoundsProvider(MusicProvider):
+ """A MusicProvider class to interact with the BBC Sounds API via auntie-sounds."""
+
+ client: SoundsClient
+ menu: Menu | None = None
+ current_task: asyncio.Task[None] | None = None
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ self.client = SoundsClient(
+ session=self.mass.http_session,
+ logger=self.logger,
+ timezone=LOCAL_TIMEZONE,
+ )
+
+ self.show_local_stations: bool = bool(
+ self.config.get_value(_Constants.CONF_SHOW_LOCAL, False)
+ )
+ self.stream_format: _StreamTypes = (
+ _Constants.DASH
+ if self.config.get_value(_Constants.CONF_STREAM_FORMAT) == _Constants.DASH
+ else _Constants.HLS
+ )
+ self.adaptor = Adaptor(self)
+
+ # If we have an account, authenticate. Testing shows all features work without auth
+ # but BBC will be disabling BBC Sounds from outside the UK at some point
+ if self.config.get_value(CONF_USERNAME) and self.config.get_value(CONF_PASSWORD):
+ if self.client.auth.is_logged_in:
+ # Check if we need to reauth
+ try:
+ await self.client.personal.get_experience_menu()
+ return
+ except (exceptions.UnauthorisedError, exceptions.APIResponseError):
+ await self.client.auth.renew_session()
+
+ try:
+ await self.client.auth.authenticate(
+ username=str(self.config.get_value(CONF_USERNAME)),
+ password=str(self.config.get_value(CONF_PASSWORD)),
+ )
+ except exceptions.LoginFailedError as e:
+ raise LoginFailed(e)
+
+ async def loaded_in_mass(self) -> None:
+ """Do post-loaded actions."""
+ if not self.menu or (
+ isinstance(self.menu, Menu) and self.menu.sub_items and len(self.menu.sub_items) == 0
+ ):
+ await self._fetch_menu()
+
+ def _get_provider_mapping(self, item_id: str) -> ProviderMapping:
+ return ProviderMapping(
+ item_id=item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+
+ async def _fetch_menu(self) -> None:
+ self.logger.debug("No cached menu, fetching from API")
+ self.menu = await self.client.personal.get_experience_menu(
+ recommendations=MenuRecommendationOptions.EXCLUDE
+ )
+
+ def _stream_error(self, item_id: str, media_type: MediaType) -> MusicAssistantError:
+ return MusicAssistantError(f"Couldn't get stream details for {item_id} ({media_type})")
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True as the provider is a streaming provider."""
+ return True
+
+ @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get full track details by id."""
+ episode_info = await self.client.streaming.get_by_pid(
+ pid=prov_track_id, stream_format=self.stream_format
+ )
+ track = await self.adaptor.new_object(episode_info, force_type=Track)
+ if not isinstance(track, Track):
+ raise MusicAssistantError(f"Incorrect track returned for {prov_track_id}")
+ return track
+
+ @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ # If we are requesting a previously-aired radio show, we lose access to the
+ # schedule time. The best we can find out from the API is original release
+ # date, so the stream title loses access to the air date
+ """Get full podcast episode details by id."""
+ self.logger.debug(f"Getting podcast episode for {prov_episode_id}")
+ episode = await self.client.streaming.get_podcast_episode(prov_episode_id)
+ ma_episode = await self.adaptor.new_object(episode, force_type=PodcastEpisode)
+ if not isinstance(ma_episode, PodcastEpisode):
+ raise MusicAssistantError(f"Incorrect format for podcast episode {prov_episode_id}")
+ return ma_episode
+
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ """Get streamdetails for a track/radio."""
+ self.logger.debug(f"Getting stream details for {item_id} ({media_type})")
+ if media_type == MediaType.PODCAST_EPISODE:
+ episode_info = await self.client.streaming.get_by_pid(
+ item_id, include_stream=True, stream_format=self.stream_format
+ )
+ stream_details = await self.adaptor.new_streamable_object(episode_info)
+ if not stream_details:
+ raise self._stream_error(item_id, media_type)
+
+ # Hide behind a feature flag until it can be better tested
+ if episode_info and FEATURES["catchup_segments"]:
+ # .item_id is the VPID
+ self.current_task = self.mass.create_task(
+ self._check_for_segments(item_id, stream_details)
+ )
+ return stream_details
+ elif media_type is MediaType.TRACK:
+ track = await self.client.streaming.get_by_pid(
+ item_id, include_stream=True, stream_format=self.stream_format
+ )
+ stream_details = await self.adaptor.new_streamable_object(track)
+ if not stream_details:
+ raise self._stream_error(item_id, media_type)
+ self.current_task = self.mass.create_task(
+ self._check_for_segments(item_id, stream_details)
+ )
+ return stream_details
+ else:
+ self.logger.debug(f"Getting stream details for station {item_id}")
+ station = await self.client.stations.get_station(
+ item_id, include_stream=True, stream_format=self.stream_format
+ )
+ if not station:
+ raise MusicAssistantError(f"Couldn't get stream details for station {item_id}")
+
+ self.logger.debug(f"Found station: {station}")
+ if not station.stream:
+ raise MusicAssistantError(f"No stream found for {item_id}")
+
+ stream_details = await self.adaptor.new_streamable_object(station)
+
+ if not stream_details:
+ raise self._stream_error(item_id, media_type)
+
+ # Start a background task to keep these details updated
+ if FEATURES["now_playing"]:
+ self.current_task = self.mass.create_task(
+ self._watch_stream_details(stream_details)
+ )
+ return stream_details
+
+ async def _check_for_segments(self, vpid: str, stream_details: StreamDetails) -> None:
+ # seeking past the current segment needs fixing
+ segments = await self.client.streaming.get_show_segments(vpid)
+ offset = stream_details.seek_position + (stream_details.seconds_streamed or 0)
+ if segments:
+ seconds = 0 + offset
+ segments_iter = iter(segments)
+ segment = next(segments_iter)
+ if seconds > 0:
+ # Skip to the correct segment
+ prev = None
+ while seconds > segment.offset["start"]:
+ self.logger.info("Advancing to next segment")
+ prev = segment
+ segment = next(segments_iter)
+ self.logger.warning("Starting with first segment")
+ if prev and seconds > prev.offset["start"] and seconds < prev.offset["end"]:
+ if stream_details.stream_metadata:
+ stream_details.stream_metadata.artist = prev.titles["primary"]
+ stream_details.stream_metadata.title = prev.titles["secondary"]
+ if prev.image_url:
+ stream_details.stream_metadata.image_url = prev.image_url
+ while True:
+ if seconds == segment.offset["start"] and stream_details.stream_metadata:
+ self.logger.warning("Updating segment")
+ stream_details.stream_metadata.artist = segment.titles["primary"]
+ stream_details.stream_metadata.title = segment.titles["secondary"]
+ if segment.image_url:
+ stream_details.stream_metadata.image_url = segment.image_url
+ segment = next(segments_iter)
+ await asyncio.sleep(1)
+ seconds += 1
+ else:
+ self.logger.warning("No segments found")
+
+ async def _watch_stream_details(self, stream_details: StreamDetails) -> None:
+ station_id = stream_details.data["station"]
+
+ while True:
+ if not stream_details.stream_metadata:
+ await asyncio.sleep(_Constants.NOW_PLAYING_REFRESH_TIME)
+ continue
+
+ now_playing = await self.client.schedules.currently_playing_song(
+ station_id, image_size=_Constants.DEFAULT_IMAGE_SIZE
+ )
+
+ if now_playing and stream_details.stream_metadata:
+ self.logger.debug(f"Now playing for {station_id}: {now_playing}")
+
+ # removed check temporarily as images not working
+ if not FEATURES["check_blank_image"] or (
+ now_playing.image_url
+ and _Constants.BLANK_IMAGE_NAME not in now_playing.image_url
+ ):
+ stream_details.stream_metadata.image_url = now_playing.image_url
+ song = now_playing.titles["secondary"]
+ artist = now_playing.titles["primary"]
+ stream_details.stream_metadata.title = song
+ stream_details.stream_metadata.artist = artist
+ elif stream_details.stream_metadata:
+ station = await self.client.stations.get_station(station_id=station_id)
+ if station:
+ self.logger.debug(f"Station details: {station}")
+ display = self._station_programme_display(station)
+ if display:
+ stream_details.stream_metadata.title = display
+ stream_details.stream_metadata.artist = None
+ stream_details.stream_metadata.image_url = station.image_url
+ await asyncio.sleep(_Constants.NOW_PLAYING_REFRESH_TIME)
+
+ def _station_programme_display(self, station: LiveStation) -> str | None:
+ if station and station.titles:
+ return f"{station.titles.get('secondary')} • {station.titles.get('primary')}"
+ return None
+
+ async def _station_list(self, include_local: bool = False) -> list[Radio]:
+ return [
+ Radio(
+ item_id=station.item_id,
+ name=(
+ station.network.short_title
+ if station.network and station.network.short_title
+ else "Unknown station"
+ ),
+ provider=self.domain,
+ metadata=MediaItemMetadata(
+ description=self._station_programme_display(station=station),
+ images=(
+ UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ provider=self.domain,
+ path=station.network.logo_url,
+ remotely_accessible=True,
+ ),
+ ]
+ )
+ if station.network and station.network.logo_url
+ else None
+ ),
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=station.item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ for station in await self.client.stations.get_stations(include_local=include_local)
+ if station and station.item_id
+ ]
+
+ async def _get_category(
+ self, category_name: str
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ category = await self.client.streaming.get_category(category=category_name)
+
+ if category is not None and category.sub_items:
+ return [
+ obj
+ for obj in [await self._render_browse_item(item) for item in category.sub_items]
+ if obj is not None
+ ]
+ else:
+ return []
+
+ async def _get_collection(
+ self, pid: str
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ collection = await self.client.streaming.get_collection(pid=pid)
+ if collection and collection.sub_items:
+ return [
+ obj
+ for obj in [
+ await self._render_browse_item(item) for item in collection.sub_items if item
+ ]
+ if obj
+ ]
+ else:
+ return []
+
+ async def _get_menu(
+ self, path_parts: list[str] | None = None
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ if self.client.auth.is_logged_in and await self.client.auth.is_uk_listener:
+ return await self._get_full_menu(path_parts=path_parts)
+ else:
+ return await self._get_slim_menu(path_parts=path_parts)
+
+ async def _get_full_menu(
+ self, path_parts: list[str] | None = None
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ if not self.menu or not self.menu.sub_items:
+ raise MusicAssistantError("Menu API response is empty or invalid")
+ menu_items = []
+ for item in self.menu.sub_items:
+ new_item = await self._render_browse_item(item, path_parts)
+ if isinstance(new_item, (MediaItemType | ItemMapping | BrowseFolder)):
+ menu_items.append(new_item)
+
+ # The Sounds default menu doesn't include listings as they are linked elsewhere
+ menu_items.insert(
+ 1,
+ BrowseFolder(
+ item_id="stations",
+ provider=self.domain,
+ name="Schedule and Programmes",
+ path=f"{self.domain}://stations",
+ image=MediaItemImage(
+ path="https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid/latest.png",
+ remotely_accessible=True,
+ provider=self.domain,
+ type=ImageType.THUMB,
+ ),
+ ),
+ )
+ return menu_items
+
+ async def _get_slim_menu(
+ self, path_parts: list[str] | None
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ return [
+ BrowseFolder(
+ item_id="listen_live",
+ provider=self.domain,
+ name="Listen Live",
+ path=f"{self.domain}://listen_live",
+ image=MediaItemImage(
+ path="https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid/listen_live.png",
+ remotely_accessible=True,
+ provider=self.domain,
+ type=ImageType.THUMB,
+ ),
+ ),
+ BrowseFolder(
+ item_id="stations",
+ provider=self.domain,
+ name="Schedules and Programmes",
+ path=f"{self.domain}://stations",
+ image=MediaItemImage(
+ path="https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid/latest.png",
+ remotely_accessible=True,
+ provider=self.domain,
+ type=ImageType.THUMB,
+ ),
+ ),
+ ]
+
+ async def _render_browse_item(
+ self,
+ item: SoundsTypes,
+ path_parts: list[str] | None = None,
+ ) -> BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio | None:
+ new_item = await self.adaptor.new_object(item, path_parts=path_parts)
+ if isinstance(
+ new_item,
+ (BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio),
+ ):
+ return new_item
+ else:
+ return None
+
+ async def _get_subpath_menu(
+ self, sub_path: str
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ if not self.menu:
+ return []
+ sub_menu = self.menu.get(sub_path)
+ item_list = []
+
+ if sub_menu and isinstance(sub_menu, Container):
+ if sub_menu.sub_items:
+ # We have some sub-items, so let's show those
+ for item in sub_menu.sub_items:
+ new_item = await self._render_browse_item(item)
+ if new_item:
+ item_list.append(new_item)
+ else:
+ new_item = await self._render_browse_item(sub_menu)
+ if new_item:
+ item_list.append(new_item)
+
+ if sub_path == "listen_live":
+ for item in await self.client.stations.get_stations():
+ new_item = await self._render_browse_item(item)
+ if new_item:
+ item_list.append(new_item)
+ # Check if we need to append local stations
+ if self.show_local_stations:
+ for item in await self.client.stations.get_local_stations():
+ new_item = await self._render_browse_item(item)
+ if new_item is not None:
+ item_list.append(new_item)
+ return item_list
+
+ async def _get_station_schedule_menu(
+ self,
+ show_local: bool,
+ path_parts: list[str],
+ sub_sub_path: str,
+ sub_sub_sub_path: str,
+ ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ if sub_sub_sub_path:
+ # Lookup a date schedule
+ self.logger.debug(
+ await self.client.schedules.get_schedule(
+ station_id=sub_sub_path,
+ date=sub_sub_sub_path,
+ )
+ )
+ schedule = await self.client.schedules.get_schedule(
+ station_id=sub_sub_path,
+ date=sub_sub_sub_path,
+ )
+ items = []
+ if schedule and schedule.sub_items:
+ for folder in schedule.sub_items:
+ new_folder = await self._render_browse_item(folder, path_parts=path_parts)
+ if new_folder:
+ items.append(new_folder)
+ return items
+ elif sub_sub_path:
+ # Date listings for a station
+ date_folders = [
+ BrowseFolder(
+ item_id="today",
+ name="Today",
+ provider=self.domain,
+ path="/".join([*path_parts, dt.now().strftime("%Y-%m-%d")]),
+ ),
+ BrowseFolder(
+ item_id="yesterday",
+ name="Yesterday",
+ provider=self.domain,
+ path="/".join(
+ [
+ *path_parts,
+ (dt.now() - timedelta(days=1)).strftime("%Y-%m-%d"),
+ ]
+ ),
+ ),
+ ]
+ # Maximum is 30 days prior
+ for diff in range(28):
+ this_date = dt.now() - timedelta(days=2 + diff)
+ date_string = this_date.strftime("%Y-%m-%d")
+ date_folders.extend(
+ [
+ BrowseFolder(
+ item_id=date_string,
+ name=date_string,
+ provider=self.domain,
+ path="/".join([*path_parts, date_string]),
+ )
+ ]
+ )
+ return date_folders
+ else:
+ return [
+ BrowseFolder(
+ item_id=station.item_id,
+ provider=self.domain,
+ name=station.name,
+ path="/".join([*path_parts, station.item_id]),
+ image=(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=station.metadata.images[0].path,
+ provider=self.domain,
+ )
+ if station.metadata.images
+ else None
+ ),
+ )
+ for station in await self._station_list(include_local=show_local)
+ ]
+
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ """Browse this provider's items.
+
+ :param path: The path to browse, (e.g. provider_id://artists).
+ """
+ self.logger.debug(f"Browsing path: {path}")
+ if not path.startswith(f"{self.domain}://"):
+ raise MusicAssistantError(f"Invalid path for {self.domain} provider: {path}")
+ path_parts = path.split("://", 1)[1].split("/")
+ self.logger.debug(f"Path parts: {path_parts}")
+ sub_path = path_parts[0] if path_parts else ""
+ sub_sub_path = path_parts[1] if len(path_parts) > 1 else ""
+ sub_sub_sub_path = path_parts[2] if len(path_parts) > 2 else ""
+ path_parts = [
+ f"{self.domain}:/",
+ *[part for part in path_parts if len(part) > 0],
+ ]
+
+ if sub_path == "":
+ return await self._get_menu()
+ elif sub_path == "categories" and sub_sub_path:
+ return await self._get_category(sub_sub_path)
+ elif sub_path == "collections" and sub_sub_path:
+ return await self._get_collection(sub_sub_path)
+ elif sub_path != "stations":
+ return await self._get_subpath_menu(sub_path)
+ elif sub_path == "stations":
+ return await self._get_station_schedule_menu(
+ self.show_local_stations, path_parts, sub_sub_path, sub_sub_sub_path
+ )
+ else:
+ return []
+
+ async def search(
+ self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
+ ) -> SearchResults:
+ """Perform search for BBC Sounds stations."""
+ results = SearchResults()
+ search_result = await self.client.streaming.search(search_query)
+ self.logger.debug(search_result)
+ if media_types is None or MediaType.RADIO in media_types:
+ radios = [await self.adaptor.new_object(radio) for radio in search_result.stations]
+ results.radio = [radio for radio in radios if isinstance(radio, Radio)]
+ if (
+ media_types is None
+ or MediaType.TRACK in media_types
+ or MediaType.PODCAST_EPISODE in media_types
+ ):
+ episodes = [await self.adaptor.new_object(track) for track in search_result.episodes]
+ results.tracks = [track for track in episodes if type(track) is Track]
+
+ if media_types is None or MediaType.PODCAST in media_types:
+ podcasts = [await self.adaptor.new_object(show) for show in search_result.shows]
+ results.podcasts = [podcast for podcast in podcasts if isinstance(podcast, Podcast)]
+
+ return results
+
+ @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get full podcast details by id."""
+ self.logger.debug(f"Getting podcast for {prov_podcast_id}")
+ podcast = await self.client.streaming.get_podcast(pid=prov_podcast_id)
+ ma_podcast = await self.adaptor.new_object(source_obj=podcast, force_type=Podcast)
+
+ if isinstance(ma_podcast, Podcast):
+ return ma_podcast
+ raise MusicAssistantError("Incorrect format for podcast")
+
+ @use_cache(expiration=_Constants.SHORT_EXPIRATION) # type: ignore[arg-type]
+ async def get_podcast_episodes( # type: ignore[override]
+ self,
+ prov_podcast_id: str,
+ ) -> AsyncGenerator[PodcastEpisode, None]:
+ """Get all PodcastEpisodes for given podcast id."""
+ podcast_episodes = await self.client.streaming.get_podcast_episodes(prov_podcast_id)
+
+ if podcast_episodes:
+ for episode in podcast_episodes:
+ this_episode = await self.adaptor.new_object(
+ source_obj=episode, force_type=PodcastEpisode
+ )
+ if this_episode and isinstance(this_episode, PodcastEpisode):
+ yield this_episode
+
+ @use_cache(expiration=_Constants.SHORT_EXPIRATION)
+ async def recommendations(self) -> list[RecommendationFolder]:
+ """Get available recommendations."""
+ folders = []
+
+ if self.client.auth.is_logged_in:
+ recommendations = await self.client.personal.get_experience_menu(
+ recommendations=MenuRecommendationOptions.ONLY
+ )
+ self.logger.debug("Getting recommendations from API")
+ if recommendations.sub_items:
+ for recommendation in recommendations.sub_items:
+ # recommendation is a RecommendedMenuItem
+ folder = await self.adaptor.new_object(
+ recommendation, force_type=RecommendationFolder
+ )
+ if isinstance(folder, RecommendationFolder):
+ folders.append(folder)
+ return folders
+ return []
+
+ async def get_radio(self, prov_radio_id: str) -> Radio:
+ """Get full radio details by id."""
+ self.logger.debug(f"Getting radio for {prov_radio_id}")
+ station = await self.client.stations.get_station(prov_radio_id, include_stream=True)
+ if station:
+ ma_radio = await self.adaptor.new_object(station, force_type=Radio)
+ if ma_radio and isinstance(ma_radio, Radio):
+ return ma_radio
+ else:
+ raise MusicAssistantError(f"No station found: {prov_radio_id}")
+
+ self.logger.debug(f"{station} {ma_radio} {type(ma_radio)}")
+ raise MusicAssistantError("No valid radio stream found")
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ prov_item_id: str,
+ fully_played: bool,
+ position: int,
+ media_item: MediaItemType,
+ is_playing: bool = False,
+ ) -> None:
+ """Handle callback when a (playable) media item has been played."""
+ if media_type != MediaType.RADIO:
+ # Handle Sounds API play status updates
+ action = None
+
+ if is_playing:
+ action = PlayStatus.STARTED if position < 30 else PlayStatus.HEARTBEAT
+ elif fully_played:
+ action = PlayStatus.ENDED
+ else:
+ action = PlayStatus.PAUSED
+
+ if action:
+ success = await self.client.streaming.update_play_status(
+ pid=media_item.item_id, elapsed_time=position, action=action
+ )
+ self.logger.info(f"Updated play status: {success}")
+ # Cancel now playing task
+ if FEATURES["now_playing"] and not is_playing and self.current_task:
+ self.current_task.cancel()
--- /dev/null
+"""Adaptor for converting BBC Sounds objects to Music Assistant media items.
+
+Many Sounds API endpoints return containers of "PlayableObjects" which can be a
+range of different types. The auntie-sounds library detects these differing
+types and provides a sensible set of objects to work with, e.g. RadioShow.
+
+This adaptor maps those objects to the most sensible type for MA.
+"""
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime, tzinfo
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
+from music_assistant_models.media_items import (
+ AudioFormat,
+ BrowseFolder,
+ MediaItemChapter,
+ MediaItemImage,
+ MediaItemMetadata,
+ ProviderMapping,
+ Radio,
+ RecommendationFolder,
+ Track,
+)
+from music_assistant_models.media_items import Podcast as MAPodcast
+from music_assistant_models.media_items import PodcastEpisode as MAPodcastEpisode
+from music_assistant_models.streamdetails import StreamDetails, StreamMetadata
+from music_assistant_models.unique_list import UniqueList
+from sounds.models import (
+ Category,
+ Collection,
+ LiveStation,
+ MenuItem,
+ Podcast,
+ PodcastEpisode,
+ RadioClip,
+ RadioSeries,
+ RadioShow,
+ RecommendedMenuItem,
+ Schedule,
+ SoundsTypes,
+ Station,
+ StationSearchResult,
+)
+
+import music_assistant.helpers.datetime as dt
+from music_assistant.helpers.datetime import LOCAL_TIMEZONE
+
+if TYPE_CHECKING:
+ from music_assistant.providers.bbc_sounds import BBCSoundsProvider
+
+
+def _date_convertor(
+ timestamp: str | datetime,
+ date_format: str,
+ timezone: tzinfo | None = LOCAL_TIMEZONE,
+) -> str:
+ if isinstance(timestamp, str):
+ timestamp = dt.from_iso_string(timestamp)
+ else:
+ timestamp = timestamp.astimezone(timezone)
+ return timestamp.strftime(date_format)
+
+
+def _to_time(timestamp: str | datetime) -> str:
+ return _date_convertor(timestamp, "%H:%M")
+
+
+def _to_date_and_time(timestamp: str | datetime) -> str:
+ return _date_convertor(timestamp, "%a %d %B %H:%M")
+
+
+def _to_date(timestamp: str | datetime) -> str:
+ return _date_convertor(timestamp, "%d/%m/%y")
+
+
+class ConversionError(Exception):
+ """Raised when object conversion fails."""
+
+
+class ImageProvider:
+ """Handles image URL resolution and MediaItemImage creation."""
+
+ # TODO: keeping this in for demo purposes
+ ICON_BASE_URL = (
+ "https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid"
+ )
+
+ ICON_MAPPING = {
+ "listen_live": "listen_live",
+ "continue_listening": "continue",
+ "editorial_collection": "editorial",
+ "local_rail": "my_location",
+ "single_item_promo": "featured",
+ "collections": "collections",
+ "categories": "categories",
+ "recommendations": "my_sounds",
+ "unmissable_speech": "speech",
+ "unmissable_music": "music",
+ }
+
+ @classmethod
+ def get_icon_url(cls, icon_id: str) -> str | None:
+ """Get icon URL for a given icon ID."""
+ if icon_id is not None:
+ if icon_id in cls.ICON_MAPPING:
+ return f"{cls.ICON_BASE_URL}/{cls.ICON_MAPPING[icon_id]}.png"
+ if "latest_playables_for_curation" in icon_id:
+ return f"{cls.ICON_BASE_URL}/news.png"
+ return None
+
+ @classmethod
+ def create_image(
+ cls, url: str, provider: str, image_type: ImageType = ImageType.THUMB
+ ) -> MediaItemImage:
+ """Create a MediaItemImage from a URL."""
+ return MediaItemImage(
+ path=url,
+ provider=provider,
+ type=image_type,
+ remotely_accessible=True,
+ )
+
+ @classmethod
+ def create_metadata_with_image(
+ cls,
+ url: str | None,
+ provider: str,
+ description: str | None = None,
+ chapters: list[MediaItemChapter] | None = None,
+ ) -> MediaItemMetadata:
+ """Create metadata with optional image and description."""
+ metadata = MediaItemMetadata()
+ if url:
+ metadata.add_image(cls.create_image(url, provider))
+ if description:
+ metadata.description = description
+ if chapters:
+ metadata.chapters = chapters
+ return metadata
+
+
+@dataclass
+class Context:
+ """Context information for object conversion."""
+
+ provider: "BBCSoundsProvider"
+ provider_domain: str
+ path_parts: list[str] | None = None
+ force_type: (
+ type[Track]
+ | type[LiveStation]
+ | type[Radio]
+ | type[MAPodcast]
+ | type[MAPodcastEpisode]
+ | type[BrowseFolder]
+ | type[RecommendationFolder]
+ | type[RecommendedMenuItem]
+ | None
+ ) = None
+
+
+class BaseConverter(ABC):
+ """Base model."""
+
+ def __init__(self, context: Context):
+ """Create a new instance."""
+ self.context = context
+ self.logger = self.context.provider.logger
+
+ @abstractmethod
+ def can_convert(self, source_obj: Any) -> bool:
+ """Check if this converter can handle the source object."""
+
+ @abstractmethod
+ async def get_stream_details(self, source_obj: Any) -> StreamDetails | None:
+ """Convert the source object to a stream."""
+
+ @abstractmethod
+ async def convert(
+ self, source_obj: Any
+ ) -> (
+ Track
+ | LiveStation
+ | Radio
+ | MAPodcast
+ | MAPodcastEpisode
+ | BrowseFolder
+ | RecommendationFolder
+ | RecommendedMenuItem
+ ):
+ """Convert the source object to target type."""
+
+ def _create_provider_mapping(self, item_id: str) -> ProviderMapping:
+ """Create provider mapping for the item."""
+ return self.context.provider._get_provider_mapping(item_id)
+
+ def _get_attr(self, obj: Any, attr_path: str, default: Any = None) -> Any:
+ """Get (optionally-nested) attribute from object.
+
+ Supports e.g. _get_attr(object, "thing.other_thing")
+ """
+ # TODO: I'm fairly sure there is existing code/libs for this?
+ try:
+ current = obj
+ for part in attr_path.split("."):
+ if hasattr(current, part):
+ current = getattr(current, part)
+ elif isinstance(current, dict) and part in current:
+ current = current[part]
+ else:
+ return default
+ return current
+ except (AttributeError, KeyError, TypeError):
+ return default
+
+
+class StationConverter(BaseConverter):
+ """Converts Station-related objects."""
+
+ type ConvertableTypes = Station | LiveStation | StationSearchResult
+ convertable_types = (Station, LiveStation, StationSearchResult)
+
+ def can_convert(self, source_obj: ConvertableTypes) -> bool:
+ """Check if this converter can convert to a Station object."""
+ return isinstance(source_obj, self.convertable_types)
+
+ async def get_stream_details(self, source_obj: Station | LiveStation) -> StreamDetails | None:
+ """Convert the source object to a stream."""
+ from music_assistant.providers.bbc_sounds import ( # noqa: PLC0415
+ FEATURES,
+ _Constants,
+ )
+
+ # TODO: can't seek this stream
+ station = await self.convert(source_obj)
+ if not station or not source_obj.stream:
+ return None
+ show_time = self._get_attr(source_obj, "titles.secondary")
+ show_title = self._get_attr(source_obj, "titles.primary")
+ programme_name = f"{show_time} • {show_title}"
+ stream_details = None
+ if station and source_obj.stream:
+ if FEATURES["now_playing"]:
+ stream_metadata = StreamMetadata(
+ title=programme_name,
+ )
+
+ if station.image is not None:
+ stream_metadata.image_url = station.image.path
+ else:
+ stream_metadata = None
+
+ stream_details = StreamDetails(
+ stream_metadata=stream_metadata,
+ media_type=MediaType.RADIO,
+ stream_type=StreamType.HLS
+ if self.context.provider.stream_format == _Constants.HLS
+ else StreamType.HTTP,
+ path=str(source_obj.stream),
+ item_id=station.item_id,
+ provider=station.provider,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(str(source_obj.stream))
+ ),
+ data={
+ "provider": self.context.provider_domain,
+ "station": station.item_id,
+ },
+ )
+ return stream_details
+
+ async def convert(self, source_obj: ConvertableTypes) -> Radio:
+ """Convert the source object to target type."""
+ if isinstance(source_obj, Station):
+ return self._convert_station(source_obj)
+ elif isinstance(source_obj, LiveStation):
+ return self._convert_live_station(source_obj)
+ elif isinstance(source_obj, StationSearchResult):
+ return self._convert_station_search_result(source_obj)
+ self.logger.error(f"Failed to convert station {type(source_obj)}: {source_obj}")
+ raise ConversionError(f"Failed to convert station {type(source_obj)}: {source_obj}")
+
+ def _convert_station(self, station: Station) -> Radio:
+ """Convert Station object."""
+ image_url = self._get_attr(station, "image_url")
+
+ radio = Radio(
+ item_id=station.item_id,
+ # Add BBC prefix back to station to help identify station within MA
+ name=f"BBC {self._get_attr(station, 'title', 'Unknown')}",
+ provider=self.context.provider_domain,
+ metadata=ImageProvider.create_metadata_with_image(
+ image_url, self.context.provider_domain
+ ),
+ provider_mappings={self._create_provider_mapping(station.item_id)},
+ )
+ if station.stream:
+ radio.uri = station.stream.uri
+ return radio
+
+ def _convert_live_station(self, station: LiveStation) -> Radio:
+ """Convert LiveStation object."""
+ network_id = self._get_attr(station, "network.id")
+ name = self._get_attr(station, "network.short_title", "Unknown")
+ image_url = self._get_attr(station, "network.logo_url")
+
+ return Radio(
+ item_id=network_id,
+ name=f"BBC {name}",
+ provider=self.context.provider_domain,
+ metadata=ImageProvider.create_metadata_with_image(
+ image_url, self.context.provider_domain
+ ),
+ provider_mappings={self._create_provider_mapping(network_id)},
+ )
+
+ def _convert_station_search_result(self, station: StationSearchResult) -> Radio:
+ """Convert StationSearchResult object."""
+ return Radio(
+ item_id=station.service_id,
+ name=f"BBC {station.station_name}",
+ provider=self.context.provider_domain,
+ metadata=ImageProvider.create_metadata_with_image(
+ station.station_image_url, self.context.provider_domain
+ ),
+ provider_mappings={self._create_provider_mapping(station.service_id)},
+ )
+
+
+class PodcastConverter(BaseConverter):
+ """Converts podcast-related objects."""
+
+ type ConvertableTypes = Podcast | PodcastEpisode | RadioShow | RadioClip | RadioSeries
+ convertable_types = (Podcast, PodcastEpisode, RadioShow, RadioClip, RadioSeries)
+ type OutputTypes = MAPodcast | MAPodcastEpisode | Track
+ output_types = MAPodcast | MAPodcastEpisode | Track
+ SCHEDULE_ITEM_FORMAT = "{start} {show_name} • {show_title} ({date})"
+ SCHEDULE_ITEM_DEFAULT_FORMAT = "{show_name} • {show_title}"
+ PODCAST_EPISODE_DEFAULT_FORMAT = "{episode_title} ({date})"
+ PODCAST_EPISODE_DETAILED_FORMAT = "{episode_title} • {detail} ({date})"
+
+ def _format_show_title(self, show: RadioShow) -> str:
+ if show is None:
+ return "Unknown show"
+ if show.start and show.titles:
+ return self.SCHEDULE_ITEM_FORMAT.format(
+ start=_to_time(show.start),
+ show_name=show.titles["primary"],
+ show_title=show.titles["secondary"],
+ date=_to_date(show.start),
+ )
+ elif show.titles:
+ # TODO: when getting a schedule listing, we have a broadcast time
+ # when we fetch the streaming details later we lose that from the new API call
+ title = self.SCHEDULE_ITEM_DEFAULT_FORMAT.format(
+ show_name=show.titles["primary"],
+ show_title=show.titles["secondary"],
+ )
+ date = show.release.get("date") if show.release else None
+ if date and isinstance(date, (str, datetime)):
+ title += f" ({_to_date(date)})"
+ return title
+ return "Unknown"
+
+ def _format_podcast_episode_title(self, episode: PodcastEpisode) -> str:
+ # Similar to show, but not quite: we expect to see this in the context of a podcast detail
+ # page
+ if episode is None:
+ return "Unknown episode"
+
+ if episode.release:
+ date = episode.release.get("date")
+ elif episode.availability:
+ date = episode.availability.get("from")
+ else:
+ date = None
+ if isinstance(date, (str, datetime)) and episode.titles:
+ datestamp = _to_date(date)
+ title = self.PODCAST_EPISODE_DEFAULT_FORMAT.format(
+ episode_title=episode.titles.get("secondary"),
+ date=datestamp,
+ )
+ else:
+ title = str(episode.titles.get("secondary")) if episode.titles else "Unknown episode"
+ return title
+
+ def can_convert(self, source_obj: ConvertableTypes) -> bool:
+ """Check if this converter can convert to a Podcast object."""
+ # Can't use type alias here https://github.com/python/mypy/issues/11673
+ if self.context.force_type:
+ return issubclass(self.context.force_type, self.output_types)
+ return isinstance(source_obj, self.convertable_types)
+
+ async def get_stream_details(self, source_obj: ConvertableTypes) -> StreamDetails | None:
+ """Convert the source object to a stream."""
+ from music_assistant.providers.bbc_sounds import _Constants # noqa: PLC0415
+
+ if isinstance(source_obj, (Podcast, RadioSeries)):
+ return None
+ stream_details = None
+ episode = await self.convert(source_obj)
+ if (
+ episode
+ and isinstance(episode, MAPodcastEpisode)
+ and (episode.metadata.description or episode.name)
+ and source_obj.stream
+ ):
+ stream_details = StreamDetails(
+ stream_metadata=StreamMetadata(
+ title=episode.metadata.description or episode.name,
+ uri=source_obj.stream,
+ ),
+ media_type=MediaType.PODCAST_EPISODE,
+ stream_type=StreamType.HLS
+ if self.context.provider.stream_format == _Constants.HLS
+ else StreamType.HTTP,
+ path=source_obj.stream,
+ item_id=source_obj.id,
+ provider=self.context.provider_domain,
+ audio_format=AudioFormat(content_type=ContentType.try_parse(source_obj.stream)),
+ allow_seek=True,
+ can_seek=True,
+ duration=(episode.duration if episode.duration else None),
+ seek_position=(int(episode.position) if episode.position else 0),
+ seconds_streamed=(int(episode.position) if episode.position else 0),
+ )
+ elif episode and isinstance(episode, Track) and source_obj.stream:
+ metadata = StreamMetadata(
+ title=f"BBC {episode.metadata.description}", uri=source_obj.stream
+ )
+ if episode.metadata.images:
+ metadata.image_url = episode.metadata.images[0].path
+
+ stream_details = StreamDetails(
+ stream_metadata=metadata,
+ media_type=MediaType.TRACK,
+ stream_type=StreamType.HLS
+ if self.context.provider.stream_format == _Constants.HLS
+ else StreamType.HTTP,
+ path=source_obj.stream,
+ item_id=episode.item_id,
+ provider=self.context.provider_domain,
+ audio_format=AudioFormat(content_type=ContentType.try_parse(source_obj.stream)),
+ can_seek=True,
+ duration=episode.duration,
+ )
+ return stream_details
+
+ async def convert(self, source_obj: ConvertableTypes) -> OutputTypes:
+ """Convert podcast objects."""
+ if isinstance(source_obj, (Podcast, RadioSeries)) or self.context.force_type is Podcast:
+ return await self._convert_podcast(source_obj)
+ elif isinstance(source_obj, PodcastEpisode):
+ return await self._convert_podcast_episode(source_obj)
+ elif isinstance(source_obj, RadioShow):
+ return await self._convert_radio_show(source_obj)
+ elif isinstance(source_obj, RadioClip) or self.context.force_type is Track:
+ return await self._convert_radio_clip(source_obj)
+ return source_obj
+
+ async def _convert_podcast(self, podcast: Podcast | RadioSeries) -> MAPodcast:
+ name = self._get_attr(podcast, "titles.primary") or self._get_attr(podcast, "title")
+ description = self._get_attr(podcast, "synopses.long") or self._get_attr(
+ podcast, "synopses.short"
+ )
+ image_url = self._get_attr(podcast, "image_url") or self._get_attr(
+ podcast, "sub_items.image_url"
+ )
+
+ return MAPodcast(
+ item_id=podcast.id,
+ name=name,
+ provider=self.context.provider_domain,
+ metadata=ImageProvider.create_metadata_with_image(
+ image_url, self.context.provider_domain, description
+ ),
+ provider_mappings={self._create_provider_mapping(podcast.item_id)},
+ )
+
+ async def _convert_podcast_episode(self, episode: PodcastEpisode) -> MAPodcastEpisode:
+ duration = self._get_attr(episode, "duration.value")
+ progress_ms = self._get_attr(episode, "progress.value")
+ resume_position = (progress_ms * 1000) if progress_ms else None
+ description = self._get_attr(episode, "synopses.short")
+
+ # Handle parent podcast
+ podcast = None
+ if hasattr(episode, "container") and episode.container:
+ podcast = await PodcastConverter(self.context).convert(episode.container)
+
+ if not podcast or not isinstance(podcast, MAPodcast):
+ raise ConversionError(f"No podcast for episode {episode}")
+ if not episode or not episode.pid:
+ raise ConversionError(f"No podcast episode for {episode}")
+
+ return MAPodcastEpisode(
+ item_id=episode.pid,
+ name=self._format_podcast_episode_title(episode),
+ provider=self.context.provider_domain,
+ duration=duration,
+ position=0,
+ resume_position_ms=resume_position,
+ metadata=ImageProvider.create_metadata_with_image(
+ episode.image_url,
+ self.context.provider_domain,
+ description,
+ ),
+ podcast=podcast,
+ provider_mappings={self._create_provider_mapping(episode.pid)},
+ uri=episode.stream,
+ )
+
+ async def _convert_radio_show(self, show: RadioShow) -> MAPodcastEpisode | Track:
+ from music_assistant.providers.bbc_sounds import _Constants # noqa: PLC0415
+
+ duration = self._get_attr(show, "duration.value")
+ progress_ms = self._get_attr(show, "progress.value")
+ resume_position = (progress_ms * 1000) if progress_ms else None
+
+ if not show or not show.pid:
+ raise ConversionError(f"No radio show for {show}")
+
+ # Determine if this should be an episode or track based on duration/context
+ # TODO: picked a sensible default but need to investigate if this makes sense
+ # Track example: latest BBC News, PodcastEpisode: latest episode of a radio show
+ if (
+ self.context.force_type == Track
+ or (
+ not self.context.force_type
+ and duration
+ and duration < _Constants.TRACK_DURATION_THRESHOLD
+ )
+ or (not hasattr(show, "container") or not show.container)
+ ):
+ return Track(
+ item_id=show.pid,
+ name=self._format_show_title(show),
+ provider=self.context.provider_domain,
+ duration=duration,
+ metadata=ImageProvider.create_metadata_with_image(
+ url=show.image_url,
+ provider=self.context.provider_domain,
+ description=show.synopses.get("long") if show.synopses else None,
+ ),
+ provider_mappings={self._create_provider_mapping(show.pid)},
+ )
+ else:
+ # Handle as episode
+ podcast = None
+ if hasattr(show, "container") and show.container:
+ podcast = await PodcastConverter(self.context).convert(show.container)
+
+ if not podcast or not isinstance(podcast, MAPodcast):
+ raise ConversionError(f"No podcast for episode for {show}")
+
+ return MAPodcastEpisode(
+ item_id=show.pid,
+ name=self._format_show_title(show),
+ provider=self.context.provider_domain,
+ duration=duration,
+ resume_position_ms=resume_position,
+ metadata=ImageProvider.create_metadata_with_image(
+ show.image_url, self.context.provider_domain
+ ),
+ podcast=podcast,
+ provider_mappings={self._create_provider_mapping(show.pid)},
+ position=1,
+ )
+
+ async def _convert_radio_clip(self, clip: RadioClip) -> Track | MAPodcastEpisode:
+ duration = self._get_attr(clip, "duration.value")
+ description = self._get_attr(clip, "network.short_title")
+
+ if not clip or not clip.pid:
+ raise ConversionError(f"No clip for {clip}")
+
+ if self.context.force_type is MAPodcastEpisode:
+ podcast = None
+ if hasattr(clip, "container") and clip.container:
+ podcast = await PodcastConverter(self.context).convert(clip.container)
+
+ if not podcast or not isinstance(podcast, MAPodcast):
+ raise ConversionError(f"No podcast for episode for {clip}")
+ return MAPodcastEpisode(
+ item_id=clip.pid,
+ name=self._get_attr(clip, "titles.entity_title", "Unknown title"),
+ provider=self.context.provider_domain,
+ duration=duration,
+ metadata=ImageProvider.create_metadata_with_image(
+ clip.image_url, self.context.provider_domain, description
+ ),
+ provider_mappings={self._create_provider_mapping(clip.pid)},
+ podcast=podcast,
+ position=0,
+ )
+ else:
+ return Track(
+ item_id=clip.pid,
+ name=self._get_attr(clip, "titles.entity_title", "Unknown Track"),
+ provider=self.context.provider_domain,
+ duration=duration,
+ metadata=ImageProvider.create_metadata_with_image(
+ clip.image_url, self.context.provider_domain, description
+ ),
+ provider_mappings={self._create_provider_mapping(clip.pid)},
+ )
+
+
+class BrowseConverter(BaseConverter):
+ """Converts browsable objects like menus, categories, collections."""
+
+ type ConvertableTypes = MenuItem | Category | Collection | Schedule | RecommendedMenuItem
+ convertable_types = (MenuItem, Category, Collection, Schedule, RecommendedMenuItem)
+ type OutputTypes = BrowseFolder | RecommendationFolder
+ output_types = (BrowseFolder, RecommendationFolder)
+
+ def can_convert(self, source_obj: ConvertableTypes) -> bool:
+ """Check if this converter can convert to a Browsable object."""
+ can_convert = False
+ if self.context.force_type:
+ can_convert = issubclass(self.context.force_type, self.output_types)
+ else:
+ can_convert = isinstance(source_obj, self.convertable_types)
+ return can_convert
+
+ async def get_stream_details(self, source_obj: ConvertableTypes) -> StreamDetails | None:
+ """Convert the source object to a stream."""
+ return None
+
+ async def convert(self, source_obj: ConvertableTypes) -> OutputTypes:
+ """Convert browsable objects."""
+ if isinstance(source_obj, MenuItem) and self.context.force_type is not RecommendationFolder:
+ return self._convert_menu_item(source_obj)
+ elif isinstance(source_obj, (Category, Collection)):
+ return self._convert_category_or_collection(source_obj)
+ elif isinstance(source_obj, Schedule):
+ return self._convert_schedule(source_obj)
+ elif isinstance(source_obj, RecommendedMenuItem):
+ return await self._convert_recommended_item(source_obj)
+ self.logger.error(f"Failed to convert browse object {type(source_obj)}: {source_obj}")
+ raise ConversionError(f"Browse conversion failed: {source_obj}")
+
+ def _convert_menu_item(self, item: MenuItem) -> BrowseFolder | RecommendationFolder:
+ """Convert MenuItem to BrowseFolder or RecommendationFolder."""
+ image_url = ImageProvider.get_icon_url(item.item_id)
+ image = (
+ ImageProvider.create_image(image_url, self.context.provider_domain)
+ if image_url
+ else None
+ )
+ if not item or not item.title:
+ raise ConversionError(f"No menu item {item}")
+ path = self._build_path(item.item_id)
+
+ return_type = BrowseFolder
+
+ if self.context.force_type is RecommendationFolder:
+ return_type = RecommendationFolder
+
+ return return_type(
+ item_id=item.item_id,
+ name=item.title,
+ provider=self.context.provider_domain,
+ path=path,
+ image=image,
+ )
+
+ def _convert_category_or_collection(self, item: Category | Collection) -> BrowseFolder:
+ """Convert Category or Collection to BrowseFolder."""
+ path_prefix = "categories" if isinstance(item, Category) else "collections"
+ path = f"{self.context.provider_domain}://{path_prefix}/{item.item_id}"
+
+ return BrowseFolder(
+ item_id=item.item_id,
+ name=self._get_attr(item, "titles.primary", "Untitled folder"),
+ provider=self.context.provider_domain,
+ path=path,
+ image=(
+ ImageProvider.create_image(item.image_url, self.context.provider_domain)
+ if item.image_url
+ else None
+ ),
+ )
+
+ def _convert_schedule(self, schedule: Schedule) -> BrowseFolder:
+ """Convert Schedule to BrowseFolder."""
+ return BrowseFolder(
+ item_id="schedule",
+ name="Schedule",
+ provider=self.context.provider_domain,
+ path=self._build_path("schedule"),
+ )
+
+ async def _convert_recommended_item(self, item: RecommendedMenuItem) -> RecommendationFolder:
+ """Convert RecommendedMenuItem to RecommendationFolder."""
+ if not item or not item.sub_items or not item.title:
+ raise ConversionError(f"Incorrect format for item {item}")
+
+ # TODO this is messy
+ new_adaptor = Adaptor(provider=self.context.provider)
+ items: list[Track | Radio | MAPodcast | MAPodcastEpisode | BrowseFolder] = []
+ for sub_item in item.sub_items:
+ new_item = await new_adaptor.new_object(sub_item)
+ if (
+ new_item is not None
+ and not isinstance(new_item, RecommendationFolder)
+ and not isinstance(new_item, RecommendedMenuItem)
+ ):
+ items.append(new_item)
+
+ return RecommendationFolder(
+ item_id=item.item_id,
+ name=item.title,
+ provider=self.context.provider_domain,
+ items=UniqueList(items),
+ )
+
+ def _build_path(self, item_id: str) -> str:
+ """Build path for browse items."""
+ if self.context.path_parts:
+ return "/".join([*self.context.path_parts, item_id])
+ return f"{self.context.provider_domain}://{item_id}"
+
+
+class Adaptor:
+ """An adaptor object to convert Sounds API objects into MA ones."""
+
+ def __init__(self, provider: "BBCSoundsProvider"):
+ """Create new adaptor."""
+ self.provider = provider
+ self.logger = self.provider.logger
+ self._converters: list[BaseConverter] = []
+
+ def _create_context(
+ self,
+ path_parts: list[str] | None = None,
+ force_type: (
+ type[Track]
+ | type[Any]
+ | type[Radio]
+ | type[Podcast]
+ | type[PodcastEpisode]
+ | type[BrowseFolder]
+ | type[RecommendationFolder]
+ | None
+ ) = None,
+ ) -> Context:
+ return Context(
+ provider=self.provider,
+ provider_domain=self.provider.domain,
+ path_parts=path_parts,
+ force_type=force_type,
+ )
+
+ async def new_streamable_object(
+ self,
+ source_obj: SoundsTypes,
+ force_type: type[Track] | type[Radio] | type[MAPodcastEpisode] | None = None,
+ path_parts: list[str] | None = None,
+ ) -> StreamDetails | None:
+ """
+ Convert an auntie-sounds object to appropriate Music Assistant object.
+
+ Args:
+ source_obj: The source object from Sounds API via auntie-sounds
+ force_type: Force conversion to specific type if the expected target type is known
+ path_parts: Path parts for browse items to construct the object's path
+
+ Returns:
+ Converted Music Assistant media item or None if no converter found
+ """
+ if source_obj is None:
+ return None
+
+ context = self._create_context(path_parts, force_type)
+
+ converters = [
+ StationConverter(context),
+ PodcastConverter(context),
+ BrowseConverter(context),
+ ]
+
+ for converter in converters:
+ if converter.can_convert(source_obj):
+ try:
+ stream_details = await converter.get_stream_details(source_obj)
+ self.provider.logger.debug(
+ f"Successfully converted {type(source_obj).__name__}"
+ f" to {type(stream_details).__name__}"
+ )
+ return stream_details
+ except Exception as e:
+ self.provider.logger.error(
+ f"Unexpected error in converter {type(converter).__name__}: {e}"
+ )
+ raise
+ self.provider.logger.warning(
+ f"No stream converter found for type {type(source_obj).__name__}"
+ )
+ return None
+
+ async def new_object(
+ self,
+ source_obj: SoundsTypes,
+ force_type: (
+ type[
+ Track
+ | Radio
+ | MAPodcast
+ | MAPodcastEpisode
+ | BrowseFolder
+ | RecommendationFolder
+ | RecommendedMenuItem
+ ]
+ | None
+ ) = None,
+ path_parts: list[str] | None = None,
+ ) -> (
+ Track
+ | Radio
+ | MAPodcast
+ | MAPodcastEpisode
+ | BrowseFolder
+ | RecommendationFolder
+ | RecommendedMenuItem
+ | None
+ ):
+ """
+ Convert an auntie-sounds object to appropriate Music Assistant object.
+
+ Args:
+ source_obj: The source object from Sounds API via auntie-sounds
+ force_type: Force conversion to specific type if the expected target type is known
+ path_parts: Path parts for browse items to construct the object's path
+
+ Returns:
+ Converted Music Assistant media item or None if no converter found
+ """
+ if source_obj is None:
+ return None
+
+ context = self._create_context(path_parts, force_type)
+
+ converters = [
+ StationConverter(context),
+ PodcastConverter(context),
+ BrowseConverter(context),
+ ]
+ for converter in converters:
+ self.logger.debug(f"Checking if converter {converter} can convert {type(source_obj)}")
+ if converter.can_convert(source_obj):
+ try:
+ result = await converter.convert(source_obj)
+ if context.force_type:
+ assert type(result) is context.force_type, (
+ f"Forced type to {context.force_type} but received {type(result)} "
+ f"using {type(converter)}"
+ )
+ self.provider.logger.debug(
+ f"Successfully converted {type(source_obj).__name__}"
+ f" to {type(result).__name__} {result}"
+ )
+ return result
+ except Exception as e:
+ self.provider.logger.error(
+ f"Unexpected error in converter {type(converter).__name__}: {e}"
+ )
+ raise
+ self.logger.debug(f"Converter {converter} could not convert {type(source_obj)}")
+
+ self.logger.warning(f"No converter found for type {type(source_obj).__name__}")
+ return None