"description": "Eclectic mix of music - hand-picked by real humans",
"stream_url": "https://stream.radioparadise.com/flac",
"content_type": ContentType.FLAC,
- "api_url": "https://api.radioparadise.com/api/now_playing",
"station_icon": "radioparadise-logo-main.png",
},
"1": {
"description": "A mellower selection from the RP music library",
"stream_url": "https://stream.radioparadise.com/mellow-flac",
"content_type": ContentType.FLAC,
- "api_url": "https://api.radioparadise.com/api/now_playing?chan=1",
"station_icon": "radioparadise-logo-mellow.png",
},
"2": {
"description": "Heavier selections from the RP music library",
"stream_url": "https://stream.radioparadise.com/rock-flac",
"content_type": ContentType.FLAC,
- "api_url": "https://api.radioparadise.com/api/now_playing?chan=2",
"station_icon": "radioparadise-logo-rock.png",
},
"3": {
"description": "Global music and experimental selections",
"stream_url": "https://stream.radioparadise.com/global-flac",
"content_type": ContentType.FLAC,
- "api_url": "https://api.radioparadise.com/api/now_playing?chan=3",
"station_icon": "radioparadise-logo-global.png",
},
"4": {
"description": "Exploring the frontiers of improvisational music",
"stream_url": "https://stream.radioparadise.com/beyond-flac",
"content_type": ContentType.FLAC,
- "api_url": "https://api.radioparadise.com/api/now_playing?chan=4",
"station_icon": "radioparadise-logo-beyond.png",
},
"5": {
"description": "Don't panic, and don't forget your towel",
"stream_url": "https://stream.radioparadise.com/serenity",
"content_type": ContentType.AAC,
- "api_url": "https://api.radioparadise.com/api/now_playing?chan=5",
"station_icon": "radioparadise-logo-serenity.png",
},
}
+
+# API base URLs
+API_BASE_URL = "https://api.radioparadise.com/api"
+PLAY_API_URL = f"{API_BASE_URL}/play?bitrate=4&info=true&chan="
+NOWPLAYING_API_URL = f"{API_BASE_URL}/now_playing?chan="
--- /dev/null
+"""Helper functions for Radio Paradise provider."""
+
+import time
+from typing import Any
+
+
+def get_current_block_position(block_data: dict[str, Any]) -> int:
+ """Calculate current playback position within a Radio Paradise block.
+
+ :param block_data: Block data containing sched_time_millis.
+ """
+ current_time_ms = int(time.time() * 1000)
+ sched_time = int(block_data.get("sched_time_millis", current_time_ms))
+ return current_time_ms - sched_time
+
+
+def find_current_song(
+ songs: dict[str, dict[str, Any]], current_time_ms: int
+) -> dict[str, Any] | None:
+ """Find which song should currently be playing based on elapsed time.
+
+ :param songs: Dictionary of songs from Radio Paradise block data.
+ :param current_time_ms: Current position in milliseconds within the block.
+ """
+ sorted_keys = sorted(songs.keys(), key=int)
+
+ for song_key in sorted_keys:
+ song = songs[song_key]
+ song_start = int(song.get("elapsed", 0))
+ song_duration = int(song.get("duration", 0))
+ song_end = song_start + song_duration
+
+ if song_start <= current_time_ms < song_end:
+ return song
+
+ # If no exact match, return first song
+ first_song = songs.get("0")
+ return first_song if first_song is not None else None
+
+
+def get_next_song(songs: dict[str, Any], current_song: dict[str, Any]) -> dict[str, Any] | None:
+ """Get the next song that will play after the current song.
+
+ :param songs: Dictionary of songs from Radio Paradise block data.
+ :param current_song: The currently playing song dictionary.
+ """
+ current_event = current_song.get("event")
+ current_elapsed = int(current_song.get("elapsed", 0))
+ sorted_keys = sorted(songs.keys(), key=int)
+
+ for song_key in sorted_keys:
+ song: dict[str, Any] = songs[song_key]
+ if song.get("event") != current_event and int(song.get("elapsed", 0)) > current_elapsed:
+ return song
+ return None
return radio
-def build_stream_metadata(current_song: dict[str, Any]) -> StreamMetadata:
+def _build_upcoming_string(metadata: dict[str, Any], current_song: dict[str, Any]) -> str | None:
+ """Build "Up Next: Artist - Track ● Later: Artist2, Artist3" string.
+
+ :param metadata: Full metadata response with next song and block data.
+ :param current_song: Current track data to exclude from upcoming list.
+ """
+ next_song = metadata.get("next")
+ if not next_song:
+ return None
+
+ next_artist = next_song.get("artist", "")
+ next_title = next_song.get("title", "")
+ if not next_artist or not next_title:
+ return None
+
+ result = f"Up Next: {next_artist} - {next_title}"
+
+ # Get additional artists from block data for "Later" section
+ block_data = metadata.get("block_data")
+ if block_data and "song" in block_data:
+ current_event = current_song.get("event")
+ next_event = next_song.get("event")
+ current_elapsed = int(current_song.get("elapsed", 0))
+
+ # Collect unique artists that come after current and next song
+ seen_artists = {next_artist}
+ later_artists = []
+
+ sorted_keys = sorted(block_data["song"].keys(), key=int)
+ for song_key in sorted_keys:
+ song = block_data["song"][song_key]
+ song_event = song.get("event")
+
+ # Skip current and next song, only include songs after current
+ if (
+ song_event not in (current_event, next_event)
+ and int(song.get("elapsed", 0)) > current_elapsed
+ ):
+ artist_name = song.get("artist", "")
+ if artist_name and artist_name not in seen_artists:
+ seen_artists.add(artist_name)
+ later_artists.append(artist_name)
+ if len(later_artists) >= 3:
+ break
+
+ if later_artists:
+ result += f" ● Later: {', '.join(later_artists)}"
+
+ return result
+
+
+def build_stream_metadata(
+ current_song: dict[str, Any],
+ metadata: dict[str, Any],
+ *,
+ show_upcoming: bool = False,
+) -> StreamMetadata:
"""Build StreamMetadata with current track info.
- :param current_song: Current track data from Radio Paradise now_playing API.
+ :param current_song: Current track data from Radio Paradise API.
+ :param metadata: Full metadata response with next song and block data.
+ :param show_upcoming: If True, show upcoming info in artist field.
"""
- # Extract track info from now_playing API response
+ # Extract track info
artist = current_song.get("artist", "Unknown Artist")
title = current_song.get("title", "Unknown Title")
album = current_song.get("album")
elif year:
album_display = str(year)
- # Get cover image URL - now_playing API returns full URL
- image_url = current_song.get("cover")
+ # Alternate artist field with upcoming info
+ artist_display = artist
+ if show_upcoming:
+ upcoming = _build_upcoming_string(metadata, current_song)
+ if upcoming:
+ artist_display = upcoming
+
+ # Get cover image URL
+ # Play API returns relative path (e.g., "covers/l/19806.jpg")
+ # Now playing API returns full URL (e.g., "https://img.radioparadise.com/covers/l/19806.jpg")
+ cover = current_song.get("cover")
+ image_url = None
+ if cover:
+ image_url = cover if cover.startswith("http") else f"https://img.radioparadise.com/{cover}"
+
+ # Get track duration (API returns milliseconds, convert to seconds)
+ duration = current_song.get("duration")
+ if duration:
+ duration = int(duration) // 1000
return StreamMetadata(
title=title,
- artist=artist,
+ artist=artist_display,
album=album_display,
image_url=image_url,
+ duration=duration,
)
from music_assistant.models.music_provider import MusicProvider
from . import parsers
-from .constants import RADIO_PARADISE_CHANNELS
+from .constants import NOWPLAYING_API_URL, PLAY_API_URL, RADIO_PARADISE_CHANNELS
+from .helpers import find_current_song, get_current_block_position, get_next_song
class RadioParadiseProvider(MusicProvider):
metadata = await self._get_channel_metadata(item_id)
if metadata and metadata.get("current"):
current_song = metadata["current"]
- stream_details.stream_metadata = parsers.build_stream_metadata(current_song)
+ stream_details.stream_metadata = parsers.build_stream_metadata(current_song, metadata)
return stream_details
return parsers.parse_radio(channel_id, self.instance_id, self.domain)
async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None:
- """Get current track metadata from Radio Paradise's now_playing API.
+ """Get current track and upcoming tracks from Radio Paradise's API.
+
+ Tries the enriched play API first, falls back to simple now_playing API if it fails.
:param channel_id: Radio Paradise channel ID (0-5).
"""
if channel_id not in RADIO_PARADISE_CHANNELS:
return None
+ # Try enriched play API first
+ result = await self._get_play_api_metadata(channel_id)
+ if result:
+ return result
+
+ # Fallback to simple now_playing API
+ self.logger.debug(f"Falling back to now_playing API for channel {channel_id}")
+ return await self._get_nowplaying_api_metadata(channel_id)
+
+ async def _get_play_api_metadata(self, channel_id: str) -> dict[str, Any] | None:
+ """Get metadata from the enriched play API with upcoming track info.
+
+ :param channel_id: Radio Paradise channel ID (0-5).
+ """
+ try:
+ api_url = f"{PLAY_API_URL}{channel_id}"
+ timeout = aiohttp.ClientTimeout(total=10)
+
+ async with self.mass.http_session.get(api_url, timeout=timeout) as response:
+ if response.status != 200:
+ self.logger.debug(f"Play API call failed with status {response.status}")
+ return None
+
+ data = await response.json()
+
+ if not data or "song" not in data:
+ self.logger.debug(f"No song data in play API response for channel {channel_id}")
+ return None
+
+ # Find currently playing song based on elapsed time
+ current_time_ms = get_current_block_position(data)
+ current_song = find_current_song(data.get("song", {}), current_time_ms)
+
+ if not current_song:
+ self.logger.debug(f"No current song found for channel {channel_id}")
+ return None
+
+ # Get next song
+ next_song = get_next_song(data.get("song", {}), current_song)
+
+ return {"current": current_song, "next": next_song, "block_data": data}
+
+ except aiohttp.ClientError as exc:
+ self.logger.debug(f"Play API request failed for channel {channel_id}: {exc}")
+ return None
+ except (KeyError, ValueError, TypeError) as exc:
+ self.logger.debug(f"Error parsing play API response for channel {channel_id}: {exc}")
+ return None
+
+ async def _get_nowplaying_api_metadata(self, channel_id: str) -> dict[str, Any] | None:
+ """Get metadata from the simple now_playing API (fallback).
+
+ :param channel_id: Radio Paradise channel ID (0-5).
+ """
try:
- # Use now_playing API
- channel_info = RADIO_PARADISE_CHANNELS[channel_id]
- api_url = channel_info["api_url"]
+ api_url = f"{NOWPLAYING_API_URL}{channel_id}"
timeout = aiohttp.ClientTimeout(total=10)
async with self.mass.http_session.get(api_url, timeout=timeout) as response:
if response.status != 200:
- self.logger.debug(f"Now playing API call failed with status {response.status}")
+ self.logger.debug(f"Now playing API failed with status {response.status}")
return None
data = await response.json()
if not data:
- self.logger.debug(f"No metadata returned for channel {channel_id}")
+ self.logger.debug(f"No data from now_playing API for channel {channel_id}")
return None
+ # now_playing API returns flat song data, no next song or block data
return {"current": data, "next": None, "block_data": None}
except aiohttp.ClientError as exc:
- self.logger.debug(f"Failed to get metadata for channel {channel_id}: {exc}")
+ self.logger.debug(f"Now playing API request failed for channel {channel_id}: {exc}")
return None
- except Exception as exc:
- self.logger.debug(f"Unexpected error getting metadata for channel {channel_id}: {exc}")
+ except (KeyError, ValueError, TypeError) as exc:
+ self.logger.debug(f"Error parsing now_playing response for channel {channel_id}: {exc}")
return None
async def _update_stream_metadata(
"""Update stream metadata callback called by player queue controller.
Fetches current track info from Radio Paradise's API and updates
- StreamDetails with track metadata.
+ StreamDetails with track metadata. Alternates between showing the artist
+ and upcoming track info every 10 seconds.
:param stream_details: StreamDetails object to update with metadata.
:param elapsed_time: Elapsed playback time in seconds (unused for Radio Paradise).
metadata = await self._get_channel_metadata(item_id)
if metadata and metadata.get("current"):
current_song = metadata["current"]
- artist = current_song.get("artist", "")
- title = current_song.get("title", "")
- current_track_id = f"{artist}:{title}"
-
- # Only update if track changed
- if (
- not stream_details.stream_metadata
- or stream_details.data.get("last_track_id") != current_track_id
- ):
- # Create StreamMetadata object with full track info
- stream_metadata = parsers.build_stream_metadata(current_song)
-
- self.logger.debug(
- f"Updating stream metadata for {item_id}: "
- f"{stream_metadata.artist} - {stream_metadata.title}"
- )
- stream_details.stream_metadata = stream_metadata
- stream_details.data["last_track_id"] = current_track_id
+ current_event = current_song.get("event", "")
+
+ # Track changed - reset to show artist first
+ if stream_details.data.get("last_event") != current_event:
+ stream_details.data["last_event"] = current_event
+ stream_details.data["show_upcoming"] = False
+
+ # Toggle between artist and upcoming info
+ show_upcoming = stream_details.data.get("show_upcoming", False)
+
+ # Create StreamMetadata object with full track info
+ stream_metadata = parsers.build_stream_metadata(
+ current_song, metadata, show_upcoming=show_upcoming
+ )
+
+ self.logger.debug(
+ f"Updating stream metadata for {item_id}: "
+ f"{stream_metadata.artist} - {stream_metadata.title}"
+ )
+ stream_details.stream_metadata = stream_metadata
+
+ # Toggle for next update
+ stream_details.data["show_upcoming"] = not show_upcoming
except aiohttp.ClientError as exc:
- self.logger.debug(f"Network error while updating metadata for {item_id}: {exc}")
- except Exception as exc:
- self.logger.warning(f"Unexpected error updating metadata for {item_id}: {exc}")
+ self.logger.debug(f"Network error updating metadata for {item_id}: {exc}")