+++ /dev/null
-"""Helper functions for Radio Paradise provider."""
-
-import time
-from typing import Any, cast
-
-from .constants import RADIO_PARADISE_CHANNELS
-
-
-def get_current_block_position(block_data: dict[str, Any]) -> int:
- """Calculate current playback position within a Radio Paradise block.
-
- Args:
- block_data: Block data containing sched_time_millis
-
- Returns:
- Current position in milliseconds from block start
- """
- current_time_ms = int(time.time() * 1000)
- sched_time = int(block_data.get("sched_time_millis", current_time_ms))
- return current_time_ms - sched_time
-
-
-def find_current_song(
- songs: dict[str, dict[str, Any]], current_time_ms: int
-) -> dict[str, Any] | None:
- """Find which song should currently be playing based on elapsed time.
-
- Args:
- songs: Dictionary of songs from Radio Paradise block data
- current_time_ms: Current position in milliseconds within the block
-
- Returns:
- The song dict that should be playing now, or None if not found
- """
- sorted_keys = sorted(songs.keys(), key=int)
-
- for song_key in sorted_keys:
- song = songs[song_key]
- song_start = int(song.get("elapsed", 0))
- song_duration = int(song.get("duration", 0))
- song_end = song_start + song_duration
-
- if song_start <= current_time_ms < song_end:
- return song
-
- # If no exact match, return first song
- first_song = songs.get("0")
- return first_song if first_song is not None else {}
-
-
-def get_next_song(songs: dict[str, Any], current_song: dict[str, Any]) -> dict[str, Any] | None:
- """Get the next song that will play after the current song.
-
- Args:
- songs: Dictionary of songs from Radio Paradise block data
- current_song: The currently playing song dictionary
-
- Returns:
- The next song dict, or None if no next song found
- """
- current_event = current_song.get("event")
- current_elapsed = int(current_song.get("elapsed", 0))
- sorted_keys = sorted(songs.keys(), key=int)
-
- for song_key in sorted_keys:
- song = cast("dict[str, Any]", songs[song_key])
- if song.get("event") != current_event and int(song.get("elapsed", 0)) > current_elapsed:
- return song
- return None
-
-
-def build_stream_url(channel_id: str) -> str:
- """Build the streaming URL for a Radio Paradise channel.
-
- Args:
- channel_id: Radio Paradise channel ID (0-5)
-
- Returns:
- Streaming URL for the channel, or empty string if not found
- """
- if channel_id not in RADIO_PARADISE_CHANNELS:
- return ""
-
- channel_info = RADIO_PARADISE_CHANNELS[channel_id]
- return str(channel_info.get("stream_url", ""))
-
-
-def enhance_title_with_upcoming(
- title: str,
- current_song: dict[str, Any],
- next_song: dict[str, Any] | None,
- block_data: dict[str, Any] | None,
-) -> str:
- """Enhance track title with upcoming track info for scrolling display.
-
- Args:
- title: Original track title
- current_song: Current track data
- next_song: Next track data, or None if not available
- block_data: Full block data with all upcoming tracks
-
- Returns:
- Enhanced title with "Up Next" and "Later" information appended
- """
- enhanced_title = title
-
- # Add next track info
- if next_song:
- next_artist = next_song.get("artist", "")
- next_title = next_song.get("title", "")
- if next_artist and next_title:
- enhanced_title += f" | Up Next: {next_artist} - {next_title}"
-
- # Add later artists in a single pass with deduplication
- if block_data and "song" in block_data:
- current_event = current_song.get("event")
- current_elapsed = int(current_song.get("elapsed", 0))
- next_event = next_song.get("event") if next_song else None
-
- # Use set to deduplicate artist names (including next_song artist)
- seen_artists = set()
- if next_song:
- next_artist = next_song.get("artist", "")
- if next_artist:
- seen_artists.add(next_artist)
-
- later_artists = []
- sorted_keys = sorted(block_data["song"].keys(), key=int)
- for song_key in sorted_keys:
- song = block_data["song"][song_key]
- song_event = song.get("event")
-
- # Skip current and next song, only include songs that come after current
- if (
- song_event not in (current_event, next_event)
- and int(song.get("elapsed", 0)) > current_elapsed
- ):
- artist_name = song.get("artist", "")
- if artist_name and artist_name not in seen_artists:
- seen_artists.add(artist_name)
- later_artists.append(artist_name)
- if len(later_artists) >= 4: # Limit to 4 artists
- break
-
- if later_artists:
- artists_list = ", ".join(later_artists)
- enhanced_title += f" | Later: {artists_list}"
-
- return enhanced_title
from music_assistant_models.streamdetails import StreamMetadata
from .constants import RADIO_PARADISE_CHANNELS, STATION_ICONS_BASE_URL
-from .helpers import enhance_title_with_upcoming # noqa: F401
def parse_radio(channel_id: str, instance_id: str, provider_domain: str) -> Radio:
return radio
-def build_stream_metadata(current_song: dict[str, Any], metadata: dict[str, Any]) -> StreamMetadata: # noqa: ARG001
- """Build StreamMetadata with current track info and upcoming tracks.
+def build_stream_metadata(current_song: dict[str, Any]) -> StreamMetadata:
+ """Build StreamMetadata with current track info.
- Args:
- current_song: Current track data from Radio Paradise API
- metadata: Full metadata response with next song and block data
-
- Returns:
- StreamMetadata with track info and upcoming track previews
+ :param current_song: Current track data from Radio Paradise now_playing API.
"""
- # Extract track info
+ # Extract track info from now_playing API response
artist = current_song.get("artist", "Unknown Artist")
title = current_song.get("title", "Unknown Title")
album = current_song.get("album")
elif year:
album_display = str(year)
- # Get cover image URL
- cover_path = current_song.get("cover")
- image_url = None
- if cover_path:
- image_url = f"https://img.radioparadise.com/{cover_path}"
-
- # Get track duration
- duration = current_song.get("duration")
- if duration:
- duration = int(duration) // 1000 # Convert from ms to seconds
-
- # Add upcoming tracks info to title for scrolling display
- # next_song = metadata.get("next")
- # block_data = metadata.get("block_data")
- # TODO: Find a way to forward the next_song data to the frontend in the stream metadata
- # enhanced_title = enhance_title_with_upcoming(title, current_song, next_song, block_data)
- # enhanced_title = title # TODO remove after frontend update
+ # Get cover image URL - now_playing API returns full URL
+ image_url = current_song.get("cover")
return StreamMetadata(
title=title,
artist=artist,
album=album_display,
image_url=image_url,
- duration=duration,
)
from __future__ import annotations
-import asyncio
-import contextlib
from collections.abc import AsyncGenerator, Sequence
from typing import Any
from . import parsers
from .constants import RADIO_PARADISE_CHANNELS
-from .helpers import build_stream_url, find_current_song, get_current_block_position, get_next_song
class RadioParadiseProvider(MusicProvider):
if item_id not in RADIO_PARADISE_CHANNELS:
raise MediaNotFoundError(f"Unknown radio channel: {item_id}")
- stream_url = build_stream_url(item_id)
+ # Get stream URL from channel configuration
+ channel_info = RADIO_PARADISE_CHANNELS[item_id]
+ stream_url = channel_info.get("stream_url")
if not stream_url:
raise UnplayableMediaError(f"No stream URL found for channel {item_id}")
allow_seek=False,
can_seek=False,
duration=0,
+ stream_metadata_update_callback=self._update_stream_metadata,
+ stream_metadata_update_interval=10, # Check every 10 seconds
)
# Set initial metadata if available
metadata = await self._get_channel_metadata(item_id)
if metadata and metadata.get("current"):
current_song = metadata["current"]
- stream_details.stream_metadata = parsers.build_stream_metadata(current_song, metadata)
-
- # Store the monitoring task in streamdetails.data for cleanup in on_streamed
- monitor_task = self.mass.create_task(self._monitor_stream_metadata(stream_details))
- stream_details.data = {"monitor_task": monitor_task}
+ stream_details.stream_metadata = parsers.build_stream_metadata(current_song)
return stream_details
- async def on_streamed(self, streamdetails: StreamDetails) -> None:
- """Handle callback when given streamdetails completed streaming."""
- self.logger.debug(
- f"Radio Paradise channel {streamdetails.item_id} streamed for "
- f"{streamdetails.seconds_streamed} seconds"
- )
-
- # Cancel and clean up the monitoring task
- if "monitor_task" in streamdetails.data:
- monitor_task = streamdetails.data["monitor_task"]
- if not monitor_task.done():
- monitor_task.cancel()
- with contextlib.suppress(asyncio.CancelledError):
- await monitor_task
- del streamdetails.data["monitor_task"]
-
async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
"""Browse this provider's items."""
return [self._parse_radio(channel_id) for channel_id in RADIO_PARADISE_CHANNELS]
return parsers.parse_radio(channel_id, self.instance_id, self.domain)
async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None:
- """Get current track and upcoming tracks from Radio Paradise's block API.
+ """Get current track metadata from Radio Paradise's now_playing API.
- Args:
- channel_id: Radio Paradise channel ID (0-5)
-
- Returns:
- Dict with current song, next song, and block data, or None if API fails
+ :param channel_id: Radio Paradise channel ID (0-5).
"""
if channel_id not in RADIO_PARADISE_CHANNELS:
return None
try:
- # Use block API for much richer data
- api_url = (
- f"https://api.radioparadise.com/api/get_block?bitrate=4&info=true&chan={channel_id}"
- )
+ # Use now_playing API
+ channel_info = RADIO_PARADISE_CHANNELS[channel_id]
+ api_url = channel_info["api_url"]
timeout = aiohttp.ClientTimeout(total=10)
async with self.mass.http_session.get(api_url, timeout=timeout) as response:
if response.status != 200:
- self.logger.debug(f"Block API call failed with status {response.status}")
+ self.logger.debug(f"Now playing API call failed with status {response.status}")
return None
data = await response.json()
- # Find currently playing song based on elapsed time
- current_time_ms = get_current_block_position(data)
- current_song = find_current_song(data.get("song", {}), current_time_ms)
-
- if not current_song:
- self.logger.debug(f"No current song found for channel {channel_id}")
+ if not data:
+ self.logger.debug(f"No metadata returned for channel {channel_id}")
return None
- # Get next song
- next_song = get_next_song(data.get("song", {}), current_song)
-
- return {"current": current_song, "next": next_song, "block_data": data}
+ return {"current": data, "next": None, "block_data": None}
except aiohttp.ClientError as exc:
- self.logger.debug(f"Failed to get block metadata for channel {channel_id}: {exc}")
+ self.logger.debug(f"Failed to get metadata for channel {channel_id}: {exc}")
return None
except Exception as exc:
- self.logger.debug(
- f"Unexpected error getting block metadata for channel {channel_id}: {exc}"
- )
+ self.logger.debug(f"Unexpected error getting metadata for channel {channel_id}: {exc}")
return None
- async def _monitor_stream_metadata(self, stream_details: StreamDetails) -> None:
- """Monitor and update stream metadata in real-time during playback.
+ async def _update_stream_metadata(
+ self, stream_details: StreamDetails, elapsed_time: int
+ ) -> None:
+ """Update stream metadata callback called by player queue controller.
- Fetches current track info from Radio Paradise's API every 10 seconds
- and updates StreamDetails with track metadata and upcoming songs.
+ Fetches current track info from Radio Paradise's API and updates
+ StreamDetails with track metadata.
- Args:
- stream_details: StreamDetails object to update with metadata
+ :param stream_details: StreamDetails object to update with metadata.
+ :param elapsed_time: Elapsed playback time in seconds (unused for Radio Paradise).
"""
- last_track_event = ""
item_id = stream_details.item_id
+ # Initialize data dict if needed
+ if stream_details.data is None:
+ stream_details.data = {}
+
try:
- while True:
- metadata = await self._get_channel_metadata(item_id)
- if metadata and metadata.get("current"):
- current_song = metadata["current"]
- current_event = current_song.get("event", "")
-
- if current_event != last_track_event:
- # Create StreamMetadata object with full track info
- stream_metadata = parsers.build_stream_metadata(current_song, metadata)
-
- self.logger.debug(
- f"Updating stream metadata for {item_id}: "
- f"{stream_metadata.artist} - {stream_metadata.title}"
- )
- stream_details.stream_metadata = stream_metadata
-
- last_track_event = current_event
-
- await asyncio.sleep(15)
- except asyncio.CancelledError:
- self.logger.debug(f"Monitor task cancelled for {item_id}")
+ metadata = await self._get_channel_metadata(item_id)
+ if metadata and metadata.get("current"):
+ current_song = metadata["current"]
+ artist = current_song.get("artist", "")
+ title = current_song.get("title", "")
+ current_track_id = f"{artist}:{title}"
+
+ # Only update if track changed
+ if (
+ not stream_details.stream_metadata
+ or stream_details.data.get("last_track_id") != current_track_id
+ ):
+ # Create StreamMetadata object with full track info
+ stream_metadata = parsers.build_stream_metadata(current_song)
+
+ self.logger.debug(
+ f"Updating stream metadata for {item_id}: "
+ f"{stream_metadata.artist} - {stream_metadata.title}"
+ )
+ stream_details.stream_metadata = stream_metadata
+ stream_details.data["last_track_id"] = current_track_id
+
except aiohttp.ClientError as exc:
- self.logger.debug(f"Network error while monitoring {item_id}: {exc}")
+ self.logger.debug(f"Network error while updating metadata for {item_id}: {exc}")
except Exception as exc:
- self.logger.warning(f"Unexpected error monitoring {item_id}: {exc}")
+ self.logger.warning(f"Unexpected error updating metadata for {item_id}: {exc}")