From 6f7b1628f9fd9b658f7622de377c0e910ad68bbd Mon Sep 17 00:00:00 2001 From: OzGav Date: Sun, 14 Sep 2025 23:59:37 +1000 Subject: [PATCH] Split Radio Paradise provider (#2394) --- .../providers/radioparadise/__init__.py | 495 +----------------- .../providers/radioparadise/constants.py | 62 +++ .../providers/radioparadise/helpers.py | 149 ++++++ .../providers/radioparadise/parsers.py | 98 ++++ .../providers/radioparadise/provider.py | 222 ++++++++ 5 files changed, 533 insertions(+), 493 deletions(-) create mode 100644 music_assistant/providers/radioparadise/constants.py create mode 100644 music_assistant/providers/radioparadise/helpers.py create mode 100644 music_assistant/providers/radioparadise/parsers.py create mode 100644 music_assistant/providers/radioparadise/provider.py diff --git a/music_assistant/providers/radioparadise/__init__.py b/music_assistant/providers/radioparadise/__init__.py index 6b35513f..f3d2d9d7 100644 --- a/music_assistant/providers/radioparadise/__init__.py +++ b/music_assistant/providers/radioparadise/__init__.py @@ -2,33 +2,7 @@ from __future__ import annotations -import asyncio -import contextlib -import time -from collections.abc import AsyncGenerator, Sequence -from typing import TYPE_CHECKING, Any, cast - -import aiohttp -from music_assistant_models.enums import ( - ContentType, - ImageType, - MediaType, - ProviderFeature, - StreamType, -) -from music_assistant_models.errors import MediaNotFoundError, UnplayableMediaError -from music_assistant_models.media_items import ( - AudioFormat, - BrowseFolder, - ItemMapping, - MediaItemImage, - MediaItemType, - ProviderMapping, - Radio, -) -from music_assistant_models.streamdetails import StreamDetails, StreamMetadata - -from music_assistant.models.music_provider import MusicProvider +from typing import TYPE_CHECKING if TYPE_CHECKING: from music_assistant_models.config_entries import ( @@ -41,62 +15,7 @@ if TYPE_CHECKING: from music_assistant import MusicAssistant from music_assistant.models import ProviderInstanceType -# Base URL for station icons -STATION_ICONS_BASE_URL = ( - "https://raw.githubusercontent.com/music-assistant/music-assistant.io/main/docs/assets/icons" -) - -# Radio Paradise channel configurations with hardcoded channels -RADIO_PARADISE_CHANNELS: dict[str, dict[str, Any]] = { - "0": { - "name": "Radio Paradise - Main Mix", - "description": "Eclectic mix of music - hand-picked by real humans", - "stream_url": "https://stream.radioparadise.com/flac", - "content_type": ContentType.FLAC, - "api_url": "https://api.radioparadise.com/api/now_playing", - "station_icon": "radioparadise-logo-main.png", - }, - "1": { - "name": "Radio Paradise - Mellow Mix", - "description": "A mellower selection from the RP music library", - "stream_url": "https://stream.radioparadise.com/mellow-flac", - "content_type": ContentType.FLAC, - "api_url": "https://api.radioparadise.com/api/now_playing?chan=1", - "station_icon": "radioparadise-logo-mellow.png", - }, - "2": { - "name": "Radio Paradise - Rock Mix", - "description": "Heavier selections from the RP music library", - "stream_url": "https://stream.radioparadise.com/rock-flac", - "content_type": ContentType.FLAC, - "api_url": "https://api.radioparadise.com/api/now_playing?chan=2", - "station_icon": "radioparadise-logo-rock.png", - }, - "3": { - "name": "Radio Paradise - Global", - "description": "Global music and experimental selections", - "stream_url": "https://stream.radioparadise.com/global-flac", - "content_type": ContentType.FLAC, - "api_url": "https://api.radioparadise.com/api/now_playing?chan=3", - "station_icon": "radioparadise-logo-global.png", - }, - "4": { - "name": "Radio Paradise - Beyond", - "description": "Exploring the frontiers of improvisational music", - "stream_url": "https://stream.radioparadise.com/beyond-flac", - "content_type": ContentType.FLAC, - "api_url": "https://api.radioparadise.com/api/now_playing?chan=4", - "station_icon": "radioparadise-logo-beyond.png", - }, - "5": { - "name": "Radio Paradise - Serenity", - "description": "Don't panic, and don't forget your towel", - "stream_url": "https://stream.radioparadise.com/serenity", - "content_type": ContentType.AAC, - "api_url": "https://api.radioparadise.com/api/now_playing?chan=5", - "station_icon": "radioparadise-logo-serenity.png", - }, -} +from .provider import RadioParadiseProvider async def setup( @@ -114,413 +33,3 @@ async def get_config_entries( ) -> tuple[ConfigEntry, ...]: """Return Config entries to setup this provider.""" return () - - -class RadioParadiseProvider(MusicProvider): - """Radio Paradise Music Provider for Music Assistant.""" - - def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig): - """Initialize the provider.""" - super().__init__(mass, manifest, config) - - @property - def supported_features(self) -> set[ProviderFeature]: - """Return the features supported by this Provider.""" - return { - ProviderFeature.BROWSE, - ProviderFeature.LIBRARY_RADIOS, - } - - @property - def is_streaming_provider(self) -> bool: - """Return True if the provider is a streaming provider.""" - return True - - async def get_library_radios(self) -> AsyncGenerator[Radio, None]: - """Retrieve library/subscribed radio stations from the provider.""" - for channel_id in RADIO_PARADISE_CHANNELS: - yield self._parse_radio(channel_id) - - async def get_radio(self, prov_radio_id: str) -> Radio: - """Get full radio details by id.""" - if prov_radio_id not in RADIO_PARADISE_CHANNELS: - raise MediaNotFoundError("Station not found") - - return self._parse_radio(prov_radio_id) - - async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: - """Get streamdetails for a radio station.""" - if media_type != MediaType.RADIO: - raise UnplayableMediaError(f"Unsupported media type: {media_type}") - if item_id not in RADIO_PARADISE_CHANNELS: - raise MediaNotFoundError(f"Unknown radio channel: {item_id}") - - stream_url = self._build_stream_url(item_id) - if not stream_url: - raise UnplayableMediaError(f"No stream URL found for channel {item_id}") - - # Get content type from channel configuration - channel_info = RADIO_PARADISE_CHANNELS[item_id] - content_type = channel_info["content_type"] - - stream_details = StreamDetails( - item_id=item_id, - provider=self.lookup_key, - audio_format=AudioFormat( - content_type=content_type, - channels=2, - ), - media_type=MediaType.RADIO, - stream_type=StreamType.HTTP, - path=stream_url, - allow_seek=False, - can_seek=False, - duration=0, - ) - - # Set initial metadata if available - metadata = await self._get_channel_metadata(item_id) - if metadata and metadata.get("current"): - current_song = metadata["current"] - stream_details.stream_metadata = self._build_stream_metadata(current_song, metadata) - - # Store the monitoring task in streamdetails.data for cleanup in on_streamed - monitor_task = self.mass.create_task(self._monitor_stream_metadata(stream_details)) - stream_details.data = {"monitor_task": monitor_task} - - return stream_details - - async def on_streamed(self, streamdetails: StreamDetails) -> None: - """Handle callback when given streamdetails completed streaming.""" - self.logger.debug( - f"Radio Paradise channel {streamdetails.item_id} streamed for " - f"{streamdetails.seconds_streamed} seconds" - ) - - # Cancel and clean up the monitoring task - if "monitor_task" in streamdetails.data: - monitor_task = streamdetails.data["monitor_task"] - if not monitor_task.done(): - monitor_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await monitor_task - del streamdetails.data["monitor_task"] - - async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: - """Browse this provider's items.""" - return [self._parse_radio(channel_id) for channel_id in RADIO_PARADISE_CHANNELS] - - def _parse_radio(self, channel_id: str) -> Radio: - """Create a Radio object from cached channel information.""" - channel_info = RADIO_PARADISE_CHANNELS.get(channel_id, {}) - - radio = Radio( - provider=self.lookup_key, - item_id=channel_id, - name=channel_info.get("name", "Unknown Radio"), - provider_mappings={ - ProviderMapping( - provider_domain=self.domain, - provider_instance=self.instance_id, - item_id=channel_id, - available=True, - ) - }, - ) - - # Add static station icon - station_icon = channel_info.get("station_icon") - if station_icon: - icon_url = f"{STATION_ICONS_BASE_URL}/{station_icon}" - radio.metadata.add_image( - MediaItemImage( - provider=self.lookup_key, - type=ImageType.THUMB, - path=icon_url, - remotely_accessible=True, - ) - ) - - return radio - - async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None: - """Get current track and upcoming tracks from Radio Paradise's block API. - - Args: - channel_id: Radio Paradise channel ID (0-5) - - Returns: - Dict with current song, next song, and block data, or None if API fails - """ - if channel_id not in RADIO_PARADISE_CHANNELS: - return None - - try: - # Use block API for much richer data - api_url = ( - f"https://api.radioparadise.com/api/get_block?bitrate=4&info=true&chan={channel_id}" - ) - timeout = aiohttp.ClientTimeout(total=10) - - async with self.mass.http_session.get(api_url, timeout=timeout) as response: - if response.status != 200: - self.logger.debug(f"Block API call failed with status {response.status}") - return None - - data = await response.json() - - # Find currently playing song based on elapsed time - current_time_ms = self._get_current_block_position(data) - current_song = self.find_current_song(data.get("song", {}), current_time_ms) - - if not current_song: - self.logger.debug(f"No current song found for channel {channel_id}") - return None - - # Get next song - next_song = self._get_next_song(data.get("song", {}), current_song) - - return {"current": current_song, "next": next_song, "block_data": data} - - except aiohttp.ClientError as exc: - self.logger.debug(f"Failed to get block metadata for channel {channel_id}: {exc}") - return None - except Exception as exc: - self.logger.debug( - f"Unexpected error getting block metadata for channel {channel_id}: {exc}" - ) - return None - - def _get_current_block_position(self, block_data: dict[str, Any]) -> int: - """Calculate current playback position within a Radio Paradise block. - - Args: - block_data: Block data containing sched_time_millis - - Returns: - Current position in milliseconds from block start - """ - current_time_ms = int(time.time() * 1000) - sched_time = int(block_data.get("sched_time_millis", current_time_ms)) - return current_time_ms - sched_time - - def find_current_song( - self, songs: dict[str, dict[str, Any]], current_time_ms: int - ) -> dict[str, Any] | None: - """Find which song should currently be playing based on elapsed time. - - Args: - songs: Dictionary of songs from Radio Paradise block data - current_time_ms: Current position in milliseconds within the block - - Returns: - The song dict that should be playing now, or None if not found - """ - sorted_keys = sorted(songs.keys(), key=int) - - for song_key in sorted_keys: - song = songs[song_key] - song_start = int(song.get("elapsed", 0)) - song_duration = int(song.get("duration", 0)) - song_end = song_start + song_duration - - if song_start <= current_time_ms < song_end: - return song - - # If no exact match, return first song - first_song = songs.get("0") - return first_song if first_song is not None else {} - - def _get_next_song( - self, songs: dict[str, Any], current_song: dict[str, Any] - ) -> dict[str, Any] | None: - """Get the next song that will play after the current song. - - Args: - songs: Dictionary of songs from Radio Paradise block data - current_song: The currently playing song dictionary - - Returns: - The next song dict, or None if no next song found - """ - current_event = current_song.get("event") - current_elapsed = int(current_song.get("elapsed", 0)) - sorted_keys = sorted(songs.keys(), key=int) - - for song_key in sorted_keys: - song = cast("dict[str, Any]", songs[song_key]) - if song.get("event") != current_event and int(song.get("elapsed", 0)) > current_elapsed: - return song - return None - - def _build_stream_url(self, channel_id: str) -> str: - """Build the streaming URL for a Radio Paradise channel. - - Args: - channel_id: Radio Paradise channel ID (0-5) - - Returns: - Streaming URL for the channel, or empty string if not found - """ - if channel_id not in RADIO_PARADISE_CHANNELS: - return "" - - channel_info = RADIO_PARADISE_CHANNELS[channel_id] - return str(channel_info.get("stream_url", "")) - - async def _monitor_stream_metadata(self, stream_details: StreamDetails) -> None: - """Monitor and update stream metadata in real-time during playback. - - Fetches current track info from Radio Paradise's API every 10 seconds - and updates StreamDetails with track metadata and upcoming songs. - - Args: - stream_details: StreamDetails object to update with metadata - """ - last_track_event = "" - item_id = stream_details.item_id - - try: - while True: - metadata = await self._get_channel_metadata(item_id) - if metadata and metadata.get("current"): - current_song = metadata["current"] - current_event = current_song.get("event", "") - - if current_event != last_track_event: - # Create StreamMetadata object with full track info - stream_metadata = self._build_stream_metadata(current_song, metadata) - - self.logger.debug( - f"Updating stream metadata for {item_id}: " - f"{stream_metadata.artist} - {stream_metadata.title}" - ) - stream_details.stream_metadata = stream_metadata - - last_track_event = current_event - - await asyncio.sleep(15) - except asyncio.CancelledError: - self.logger.debug(f"Monitor task cancelled for {item_id}") - except aiohttp.ClientError as exc: - self.logger.debug(f"Network error while monitoring {item_id}: {exc}") - except Exception as exc: - self.logger.warning(f"Unexpected error monitoring {item_id}: {exc}") - - def _build_stream_metadata( - self, current_song: dict[str, Any], metadata: dict[str, Any] - ) -> StreamMetadata: - """Build StreamMetadata with current track info and upcoming tracks. - - Args: - current_song: Current track data from Radio Paradise API - metadata: Full metadata response with next song and block data - - Returns: - StreamMetadata with track info and upcoming track previews - """ - # Extract track info - artist = current_song.get("artist", "Unknown Artist") - title = current_song.get("title", "Unknown Title") - album = current_song.get("album") - year = current_song.get("year") - - # Build album string with year if available - album_display = album - if album and year: - album_display = f"{album} ({year})" - elif year: - album_display = str(year) - - # Get cover image URL - cover_path = current_song.get("cover") - image_url = None - if cover_path: - image_url = f"https://img.radioparadise.com/{cover_path}" - - # Debug log the image URL - self.logger.debug(f"Cover art URL for {artist} - {title}: {image_url}") - - # Get track duration - duration = current_song.get("duration") - if duration: - duration = int(duration) // 1000 # Convert from ms to seconds - - # Add upcoming tracks info to title for scrolling display - next_song = metadata.get("next") - block_data = metadata.get("block_data") - enhanced_title = self._enhance_title_with_upcoming( - title, current_song, next_song, block_data - ) - - return StreamMetadata( - title=enhanced_title, - artist=artist, - album=album_display, - image_url=image_url, - duration=duration, - ) - - def _enhance_title_with_upcoming( - self, - title: str, - current_song: dict[str, Any], - next_song: dict[str, Any] | None, - block_data: dict[str, Any] | None, - ) -> str: - """Enhance track title with upcoming track info for scrolling display. - - Args: - title: Original track title - current_song: Current track data - next_song: Next track data, or None if not available - block_data: Full block data with all upcoming tracks - - Returns: - Enhanced title with "Up Next" and "Later" information appended - """ - enhanced_title = title - - # Add next track info - if next_song: - next_artist = next_song.get("artist", "") - next_title = next_song.get("title", "") - if next_artist and next_title: - enhanced_title += f" | Up Next: {next_artist} - {next_title}" - - # Add later artists in a single pass with deduplication - if block_data and "song" in block_data: - current_event = current_song.get("event") - current_elapsed = int(current_song.get("elapsed", 0)) - next_event = next_song.get("event") if next_song else None - - # Use set to deduplicate artist names (including next_song artist) - seen_artists = set() - if next_song: - next_artist = next_song.get("artist", "") - if next_artist: - seen_artists.add(next_artist) - - later_artists = [] - sorted_keys = sorted(block_data["song"].keys(), key=int) - for song_key in sorted_keys: - song = block_data["song"][song_key] - song_event = song.get("event") - - # Skip current and next song, only include songs that come after current - if ( - song_event not in (current_event, next_event) - and int(song.get("elapsed", 0)) > current_elapsed - ): - artist_name = song.get("artist", "") - if artist_name and artist_name not in seen_artists: - seen_artists.add(artist_name) - later_artists.append(artist_name) - if len(later_artists) >= 4: # Limit to 4 artists - break - - if later_artists: - artists_list = ", ".join(later_artists) - enhanced_title += f" | Later: {artists_list}" - - return enhanced_title diff --git a/music_assistant/providers/radioparadise/constants.py b/music_assistant/providers/radioparadise/constants.py new file mode 100644 index 00000000..da23d0ef --- /dev/null +++ b/music_assistant/providers/radioparadise/constants.py @@ -0,0 +1,62 @@ +"""Constants for Radio Paradise provider.""" + +from typing import Any + +from music_assistant_models.enums import ContentType + +# Base URL for station icons +STATION_ICONS_BASE_URL = ( + "https://raw.githubusercontent.com/music-assistant/music-assistant.io/main/docs/assets/icons" +) + +# Radio Paradise channel configurations with hardcoded channels +RADIO_PARADISE_CHANNELS: dict[str, dict[str, Any]] = { + "0": { + "name": "Radio Paradise - Main Mix", + "description": "Eclectic mix of music - hand-picked by real humans", + "stream_url": "https://stream.radioparadise.com/flac", + "content_type": ContentType.FLAC, + "api_url": "https://api.radioparadise.com/api/now_playing", + "station_icon": "radioparadise-logo-main.png", + }, + "1": { + "name": "Radio Paradise - Mellow Mix", + "description": "A mellower selection from the RP music library", + "stream_url": "https://stream.radioparadise.com/mellow-flac", + "content_type": ContentType.FLAC, + "api_url": "https://api.radioparadise.com/api/now_playing?chan=1", + "station_icon": "radioparadise-logo-mellow.png", + }, + "2": { + "name": "Radio Paradise - Rock Mix", + "description": "Heavier selections from the RP music library", + "stream_url": "https://stream.radioparadise.com/rock-flac", + "content_type": ContentType.FLAC, + "api_url": "https://api.radioparadise.com/api/now_playing?chan=2", + "station_icon": "radioparadise-logo-rock.png", + }, + "3": { + "name": "Radio Paradise - Global", + "description": "Global music and experimental selections", + "stream_url": "https://stream.radioparadise.com/global-flac", + "content_type": ContentType.FLAC, + "api_url": "https://api.radioparadise.com/api/now_playing?chan=3", + "station_icon": "radioparadise-logo-global.png", + }, + "4": { + "name": "Radio Paradise - Beyond", + "description": "Exploring the frontiers of improvisational music", + "stream_url": "https://stream.radioparadise.com/beyond-flac", + "content_type": ContentType.FLAC, + "api_url": "https://api.radioparadise.com/api/now_playing?chan=4", + "station_icon": "radioparadise-logo-beyond.png", + }, + "5": { + "name": "Radio Paradise - Serenity", + "description": "Don't panic, and don't forget your towel", + "stream_url": "https://stream.radioparadise.com/serenity", + "content_type": ContentType.AAC, + "api_url": "https://api.radioparadise.com/api/now_playing?chan=5", + "station_icon": "radioparadise-logo-serenity.png", + }, +} diff --git a/music_assistant/providers/radioparadise/helpers.py b/music_assistant/providers/radioparadise/helpers.py new file mode 100644 index 00000000..728cb955 --- /dev/null +++ b/music_assistant/providers/radioparadise/helpers.py @@ -0,0 +1,149 @@ +"""Helper functions for Radio Paradise provider.""" + +import time +from typing import Any, cast + +from .constants import RADIO_PARADISE_CHANNELS + + +def get_current_block_position(block_data: dict[str, Any]) -> int: + """Calculate current playback position within a Radio Paradise block. + + Args: + block_data: Block data containing sched_time_millis + + Returns: + Current position in milliseconds from block start + """ + current_time_ms = int(time.time() * 1000) + sched_time = int(block_data.get("sched_time_millis", current_time_ms)) + return current_time_ms - sched_time + + +def find_current_song( + songs: dict[str, dict[str, Any]], current_time_ms: int +) -> dict[str, Any] | None: + """Find which song should currently be playing based on elapsed time. + + Args: + songs: Dictionary of songs from Radio Paradise block data + current_time_ms: Current position in milliseconds within the block + + Returns: + The song dict that should be playing now, or None if not found + """ + sorted_keys = sorted(songs.keys(), key=int) + + for song_key in sorted_keys: + song = songs[song_key] + song_start = int(song.get("elapsed", 0)) + song_duration = int(song.get("duration", 0)) + song_end = song_start + song_duration + + if song_start <= current_time_ms < song_end: + return song + + # If no exact match, return first song + first_song = songs.get("0") + return first_song if first_song is not None else {} + + +def get_next_song(songs: dict[str, Any], current_song: dict[str, Any]) -> dict[str, Any] | None: + """Get the next song that will play after the current song. + + Args: + songs: Dictionary of songs from Radio Paradise block data + current_song: The currently playing song dictionary + + Returns: + The next song dict, or None if no next song found + """ + current_event = current_song.get("event") + current_elapsed = int(current_song.get("elapsed", 0)) + sorted_keys = sorted(songs.keys(), key=int) + + for song_key in sorted_keys: + song = cast("dict[str, Any]", songs[song_key]) + if song.get("event") != current_event and int(song.get("elapsed", 0)) > current_elapsed: + return song + return None + + +def build_stream_url(channel_id: str) -> str: + """Build the streaming URL for a Radio Paradise channel. + + Args: + channel_id: Radio Paradise channel ID (0-5) + + Returns: + Streaming URL for the channel, or empty string if not found + """ + if channel_id not in RADIO_PARADISE_CHANNELS: + return "" + + channel_info = RADIO_PARADISE_CHANNELS[channel_id] + return str(channel_info.get("stream_url", "")) + + +def enhance_title_with_upcoming( + title: str, + current_song: dict[str, Any], + next_song: dict[str, Any] | None, + block_data: dict[str, Any] | None, +) -> str: + """Enhance track title with upcoming track info for scrolling display. + + Args: + title: Original track title + current_song: Current track data + next_song: Next track data, or None if not available + block_data: Full block data with all upcoming tracks + + Returns: + Enhanced title with "Up Next" and "Later" information appended + """ + enhanced_title = title + + # Add next track info + if next_song: + next_artist = next_song.get("artist", "") + next_title = next_song.get("title", "") + if next_artist and next_title: + enhanced_title += f" | Up Next: {next_artist} - {next_title}" + + # Add later artists in a single pass with deduplication + if block_data and "song" in block_data: + current_event = current_song.get("event") + current_elapsed = int(current_song.get("elapsed", 0)) + next_event = next_song.get("event") if next_song else None + + # Use set to deduplicate artist names (including next_song artist) + seen_artists = set() + if next_song: + next_artist = next_song.get("artist", "") + if next_artist: + seen_artists.add(next_artist) + + later_artists = [] + sorted_keys = sorted(block_data["song"].keys(), key=int) + for song_key in sorted_keys: + song = block_data["song"][song_key] + song_event = song.get("event") + + # Skip current and next song, only include songs that come after current + if ( + song_event not in (current_event, next_event) + and int(song.get("elapsed", 0)) > current_elapsed + ): + artist_name = song.get("artist", "") + if artist_name and artist_name not in seen_artists: + seen_artists.add(artist_name) + later_artists.append(artist_name) + if len(later_artists) >= 4: # Limit to 4 artists + break + + if later_artists: + artists_list = ", ".join(later_artists) + enhanced_title += f" | Later: {artists_list}" + + return enhanced_title diff --git a/music_assistant/providers/radioparadise/parsers.py b/music_assistant/providers/radioparadise/parsers.py new file mode 100644 index 00000000..93ced25d --- /dev/null +++ b/music_assistant/providers/radioparadise/parsers.py @@ -0,0 +1,98 @@ +"""Parsers for Radio Paradise provider.""" + +from typing import Any + +from music_assistant_models.enums import ImageType +from music_assistant_models.media_items import ( + MediaItemImage, + ProviderMapping, + Radio, +) +from music_assistant_models.streamdetails import StreamMetadata + +from .constants import RADIO_PARADISE_CHANNELS, STATION_ICONS_BASE_URL +from .helpers import enhance_title_with_upcoming + + +def parse_radio( + channel_id: str, provider_lookup_key: str, provider_domain: str, instance_id: str +) -> Radio: + """Create a Radio object from cached channel information.""" + channel_info = RADIO_PARADISE_CHANNELS.get(channel_id, {}) + + radio = Radio( + provider=provider_lookup_key, + item_id=channel_id, + name=channel_info.get("name", "Unknown Radio"), + provider_mappings={ + ProviderMapping( + provider_domain=provider_domain, + provider_instance=instance_id, + item_id=channel_id, + available=True, + ) + }, + ) + + # Add static station icon + station_icon = channel_info.get("station_icon") + if station_icon: + icon_url = f"{STATION_ICONS_BASE_URL}/{station_icon}" + radio.metadata.add_image( + MediaItemImage( + provider=provider_lookup_key, + type=ImageType.THUMB, + path=icon_url, + remotely_accessible=True, + ) + ) + + return radio + + +def build_stream_metadata(current_song: dict[str, Any], metadata: dict[str, Any]) -> StreamMetadata: + """Build StreamMetadata with current track info and upcoming tracks. + + Args: + current_song: Current track data from Radio Paradise API + metadata: Full metadata response with next song and block data + + Returns: + StreamMetadata with track info and upcoming track previews + """ + # Extract track info + artist = current_song.get("artist", "Unknown Artist") + title = current_song.get("title", "Unknown Title") + album = current_song.get("album") + year = current_song.get("year") + + # Build album string with year if available + album_display = album + if album and year: + album_display = f"{album} ({year})" + elif year: + album_display = str(year) + + # Get cover image URL + cover_path = current_song.get("cover") + image_url = None + if cover_path: + image_url = f"https://img.radioparadise.com/{cover_path}" + + # Get track duration + duration = current_song.get("duration") + if duration: + duration = int(duration) // 1000 # Convert from ms to seconds + + # Add upcoming tracks info to title for scrolling display + next_song = metadata.get("next") + block_data = metadata.get("block_data") + enhanced_title = enhance_title_with_upcoming(title, current_song, next_song, block_data) + + return StreamMetadata( + title=enhanced_title, + artist=artist, + album=album_display, + image_url=image_url, + duration=duration, + ) diff --git a/music_assistant/providers/radioparadise/provider.py b/music_assistant/providers/radioparadise/provider.py new file mode 100644 index 00000000..5e209a68 --- /dev/null +++ b/music_assistant/providers/radioparadise/provider.py @@ -0,0 +1,222 @@ +"""Radio Paradise Music Provider for Music Assistant.""" + +from __future__ import annotations + +import asyncio +import contextlib +from collections.abc import AsyncGenerator, Sequence +from typing import TYPE_CHECKING, Any + +import aiohttp +from music_assistant_models.enums import ( + MediaType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import MediaNotFoundError, UnplayableMediaError +from music_assistant_models.media_items import ( + AudioFormat, + BrowseFolder, + ItemMapping, + MediaItemType, + Radio, +) +from music_assistant_models.streamdetails import StreamDetails + +from music_assistant.models.music_provider import MusicProvider + +from . import parsers +from .constants import RADIO_PARADISE_CHANNELS +from .helpers import build_stream_url, find_current_song, get_current_block_position, get_next_song + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant import MusicAssistant + + +class RadioParadiseProvider(MusicProvider): + """Radio Paradise Music Provider for Music Assistant.""" + + def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig): + """Initialize the provider.""" + super().__init__(mass, manifest, config) + + @property + def supported_features(self) -> set[ProviderFeature]: + """Return the features supported by this Provider.""" + return { + ProviderFeature.BROWSE, + ProviderFeature.LIBRARY_RADIOS, + } + + @property + def is_streaming_provider(self) -> bool: + """Return True if the provider is a streaming provider.""" + return True + + async def get_library_radios(self) -> AsyncGenerator[Radio, None]: + """Retrieve library/subscribed radio stations from the provider.""" + for channel_id in RADIO_PARADISE_CHANNELS: + yield self._parse_radio(channel_id) + + async def get_radio(self, prov_radio_id: str) -> Radio: + """Get full radio details by id.""" + if prov_radio_id not in RADIO_PARADISE_CHANNELS: + raise MediaNotFoundError("Station not found") + + return self._parse_radio(prov_radio_id) + + async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: + """Get streamdetails for a radio station.""" + if media_type != MediaType.RADIO: + raise UnplayableMediaError(f"Unsupported media type: {media_type}") + if item_id not in RADIO_PARADISE_CHANNELS: + raise MediaNotFoundError(f"Unknown radio channel: {item_id}") + + stream_url = build_stream_url(item_id) + if not stream_url: + raise UnplayableMediaError(f"No stream URL found for channel {item_id}") + + # Get content type from channel configuration + channel_info = RADIO_PARADISE_CHANNELS[item_id] + content_type = channel_info["content_type"] + + stream_details = StreamDetails( + item_id=item_id, + provider=self.lookup_key, + audio_format=AudioFormat( + content_type=content_type, + channels=2, + ), + media_type=MediaType.RADIO, + stream_type=StreamType.HTTP, + path=stream_url, + allow_seek=False, + can_seek=False, + duration=0, + ) + + # Set initial metadata if available + metadata = await self._get_channel_metadata(item_id) + if metadata and metadata.get("current"): + current_song = metadata["current"] + stream_details.stream_metadata = parsers.build_stream_metadata(current_song, metadata) + + # Store the monitoring task in streamdetails.data for cleanup in on_streamed + monitor_task = self.mass.create_task(self._monitor_stream_metadata(stream_details)) + stream_details.data = {"monitor_task": monitor_task} + + return stream_details + + async def on_streamed(self, streamdetails: StreamDetails) -> None: + """Handle callback when given streamdetails completed streaming.""" + self.logger.debug( + f"Radio Paradise channel {streamdetails.item_id} streamed for " + f"{streamdetails.seconds_streamed} seconds" + ) + + # Cancel and clean up the monitoring task + if "monitor_task" in streamdetails.data: + monitor_task = streamdetails.data["monitor_task"] + if not monitor_task.done(): + monitor_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await monitor_task + del streamdetails.data["monitor_task"] + + async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + """Browse this provider's items.""" + return [self._parse_radio(channel_id) for channel_id in RADIO_PARADISE_CHANNELS] + + def _parse_radio(self, channel_id: str) -> Radio: + """Create a Radio object from cached channel information.""" + return parsers.parse_radio(channel_id, self.lookup_key, self.domain, self.instance_id) + + async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None: + """Get current track and upcoming tracks from Radio Paradise's block API. + + Args: + channel_id: Radio Paradise channel ID (0-5) + + Returns: + Dict with current song, next song, and block data, or None if API fails + """ + if channel_id not in RADIO_PARADISE_CHANNELS: + return None + + try: + # Use block API for much richer data + api_url = ( + f"https://api.radioparadise.com/api/get_block?bitrate=4&info=true&chan={channel_id}" + ) + timeout = aiohttp.ClientTimeout(total=10) + + async with self.mass.http_session.get(api_url, timeout=timeout) as response: + if response.status != 200: + self.logger.debug(f"Block API call failed with status {response.status}") + return None + + data = await response.json() + + # Find currently playing song based on elapsed time + current_time_ms = get_current_block_position(data) + current_song = find_current_song(data.get("song", {}), current_time_ms) + + if not current_song: + self.logger.debug(f"No current song found for channel {channel_id}") + return None + + # Get next song + next_song = get_next_song(data.get("song", {}), current_song) + + return {"current": current_song, "next": next_song, "block_data": data} + + except aiohttp.ClientError as exc: + self.logger.debug(f"Failed to get block metadata for channel {channel_id}: {exc}") + return None + except Exception as exc: + self.logger.debug( + f"Unexpected error getting block metadata for channel {channel_id}: {exc}" + ) + return None + + async def _monitor_stream_metadata(self, stream_details: StreamDetails) -> None: + """Monitor and update stream metadata in real-time during playback. + + Fetches current track info from Radio Paradise's API every 10 seconds + and updates StreamDetails with track metadata and upcoming songs. + + Args: + stream_details: StreamDetails object to update with metadata + """ + last_track_event = "" + item_id = stream_details.item_id + + try: + while True: + metadata = await self._get_channel_metadata(item_id) + if metadata and metadata.get("current"): + current_song = metadata["current"] + current_event = current_song.get("event", "") + + if current_event != last_track_event: + # Create StreamMetadata object with full track info + stream_metadata = parsers.build_stream_metadata(current_song, metadata) + + self.logger.debug( + f"Updating stream metadata for {item_id}: " + f"{stream_metadata.artist} - {stream_metadata.title}" + ) + stream_details.stream_metadata = stream_metadata + + last_track_event = current_event + + await asyncio.sleep(15) + except asyncio.CancelledError: + self.logger.debug(f"Monitor task cancelled for {item_id}") + except aiohttp.ClientError as exc: + self.logger.debug(f"Network error while monitoring {item_id}: {exc}") + except Exception as exc: + self.logger.warning(f"Unexpected error monitoring {item_id}: {exc}") -- 2.34.1