Split Radio Paradise provider (#2394)
authorOzGav <gavnosp@hotmail.com>
Sun, 14 Sep 2025 13:59:37 +0000 (23:59 +1000)
committerGitHub <noreply@github.com>
Sun, 14 Sep 2025 13:59:37 +0000 (15:59 +0200)
music_assistant/providers/radioparadise/__init__.py
music_assistant/providers/radioparadise/constants.py [new file with mode: 0644]
music_assistant/providers/radioparadise/helpers.py [new file with mode: 0644]
music_assistant/providers/radioparadise/parsers.py [new file with mode: 0644]
music_assistant/providers/radioparadise/provider.py [new file with mode: 0644]

index 6b35513f86ffa910063d49c3fc43cf6a44f34ad4..f3d2d9d71343c41d7e93368ce1cfee9b8472ee97 100644 (file)
@@ -2,33 +2,7 @@
 
 from __future__ import annotations
 
-import asyncio
-import contextlib
-import time
-from collections.abc import AsyncGenerator, Sequence
-from typing import TYPE_CHECKING, Any, cast
-
-import aiohttp
-from music_assistant_models.enums import (
-    ContentType,
-    ImageType,
-    MediaType,
-    ProviderFeature,
-    StreamType,
-)
-from music_assistant_models.errors import MediaNotFoundError, UnplayableMediaError
-from music_assistant_models.media_items import (
-    AudioFormat,
-    BrowseFolder,
-    ItemMapping,
-    MediaItemImage,
-    MediaItemType,
-    ProviderMapping,
-    Radio,
-)
-from music_assistant_models.streamdetails import StreamDetails, StreamMetadata
-
-from music_assistant.models.music_provider import MusicProvider
+from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import (
@@ -41,62 +15,7 @@ if TYPE_CHECKING:
     from music_assistant import MusicAssistant
     from music_assistant.models import ProviderInstanceType
 
-# Base URL for station icons
-STATION_ICONS_BASE_URL = (
-    "https://raw.githubusercontent.com/music-assistant/music-assistant.io/main/docs/assets/icons"
-)
-
-# Radio Paradise channel configurations with hardcoded channels
-RADIO_PARADISE_CHANNELS: dict[str, dict[str, Any]] = {
-    "0": {
-        "name": "Radio Paradise - Main Mix",
-        "description": "Eclectic mix of music - hand-picked by real humans",
-        "stream_url": "https://stream.radioparadise.com/flac",
-        "content_type": ContentType.FLAC,
-        "api_url": "https://api.radioparadise.com/api/now_playing",
-        "station_icon": "radioparadise-logo-main.png",
-    },
-    "1": {
-        "name": "Radio Paradise - Mellow Mix",
-        "description": "A mellower selection from the RP music library",
-        "stream_url": "https://stream.radioparadise.com/mellow-flac",
-        "content_type": ContentType.FLAC,
-        "api_url": "https://api.radioparadise.com/api/now_playing?chan=1",
-        "station_icon": "radioparadise-logo-mellow.png",
-    },
-    "2": {
-        "name": "Radio Paradise - Rock Mix",
-        "description": "Heavier selections from the RP music library",
-        "stream_url": "https://stream.radioparadise.com/rock-flac",
-        "content_type": ContentType.FLAC,
-        "api_url": "https://api.radioparadise.com/api/now_playing?chan=2",
-        "station_icon": "radioparadise-logo-rock.png",
-    },
-    "3": {
-        "name": "Radio Paradise - Global",
-        "description": "Global music and experimental selections",
-        "stream_url": "https://stream.radioparadise.com/global-flac",
-        "content_type": ContentType.FLAC,
-        "api_url": "https://api.radioparadise.com/api/now_playing?chan=3",
-        "station_icon": "radioparadise-logo-global.png",
-    },
-    "4": {
-        "name": "Radio Paradise - Beyond",
-        "description": "Exploring the frontiers of improvisational music",
-        "stream_url": "https://stream.radioparadise.com/beyond-flac",
-        "content_type": ContentType.FLAC,
-        "api_url": "https://api.radioparadise.com/api/now_playing?chan=4",
-        "station_icon": "radioparadise-logo-beyond.png",
-    },
-    "5": {
-        "name": "Radio Paradise - Serenity",
-        "description": "Don't panic, and don't forget your towel",
-        "stream_url": "https://stream.radioparadise.com/serenity",
-        "content_type": ContentType.AAC,
-        "api_url": "https://api.radioparadise.com/api/now_playing?chan=5",
-        "station_icon": "radioparadise-logo-serenity.png",
-    },
-}
+from .provider import RadioParadiseProvider
 
 
 async def setup(
@@ -114,413 +33,3 @@ async def get_config_entries(
 ) -> tuple[ConfigEntry, ...]:
     """Return Config entries to setup this provider."""
     return ()
-
-
-class RadioParadiseProvider(MusicProvider):
-    """Radio Paradise Music Provider for Music Assistant."""
-
-    def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig):
-        """Initialize the provider."""
-        super().__init__(mass, manifest, config)
-
-    @property
-    def supported_features(self) -> set[ProviderFeature]:
-        """Return the features supported by this Provider."""
-        return {
-            ProviderFeature.BROWSE,
-            ProviderFeature.LIBRARY_RADIOS,
-        }
-
-    @property
-    def is_streaming_provider(self) -> bool:
-        """Return True if the provider is a streaming provider."""
-        return True
-
-    async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
-        """Retrieve library/subscribed radio stations from the provider."""
-        for channel_id in RADIO_PARADISE_CHANNELS:
-            yield self._parse_radio(channel_id)
-
-    async def get_radio(self, prov_radio_id: str) -> Radio:
-        """Get full radio details by id."""
-        if prov_radio_id not in RADIO_PARADISE_CHANNELS:
-            raise MediaNotFoundError("Station not found")
-
-        return self._parse_radio(prov_radio_id)
-
-    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
-        """Get streamdetails for a radio station."""
-        if media_type != MediaType.RADIO:
-            raise UnplayableMediaError(f"Unsupported media type: {media_type}")
-        if item_id not in RADIO_PARADISE_CHANNELS:
-            raise MediaNotFoundError(f"Unknown radio channel: {item_id}")
-
-        stream_url = self._build_stream_url(item_id)
-        if not stream_url:
-            raise UnplayableMediaError(f"No stream URL found for channel {item_id}")
-
-        # Get content type from channel configuration
-        channel_info = RADIO_PARADISE_CHANNELS[item_id]
-        content_type = channel_info["content_type"]
-
-        stream_details = StreamDetails(
-            item_id=item_id,
-            provider=self.lookup_key,
-            audio_format=AudioFormat(
-                content_type=content_type,
-                channels=2,
-            ),
-            media_type=MediaType.RADIO,
-            stream_type=StreamType.HTTP,
-            path=stream_url,
-            allow_seek=False,
-            can_seek=False,
-            duration=0,
-        )
-
-        # Set initial metadata if available
-        metadata = await self._get_channel_metadata(item_id)
-        if metadata and metadata.get("current"):
-            current_song = metadata["current"]
-            stream_details.stream_metadata = self._build_stream_metadata(current_song, metadata)
-
-        # Store the monitoring task in streamdetails.data for cleanup in on_streamed
-        monitor_task = self.mass.create_task(self._monitor_stream_metadata(stream_details))
-        stream_details.data = {"monitor_task": monitor_task}
-
-        return stream_details
-
-    async def on_streamed(self, streamdetails: StreamDetails) -> None:
-        """Handle callback when given streamdetails completed streaming."""
-        self.logger.debug(
-            f"Radio Paradise channel {streamdetails.item_id} streamed for "
-            f"{streamdetails.seconds_streamed} seconds"
-        )
-
-        # Cancel and clean up the monitoring task
-        if "monitor_task" in streamdetails.data:
-            monitor_task = streamdetails.data["monitor_task"]
-            if not monitor_task.done():
-                monitor_task.cancel()
-                with contextlib.suppress(asyncio.CancelledError):
-                    await monitor_task
-            del streamdetails.data["monitor_task"]
-
-    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
-        """Browse this provider's items."""
-        return [self._parse_radio(channel_id) for channel_id in RADIO_PARADISE_CHANNELS]
-
-    def _parse_radio(self, channel_id: str) -> Radio:
-        """Create a Radio object from cached channel information."""
-        channel_info = RADIO_PARADISE_CHANNELS.get(channel_id, {})
-
-        radio = Radio(
-            provider=self.lookup_key,
-            item_id=channel_id,
-            name=channel_info.get("name", "Unknown Radio"),
-            provider_mappings={
-                ProviderMapping(
-                    provider_domain=self.domain,
-                    provider_instance=self.instance_id,
-                    item_id=channel_id,
-                    available=True,
-                )
-            },
-        )
-
-        # Add static station icon
-        station_icon = channel_info.get("station_icon")
-        if station_icon:
-            icon_url = f"{STATION_ICONS_BASE_URL}/{station_icon}"
-            radio.metadata.add_image(
-                MediaItemImage(
-                    provider=self.lookup_key,
-                    type=ImageType.THUMB,
-                    path=icon_url,
-                    remotely_accessible=True,
-                )
-            )
-
-        return radio
-
-    async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None:
-        """Get current track and upcoming tracks from Radio Paradise's block API.
-
-        Args:
-            channel_id: Radio Paradise channel ID (0-5)
-
-        Returns:
-            Dict with current song, next song, and block data, or None if API fails
-        """
-        if channel_id not in RADIO_PARADISE_CHANNELS:
-            return None
-
-        try:
-            # Use block API for much richer data
-            api_url = (
-                f"https://api.radioparadise.com/api/get_block?bitrate=4&info=true&chan={channel_id}"
-            )
-            timeout = aiohttp.ClientTimeout(total=10)
-
-            async with self.mass.http_session.get(api_url, timeout=timeout) as response:
-                if response.status != 200:
-                    self.logger.debug(f"Block API call failed with status {response.status}")
-                    return None
-
-                data = await response.json()
-
-                # Find currently playing song based on elapsed time
-                current_time_ms = self._get_current_block_position(data)
-                current_song = self.find_current_song(data.get("song", {}), current_time_ms)
-
-                if not current_song:
-                    self.logger.debug(f"No current song found for channel {channel_id}")
-                    return None
-
-                # Get next song
-                next_song = self._get_next_song(data.get("song", {}), current_song)
-
-                return {"current": current_song, "next": next_song, "block_data": data}
-
-        except aiohttp.ClientError as exc:
-            self.logger.debug(f"Failed to get block metadata for channel {channel_id}: {exc}")
-            return None
-        except Exception as exc:
-            self.logger.debug(
-                f"Unexpected error getting block metadata for channel {channel_id}: {exc}"
-            )
-            return None
-
-    def _get_current_block_position(self, block_data: dict[str, Any]) -> int:
-        """Calculate current playback position within a Radio Paradise block.
-
-        Args:
-            block_data: Block data containing sched_time_millis
-
-        Returns:
-            Current position in milliseconds from block start
-        """
-        current_time_ms = int(time.time() * 1000)
-        sched_time = int(block_data.get("sched_time_millis", current_time_ms))
-        return current_time_ms - sched_time
-
-    def find_current_song(
-        self, songs: dict[str, dict[str, Any]], current_time_ms: int
-    ) -> dict[str, Any] | None:
-        """Find which song should currently be playing based on elapsed time.
-
-        Args:
-            songs: Dictionary of songs from Radio Paradise block data
-            current_time_ms: Current position in milliseconds within the block
-
-        Returns:
-            The song dict that should be playing now, or None if not found
-        """
-        sorted_keys = sorted(songs.keys(), key=int)
-
-        for song_key in sorted_keys:
-            song = songs[song_key]
-            song_start = int(song.get("elapsed", 0))
-            song_duration = int(song.get("duration", 0))
-            song_end = song_start + song_duration
-
-            if song_start <= current_time_ms < song_end:
-                return song
-
-        # If no exact match, return first song
-        first_song = songs.get("0")
-        return first_song if first_song is not None else {}
-
-    def _get_next_song(
-        self, songs: dict[str, Any], current_song: dict[str, Any]
-    ) -> dict[str, Any] | None:
-        """Get the next song that will play after the current song.
-
-        Args:
-            songs: Dictionary of songs from Radio Paradise block data
-            current_song: The currently playing song dictionary
-
-        Returns:
-            The next song dict, or None if no next song found
-        """
-        current_event = current_song.get("event")
-        current_elapsed = int(current_song.get("elapsed", 0))
-        sorted_keys = sorted(songs.keys(), key=int)
-
-        for song_key in sorted_keys:
-            song = cast("dict[str, Any]", songs[song_key])
-            if song.get("event") != current_event and int(song.get("elapsed", 0)) > current_elapsed:
-                return song
-        return None
-
-    def _build_stream_url(self, channel_id: str) -> str:
-        """Build the streaming URL for a Radio Paradise channel.
-
-        Args:
-            channel_id: Radio Paradise channel ID (0-5)
-
-        Returns:
-            Streaming URL for the channel, or empty string if not found
-        """
-        if channel_id not in RADIO_PARADISE_CHANNELS:
-            return ""
-
-        channel_info = RADIO_PARADISE_CHANNELS[channel_id]
-        return str(channel_info.get("stream_url", ""))
-
-    async def _monitor_stream_metadata(self, stream_details: StreamDetails) -> None:
-        """Monitor and update stream metadata in real-time during playback.
-
-        Fetches current track info from Radio Paradise's API every 10 seconds
-        and updates StreamDetails with track metadata and upcoming songs.
-
-        Args:
-            stream_details: StreamDetails object to update with metadata
-        """
-        last_track_event = ""
-        item_id = stream_details.item_id
-
-        try:
-            while True:
-                metadata = await self._get_channel_metadata(item_id)
-                if metadata and metadata.get("current"):
-                    current_song = metadata["current"]
-                    current_event = current_song.get("event", "")
-
-                    if current_event != last_track_event:
-                        # Create StreamMetadata object with full track info
-                        stream_metadata = self._build_stream_metadata(current_song, metadata)
-
-                        self.logger.debug(
-                            f"Updating stream metadata for {item_id}: "
-                            f"{stream_metadata.artist} - {stream_metadata.title}"
-                        )
-                        stream_details.stream_metadata = stream_metadata
-
-                        last_track_event = current_event
-
-                await asyncio.sleep(15)
-        except asyncio.CancelledError:
-            self.logger.debug(f"Monitor task cancelled for {item_id}")
-        except aiohttp.ClientError as exc:
-            self.logger.debug(f"Network error while monitoring {item_id}: {exc}")
-        except Exception as exc:
-            self.logger.warning(f"Unexpected error monitoring {item_id}: {exc}")
-
-    def _build_stream_metadata(
-        self, current_song: dict[str, Any], metadata: dict[str, Any]
-    ) -> StreamMetadata:
-        """Build StreamMetadata with current track info and upcoming tracks.
-
-        Args:
-            current_song: Current track data from Radio Paradise API
-            metadata: Full metadata response with next song and block data
-
-        Returns:
-            StreamMetadata with track info and upcoming track previews
-        """
-        # Extract track info
-        artist = current_song.get("artist", "Unknown Artist")
-        title = current_song.get("title", "Unknown Title")
-        album = current_song.get("album")
-        year = current_song.get("year")
-
-        # Build album string with year if available
-        album_display = album
-        if album and year:
-            album_display = f"{album} ({year})"
-        elif year:
-            album_display = str(year)
-
-        # Get cover image URL
-        cover_path = current_song.get("cover")
-        image_url = None
-        if cover_path:
-            image_url = f"https://img.radioparadise.com/{cover_path}"
-
-        # Debug log the image URL
-        self.logger.debug(f"Cover art URL for {artist} - {title}: {image_url}")
-
-        # Get track duration
-        duration = current_song.get("duration")
-        if duration:
-            duration = int(duration) // 1000  # Convert from ms to seconds
-
-        # Add upcoming tracks info to title for scrolling display
-        next_song = metadata.get("next")
-        block_data = metadata.get("block_data")
-        enhanced_title = self._enhance_title_with_upcoming(
-            title, current_song, next_song, block_data
-        )
-
-        return StreamMetadata(
-            title=enhanced_title,
-            artist=artist,
-            album=album_display,
-            image_url=image_url,
-            duration=duration,
-        )
-
-    def _enhance_title_with_upcoming(
-        self,
-        title: str,
-        current_song: dict[str, Any],
-        next_song: dict[str, Any] | None,
-        block_data: dict[str, Any] | None,
-    ) -> str:
-        """Enhance track title with upcoming track info for scrolling display.
-
-        Args:
-            title: Original track title
-            current_song: Current track data
-            next_song: Next track data, or None if not available
-            block_data: Full block data with all upcoming tracks
-
-        Returns:
-            Enhanced title with "Up Next" and "Later" information appended
-        """
-        enhanced_title = title
-
-        # Add next track info
-        if next_song:
-            next_artist = next_song.get("artist", "")
-            next_title = next_song.get("title", "")
-            if next_artist and next_title:
-                enhanced_title += f" | Up Next: {next_artist} - {next_title}"
-
-        # Add later artists in a single pass with deduplication
-        if block_data and "song" in block_data:
-            current_event = current_song.get("event")
-            current_elapsed = int(current_song.get("elapsed", 0))
-            next_event = next_song.get("event") if next_song else None
-
-            # Use set to deduplicate artist names (including next_song artist)
-            seen_artists = set()
-            if next_song:
-                next_artist = next_song.get("artist", "")
-                if next_artist:
-                    seen_artists.add(next_artist)
-
-            later_artists = []
-            sorted_keys = sorted(block_data["song"].keys(), key=int)
-            for song_key in sorted_keys:
-                song = block_data["song"][song_key]
-                song_event = song.get("event")
-
-                # Skip current and next song, only include songs that come after current
-                if (
-                    song_event not in (current_event, next_event)
-                    and int(song.get("elapsed", 0)) > current_elapsed
-                ):
-                    artist_name = song.get("artist", "")
-                    if artist_name and artist_name not in seen_artists:
-                        seen_artists.add(artist_name)
-                        later_artists.append(artist_name)
-                        if len(later_artists) >= 4:  # Limit to 4 artists
-                            break
-
-            if later_artists:
-                artists_list = ", ".join(later_artists)
-                enhanced_title += f" | Later: {artists_list}"
-
-        return enhanced_title
diff --git a/music_assistant/providers/radioparadise/constants.py b/music_assistant/providers/radioparadise/constants.py
new file mode 100644 (file)
index 0000000..da23d0e
--- /dev/null
@@ -0,0 +1,62 @@
+"""Constants for Radio Paradise provider."""
+
+from typing import Any
+
+from music_assistant_models.enums import ContentType
+
+# Base URL for station icons
+STATION_ICONS_BASE_URL = (
+    "https://raw.githubusercontent.com/music-assistant/music-assistant.io/main/docs/assets/icons"
+)
+
+# Radio Paradise channel configurations with hardcoded channels
+RADIO_PARADISE_CHANNELS: dict[str, dict[str, Any]] = {
+    "0": {
+        "name": "Radio Paradise - Main Mix",
+        "description": "Eclectic mix of music - hand-picked by real humans",
+        "stream_url": "https://stream.radioparadise.com/flac",
+        "content_type": ContentType.FLAC,
+        "api_url": "https://api.radioparadise.com/api/now_playing",
+        "station_icon": "radioparadise-logo-main.png",
+    },
+    "1": {
+        "name": "Radio Paradise - Mellow Mix",
+        "description": "A mellower selection from the RP music library",
+        "stream_url": "https://stream.radioparadise.com/mellow-flac",
+        "content_type": ContentType.FLAC,
+        "api_url": "https://api.radioparadise.com/api/now_playing?chan=1",
+        "station_icon": "radioparadise-logo-mellow.png",
+    },
+    "2": {
+        "name": "Radio Paradise - Rock Mix",
+        "description": "Heavier selections from the RP music library",
+        "stream_url": "https://stream.radioparadise.com/rock-flac",
+        "content_type": ContentType.FLAC,
+        "api_url": "https://api.radioparadise.com/api/now_playing?chan=2",
+        "station_icon": "radioparadise-logo-rock.png",
+    },
+    "3": {
+        "name": "Radio Paradise - Global",
+        "description": "Global music and experimental selections",
+        "stream_url": "https://stream.radioparadise.com/global-flac",
+        "content_type": ContentType.FLAC,
+        "api_url": "https://api.radioparadise.com/api/now_playing?chan=3",
+        "station_icon": "radioparadise-logo-global.png",
+    },
+    "4": {
+        "name": "Radio Paradise - Beyond",
+        "description": "Exploring the frontiers of improvisational music",
+        "stream_url": "https://stream.radioparadise.com/beyond-flac",
+        "content_type": ContentType.FLAC,
+        "api_url": "https://api.radioparadise.com/api/now_playing?chan=4",
+        "station_icon": "radioparadise-logo-beyond.png",
+    },
+    "5": {
+        "name": "Radio Paradise - Serenity",
+        "description": "Don't panic, and don't forget your towel",
+        "stream_url": "https://stream.radioparadise.com/serenity",
+        "content_type": ContentType.AAC,
+        "api_url": "https://api.radioparadise.com/api/now_playing?chan=5",
+        "station_icon": "radioparadise-logo-serenity.png",
+    },
+}
diff --git a/music_assistant/providers/radioparadise/helpers.py b/music_assistant/providers/radioparadise/helpers.py
new file mode 100644 (file)
index 0000000..728cb95
--- /dev/null
@@ -0,0 +1,149 @@
+"""Helper functions for Radio Paradise provider."""
+
+import time
+from typing import Any, cast
+
+from .constants import RADIO_PARADISE_CHANNELS
+
+
+def get_current_block_position(block_data: dict[str, Any]) -> int:
+    """Calculate current playback position within a Radio Paradise block.
+
+    Args:
+        block_data: Block data containing sched_time_millis
+
+    Returns:
+        Current position in milliseconds from block start
+    """
+    current_time_ms = int(time.time() * 1000)
+    sched_time = int(block_data.get("sched_time_millis", current_time_ms))
+    return current_time_ms - sched_time
+
+
+def find_current_song(
+    songs: dict[str, dict[str, Any]], current_time_ms: int
+) -> dict[str, Any] | None:
+    """Find which song should currently be playing based on elapsed time.
+
+    Args:
+        songs: Dictionary of songs from Radio Paradise block data
+        current_time_ms: Current position in milliseconds within the block
+
+    Returns:
+        The song dict that should be playing now, or None if not found
+    """
+    sorted_keys = sorted(songs.keys(), key=int)
+
+    for song_key in sorted_keys:
+        song = songs[song_key]
+        song_start = int(song.get("elapsed", 0))
+        song_duration = int(song.get("duration", 0))
+        song_end = song_start + song_duration
+
+        if song_start <= current_time_ms < song_end:
+            return song
+
+    # If no exact match, return first song
+    first_song = songs.get("0")
+    return first_song if first_song is not None else {}
+
+
+def get_next_song(songs: dict[str, Any], current_song: dict[str, Any]) -> dict[str, Any] | None:
+    """Get the next song that will play after the current song.
+
+    Args:
+        songs: Dictionary of songs from Radio Paradise block data
+        current_song: The currently playing song dictionary
+
+    Returns:
+        The next song dict, or None if no next song found
+    """
+    current_event = current_song.get("event")
+    current_elapsed = int(current_song.get("elapsed", 0))
+    sorted_keys = sorted(songs.keys(), key=int)
+
+    for song_key in sorted_keys:
+        song = cast("dict[str, Any]", songs[song_key])
+        if song.get("event") != current_event and int(song.get("elapsed", 0)) > current_elapsed:
+            return song
+    return None
+
+
+def build_stream_url(channel_id: str) -> str:
+    """Build the streaming URL for a Radio Paradise channel.
+
+    Args:
+        channel_id: Radio Paradise channel ID (0-5)
+
+    Returns:
+        Streaming URL for the channel, or empty string if not found
+    """
+    if channel_id not in RADIO_PARADISE_CHANNELS:
+        return ""
+
+    channel_info = RADIO_PARADISE_CHANNELS[channel_id]
+    return str(channel_info.get("stream_url", ""))
+
+
+def enhance_title_with_upcoming(
+    title: str,
+    current_song: dict[str, Any],
+    next_song: dict[str, Any] | None,
+    block_data: dict[str, Any] | None,
+) -> str:
+    """Enhance track title with upcoming track info for scrolling display.
+
+    Args:
+        title: Original track title
+        current_song: Current track data
+        next_song: Next track data, or None if not available
+        block_data: Full block data with all upcoming tracks
+
+    Returns:
+        Enhanced title with "Up Next" and "Later" information appended
+    """
+    enhanced_title = title
+
+    # Add next track info
+    if next_song:
+        next_artist = next_song.get("artist", "")
+        next_title = next_song.get("title", "")
+        if next_artist and next_title:
+            enhanced_title += f" | Up Next: {next_artist} - {next_title}"
+
+    # Add later artists in a single pass with deduplication
+    if block_data and "song" in block_data:
+        current_event = current_song.get("event")
+        current_elapsed = int(current_song.get("elapsed", 0))
+        next_event = next_song.get("event") if next_song else None
+
+        # Use set to deduplicate artist names (including next_song artist)
+        seen_artists = set()
+        if next_song:
+            next_artist = next_song.get("artist", "")
+            if next_artist:
+                seen_artists.add(next_artist)
+
+        later_artists = []
+        sorted_keys = sorted(block_data["song"].keys(), key=int)
+        for song_key in sorted_keys:
+            song = block_data["song"][song_key]
+            song_event = song.get("event")
+
+            # Skip current and next song, only include songs that come after current
+            if (
+                song_event not in (current_event, next_event)
+                and int(song.get("elapsed", 0)) > current_elapsed
+            ):
+                artist_name = song.get("artist", "")
+                if artist_name and artist_name not in seen_artists:
+                    seen_artists.add(artist_name)
+                    later_artists.append(artist_name)
+                    if len(later_artists) >= 4:  # Limit to 4 artists
+                        break
+
+        if later_artists:
+            artists_list = ", ".join(later_artists)
+            enhanced_title += f" | Later: {artists_list}"
+
+    return enhanced_title
diff --git a/music_assistant/providers/radioparadise/parsers.py b/music_assistant/providers/radioparadise/parsers.py
new file mode 100644 (file)
index 0000000..93ced25
--- /dev/null
@@ -0,0 +1,98 @@
+"""Parsers for Radio Paradise provider."""
+
+from typing import Any
+
+from music_assistant_models.enums import ImageType
+from music_assistant_models.media_items import (
+    MediaItemImage,
+    ProviderMapping,
+    Radio,
+)
+from music_assistant_models.streamdetails import StreamMetadata
+
+from .constants import RADIO_PARADISE_CHANNELS, STATION_ICONS_BASE_URL
+from .helpers import enhance_title_with_upcoming
+
+
+def parse_radio(
+    channel_id: str, provider_lookup_key: str, provider_domain: str, instance_id: str
+) -> Radio:
+    """Create a Radio object from cached channel information."""
+    channel_info = RADIO_PARADISE_CHANNELS.get(channel_id, {})
+
+    radio = Radio(
+        provider=provider_lookup_key,
+        item_id=channel_id,
+        name=channel_info.get("name", "Unknown Radio"),
+        provider_mappings={
+            ProviderMapping(
+                provider_domain=provider_domain,
+                provider_instance=instance_id,
+                item_id=channel_id,
+                available=True,
+            )
+        },
+    )
+
+    # Add static station icon
+    station_icon = channel_info.get("station_icon")
+    if station_icon:
+        icon_url = f"{STATION_ICONS_BASE_URL}/{station_icon}"
+        radio.metadata.add_image(
+            MediaItemImage(
+                provider=provider_lookup_key,
+                type=ImageType.THUMB,
+                path=icon_url,
+                remotely_accessible=True,
+            )
+        )
+
+    return radio
+
+
+def build_stream_metadata(current_song: dict[str, Any], metadata: dict[str, Any]) -> StreamMetadata:
+    """Build StreamMetadata with current track info and upcoming tracks.
+
+    Args:
+        current_song: Current track data from Radio Paradise API
+        metadata: Full metadata response with next song and block data
+
+    Returns:
+        StreamMetadata with track info and upcoming track previews
+    """
+    # Extract track info
+    artist = current_song.get("artist", "Unknown Artist")
+    title = current_song.get("title", "Unknown Title")
+    album = current_song.get("album")
+    year = current_song.get("year")
+
+    # Build album string with year if available
+    album_display = album
+    if album and year:
+        album_display = f"{album} ({year})"
+    elif year:
+        album_display = str(year)
+
+    # Get cover image URL
+    cover_path = current_song.get("cover")
+    image_url = None
+    if cover_path:
+        image_url = f"https://img.radioparadise.com/{cover_path}"
+
+    # Get track duration
+    duration = current_song.get("duration")
+    if duration:
+        duration = int(duration) // 1000  # Convert from ms to seconds
+
+    # Add upcoming tracks info to title for scrolling display
+    next_song = metadata.get("next")
+    block_data = metadata.get("block_data")
+    enhanced_title = enhance_title_with_upcoming(title, current_song, next_song, block_data)
+
+    return StreamMetadata(
+        title=enhanced_title,
+        artist=artist,
+        album=album_display,
+        image_url=image_url,
+        duration=duration,
+    )
diff --git a/music_assistant/providers/radioparadise/provider.py b/music_assistant/providers/radioparadise/provider.py
new file mode 100644 (file)
index 0000000..5e209a6
--- /dev/null
@@ -0,0 +1,222 @@
+"""Radio Paradise Music Provider for Music Assistant."""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+from collections.abc import AsyncGenerator, Sequence
+from typing import TYPE_CHECKING, Any
+
+import aiohttp
+from music_assistant_models.enums import (
+    MediaType,
+    ProviderFeature,
+    StreamType,
+)
+from music_assistant_models.errors import MediaNotFoundError, UnplayableMediaError
+from music_assistant_models.media_items import (
+    AudioFormat,
+    BrowseFolder,
+    ItemMapping,
+    MediaItemType,
+    Radio,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.models.music_provider import MusicProvider
+
+from . import parsers
+from .constants import RADIO_PARADISE_CHANNELS
+from .helpers import build_stream_url, find_current_song, get_current_block_position, get_next_song
+
+if TYPE_CHECKING:
+    from music_assistant_models.config_entries import ProviderConfig
+    from music_assistant_models.provider import ProviderManifest
+
+    from music_assistant import MusicAssistant
+
+
+class RadioParadiseProvider(MusicProvider):
+    """Radio Paradise Music Provider for Music Assistant."""
+
+    def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig):
+        """Initialize the provider."""
+        super().__init__(mass, manifest, config)
+
+    @property
+    def supported_features(self) -> set[ProviderFeature]:
+        """Return the features supported by this Provider."""
+        return {
+            ProviderFeature.BROWSE,
+            ProviderFeature.LIBRARY_RADIOS,
+        }
+
+    @property
+    def is_streaming_provider(self) -> bool:
+        """Return True if the provider is a streaming provider."""
+        return True
+
+    async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
+        """Retrieve library/subscribed radio stations from the provider."""
+        for channel_id in RADIO_PARADISE_CHANNELS:
+            yield self._parse_radio(channel_id)
+
+    async def get_radio(self, prov_radio_id: str) -> Radio:
+        """Get full radio details by id."""
+        if prov_radio_id not in RADIO_PARADISE_CHANNELS:
+            raise MediaNotFoundError("Station not found")
+
+        return self._parse_radio(prov_radio_id)
+
+    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+        """Get streamdetails for a radio station."""
+        if media_type != MediaType.RADIO:
+            raise UnplayableMediaError(f"Unsupported media type: {media_type}")
+        if item_id not in RADIO_PARADISE_CHANNELS:
+            raise MediaNotFoundError(f"Unknown radio channel: {item_id}")
+
+        stream_url = build_stream_url(item_id)
+        if not stream_url:
+            raise UnplayableMediaError(f"No stream URL found for channel {item_id}")
+
+        # Get content type from channel configuration
+        channel_info = RADIO_PARADISE_CHANNELS[item_id]
+        content_type = channel_info["content_type"]
+
+        stream_details = StreamDetails(
+            item_id=item_id,
+            provider=self.lookup_key,
+            audio_format=AudioFormat(
+                content_type=content_type,
+                channels=2,
+            ),
+            media_type=MediaType.RADIO,
+            stream_type=StreamType.HTTP,
+            path=stream_url,
+            allow_seek=False,
+            can_seek=False,
+            duration=0,
+        )
+
+        # Set initial metadata if available
+        metadata = await self._get_channel_metadata(item_id)
+        if metadata and metadata.get("current"):
+            current_song = metadata["current"]
+            stream_details.stream_metadata = parsers.build_stream_metadata(current_song, metadata)
+
+        # Store the monitoring task in streamdetails.data for cleanup in on_streamed
+        monitor_task = self.mass.create_task(self._monitor_stream_metadata(stream_details))
+        stream_details.data = {"monitor_task": monitor_task}
+
+        return stream_details
+
+    async def on_streamed(self, streamdetails: StreamDetails) -> None:
+        """Handle callback when given streamdetails completed streaming."""
+        self.logger.debug(
+            f"Radio Paradise channel {streamdetails.item_id} streamed for "
+            f"{streamdetails.seconds_streamed} seconds"
+        )
+
+        # Cancel and clean up the monitoring task
+        if "monitor_task" in streamdetails.data:
+            monitor_task = streamdetails.data["monitor_task"]
+            if not monitor_task.done():
+                monitor_task.cancel()
+                with contextlib.suppress(asyncio.CancelledError):
+                    await monitor_task
+            del streamdetails.data["monitor_task"]
+
+    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+        """Browse this provider's items."""
+        return [self._parse_radio(channel_id) for channel_id in RADIO_PARADISE_CHANNELS]
+
+    def _parse_radio(self, channel_id: str) -> Radio:
+        """Create a Radio object from cached channel information."""
+        return parsers.parse_radio(channel_id, self.lookup_key, self.domain, self.instance_id)
+
+    async def _get_channel_metadata(self, channel_id: str) -> dict[str, Any] | None:
+        """Get current track and upcoming tracks from Radio Paradise's block API.
+
+        Args:
+            channel_id: Radio Paradise channel ID (0-5)
+
+        Returns:
+            Dict with current song, next song, and block data, or None if API fails
+        """
+        if channel_id not in RADIO_PARADISE_CHANNELS:
+            return None
+
+        try:
+            # Use block API for much richer data
+            api_url = (
+                f"https://api.radioparadise.com/api/get_block?bitrate=4&info=true&chan={channel_id}"
+            )
+            timeout = aiohttp.ClientTimeout(total=10)
+
+            async with self.mass.http_session.get(api_url, timeout=timeout) as response:
+                if response.status != 200:
+                    self.logger.debug(f"Block API call failed with status {response.status}")
+                    return None
+
+                data = await response.json()
+
+                # Find currently playing song based on elapsed time
+                current_time_ms = get_current_block_position(data)
+                current_song = find_current_song(data.get("song", {}), current_time_ms)
+
+                if not current_song:
+                    self.logger.debug(f"No current song found for channel {channel_id}")
+                    return None
+
+                # Get next song
+                next_song = get_next_song(data.get("song", {}), current_song)
+
+                return {"current": current_song, "next": next_song, "block_data": data}
+
+        except aiohttp.ClientError as exc:
+            self.logger.debug(f"Failed to get block metadata for channel {channel_id}: {exc}")
+            return None
+        except Exception as exc:
+            self.logger.debug(
+                f"Unexpected error getting block metadata for channel {channel_id}: {exc}"
+            )
+            return None
+
+    async def _monitor_stream_metadata(self, stream_details: StreamDetails) -> None:
+        """Monitor and update stream metadata in real-time during playback.
+
+        Fetches current track info from Radio Paradise's API every 10 seconds
+        and updates StreamDetails with track metadata and upcoming songs.
+
+        Args:
+            stream_details: StreamDetails object to update with metadata
+        """
+        last_track_event = ""
+        item_id = stream_details.item_id
+
+        try:
+            while True:
+                metadata = await self._get_channel_metadata(item_id)
+                if metadata and metadata.get("current"):
+                    current_song = metadata["current"]
+                    current_event = current_song.get("event", "")
+
+                    if current_event != last_track_event:
+                        # Create StreamMetadata object with full track info
+                        stream_metadata = parsers.build_stream_metadata(current_song, metadata)
+
+                        self.logger.debug(
+                            f"Updating stream metadata for {item_id}: "
+                            f"{stream_metadata.artist} - {stream_metadata.title}"
+                        )
+                        stream_details.stream_metadata = stream_metadata
+
+                        last_track_event = current_event
+
+                await asyncio.sleep(15)
+        except asyncio.CancelledError:
+            self.logger.debug(f"Monitor task cancelled for {item_id}")
+        except aiohttp.ClientError as exc:
+            self.logger.debug(f"Network error while monitoring {item_id}: {exc}")
+        except Exception as exc:
+            self.logger.warning(f"Unexpected error monitoring {item_id}: {exc}")