SearchResults,
Track,
)
+from music_assistant_models.streamdetails import StreamMetadata
from music_assistant_models.unique_list import UniqueList
import music_assistant.helpers.datetime as dt
Menu,
MenuRecommendationOptions,
PlayStatus,
+ RadioShow,
+ Segment,
SoundsClient,
exceptions,
)
+from sounds import PodcastEpisode as SoundsPodcastEpisode
SUPPORTED_FEATURES = {
ProviderFeature.BROWSE,
ProviderFeature.SEARCH,
}
-FEATURES = {"now_playing": False, "catchup_segments": False, "check_blank_image": False}
+FEATURES = {"now_playing": True, "catchup_segments": True, "check_blank_image": False}
type _StreamTypes = Literal["hls", "dash"]
raise MusicAssistantError(f"Incorrect format for podcast episode {prov_episode_id}")
return ma_episode
+ async def _get_playable_stream_details(
+ self, item_id: str, media_type: MediaType
+ ) -> StreamDetails:
+ episode_info = await self.client.streaming.get_by_pid(
+ item_id, include_stream=True, stream_format=self.stream_format
+ )
+ stream_details = await self.adaptor.new_streamable_object(episode_info)
+ if not stream_details:
+ raise self._stream_error(item_id, media_type)
+
+ if episode_info and FEATURES["catchup_segments"]:
+ stream_details.data = {"vpid": episode_info.id}
+ stream_details.stream_metadata_update_callback = self._update_on_demand_stream_metadata
+ stream_details.stream_metadata_update_interval = _Constants.NOW_PLAYING_REFRESH_TIME
+ return stream_details
+
+ async def _get_station_stream_details(self, item_id: str) -> StreamDetails:
+ self.logger.debug(f"Getting stream details for station {item_id}")
+ station = await self.client.stations.get_station(
+ item_id, include_stream=True, stream_format=self.stream_format
+ )
+ if not station:
+ raise MusicAssistantError(f"Couldn't get stream details for station {item_id}")
+
+ self.logger.debug(f"Found station: {station}")
+ if not station.stream:
+ raise MusicAssistantError(f"No stream found for {item_id}")
+
+ stream_details = await self.adaptor.new_streamable_object(station)
+
+ if not stream_details:
+ raise self._stream_error(item_id, MediaType.RADIO)
+
+ if FEATURES["now_playing"]:
+ stream_details.stream_metadata_update_callback = self._update_live_stream_metadata
+ stream_details.stream_metadata_update_interval = _Constants.NOW_PLAYING_REFRESH_TIME
+ return stream_details
+
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track/radio."""
self.logger.debug(f"Getting stream details for {item_id} ({media_type})")
- if media_type == MediaType.PODCAST_EPISODE:
- episode_info = await self.client.streaming.get_by_pid(
- item_id, include_stream=True, stream_format=self.stream_format
- )
- stream_details = await self.adaptor.new_streamable_object(episode_info)
- if not stream_details:
- raise self._stream_error(item_id, media_type)
-
- # Hide behind a feature flag until it can be better tested
- if episode_info and FEATURES["catchup_segments"]:
- # .item_id is the VPID
- self.current_task = self.mass.create_task(
- self._check_for_segments(item_id, stream_details)
- )
- return stream_details
- elif media_type is MediaType.TRACK:
- track = await self.client.streaming.get_by_pid(
- item_id, include_stream=True, stream_format=self.stream_format
- )
- stream_details = await self.adaptor.new_streamable_object(track)
- if not stream_details:
- raise self._stream_error(item_id, media_type)
- self.current_task = self.mass.create_task(
- self._check_for_segments(item_id, stream_details)
- )
- return stream_details
+ if media_type in [MediaType.PODCAST_EPISODE, MediaType.TRACK]:
+ return await self._get_playable_stream_details(item_id, media_type)
else:
- self.logger.debug(f"Getting stream details for station {item_id}")
- station = await self.client.stations.get_station(
- item_id, include_stream=True, stream_format=self.stream_format
- )
- if not station:
- raise MusicAssistantError(f"Couldn't get stream details for station {item_id}")
-
- self.logger.debug(f"Found station: {station}")
- if not station.stream:
- raise MusicAssistantError(f"No stream found for {item_id}")
+ return await self._get_station_stream_details(item_id)
- stream_details = await self.adaptor.new_streamable_object(station)
+ async def _get_programme_segments(self, vpid: str) -> list[Segment] | None:
+ """Get on demand segments from cache or API."""
+ segments = await self.mass.cache.get(
+ provider=self.domain, key=f"programme_segments_{vpid}", default=False
+ )
+ if segments is False:
+ segments = await self.client.streaming.get_show_segments(vpid)
+ await self.mass.cache.set(
+ provider=self.domain,
+ key=f"programme_segments_{vpid}",
+ data=segments,
+ )
+ if isinstance(segments, list):
+ return segments
+ return None
- if not stream_details:
- raise self._stream_error(item_id, media_type)
+ async def _update_on_demand_stream_metadata(
+ self, stream_details: StreamDetails, elapsed_time: int
+ ) -> None:
+ """Get the currently playing segment (song) for on-demand episodes.
- # Start a background task to keep these details updated
- if FEATURES["now_playing"]:
- self.current_task = self.mass.create_task(
- self._watch_stream_details(stream_details)
+ Called by the callback function in StreamDetails.
+ """
+ self.logger.debug("Updating on-demand stream metadata")
+ if not stream_details or not stream_details.stream_metadata:
+ return
+ # segments API required vpid which is not the same as pid
+ vpid = stream_details.data.get("vpid")
+ if vpid:
+ segments = await self._get_programme_segments(vpid=vpid)
+
+ if segments and isinstance(segments, list):
+ segment = next(
+ (
+ s
+ for s in segments
+ if s.offset
+ and int(s.offset.get("start")) <= elapsed_time < int(s.offset.get("end"))
+ ),
+ None,
)
- return stream_details
-
- async def _check_for_segments(self, vpid: str, stream_details: StreamDetails) -> None:
- # seeking past the current segment needs fixing
- segments = await self.client.streaming.get_show_segments(vpid)
- offset = stream_details.seek_position + (stream_details.seconds_streamed or 0)
- if segments:
- seconds = 0 + offset
- segments_iter = iter(segments)
- segment = next(segments_iter)
- if seconds > 0:
- # Skip to the correct segment
- prev = None
- while seconds > segment.offset["start"]:
- self.logger.info("Advancing to next segment")
- prev = segment
- segment = next(segments_iter)
- self.logger.warning("Starting with first segment")
- if prev and seconds > prev.offset["start"] and seconds < prev.offset["end"]:
- if stream_details.stream_metadata:
- stream_details.stream_metadata.artist = prev.titles["primary"]
- stream_details.stream_metadata.title = prev.titles["secondary"]
- if prev.image_url:
- stream_details.stream_metadata.image_url = prev.image_url
- while True:
- if seconds == segment.offset["start"] and stream_details.stream_metadata:
- self.logger.warning("Updating segment")
- stream_details.stream_metadata.artist = segment.titles["primary"]
- stream_details.stream_metadata.title = segment.titles["secondary"]
- if segment.image_url:
- stream_details.stream_metadata.image_url = segment.image_url
- segment = next(segments_iter)
- await asyncio.sleep(1)
- seconds += 1
- else:
- self.logger.warning("No segments found")
- async def _watch_stream_details(self, stream_details: StreamDetails) -> None:
- station_id = stream_details.data["station"]
+ if segment:
+ # Currently playing segment found, update metadata
+ stream_details.stream_metadata = self.now_playing_to_stream_metadata(segment)
+ else:
+ # No segment found for current time, reset to main episode info
+ stream_details = await self._get_playable_stream_details(
+ item_id=stream_details.item_id, media_type=stream_details.media_type
+ )
- while True:
- if not stream_details.stream_metadata:
- await asyncio.sleep(_Constants.NOW_PLAYING_REFRESH_TIME)
- continue
+ def now_playing_to_stream_metadata(self, now_playing: Segment) -> StreamMetadata:
+ """Convert now playing segment to StreamMetadata."""
+ title = now_playing.titles.get("secondary", "")
+ artist = now_playing.titles.get("primary", "")
+ image_url = now_playing.image_url
+ if image_url and _Constants.BLANK_IMAGE_NAME in image_url:
+ image_url = None
+ return StreamMetadata(title=title, artist=artist, image_url=image_url)
+
+ async def _update_live_stream_metadata(
+ self, stream_details: StreamDetails, elapsed_time: int
+ ) -> None:
+ """Get the currently playing song for live radio streams."""
+ self.logger.debug("Updating live stream metadata")
+ if not stream_details or not stream_details.stream_metadata:
+ return
+
+ station_id = stream_details.item_id
+ if not station_id:
+ return
+
+ now_playing = await self.client.schedules.currently_playing_song(station_id)
+ if now_playing:
+ self.logger.debug(f"Now playing for {station_id}: {now_playing}")
+ stream_details.stream_metadata = self.now_playing_to_stream_metadata(now_playing)
+ else:
+ self.logger.debug(f"No song playing on {station_id}, fetching station info")
+ station = await self.client.stations.get_station(station_id)
+ if station:
+ stream_details.stream_metadata = await self._station_programme_display(
+ station=station
+ )
- now_playing = await self.client.schedules.currently_playing_song(
- station_id, image_size=_Constants.DEFAULT_IMAGE_SIZE
- )
+ @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
+ async def _vod_programme_display(self, pid: str) -> StreamMetadata | None:
+ episode = await self.client.streaming.get_by_pid(pid=pid, stream_format=self.stream_format)
+ if isinstance(episode, (SoundsPodcastEpisode, RadioShow)):
+ if episode and episode.titles:
+ return StreamMetadata(title=episode.titles.get("secondary", ""))
+ return None
- if now_playing and stream_details.stream_metadata:
- self.logger.debug(f"Now playing for {station_id}: {now_playing}")
-
- # removed check temporarily as images not working
- if not FEATURES["check_blank_image"] or (
- now_playing.image_url
- and _Constants.BLANK_IMAGE_NAME not in now_playing.image_url
- ):
- stream_details.stream_metadata.image_url = now_playing.image_url
- song = now_playing.titles["secondary"]
- artist = now_playing.titles["primary"]
- stream_details.stream_metadata.title = song
- stream_details.stream_metadata.artist = artist
- elif stream_details.stream_metadata:
- station = await self.client.stations.get_station(station_id=station_id)
- if station:
- self.logger.debug(f"Station details: {station}")
- display = self._station_programme_display(station)
- if display:
- stream_details.stream_metadata.title = display
- stream_details.stream_metadata.artist = None
- stream_details.stream_metadata.image_url = station.image_url
- await asyncio.sleep(_Constants.NOW_PLAYING_REFRESH_TIME)
-
- def _station_programme_display(self, station: LiveStation) -> str | None:
+ @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
+ async def _station_programme_display(self, station: LiveStation) -> StreamMetadata | None:
if station and station.titles:
- return f"{station.titles.get('secondary')} • {station.titles.get('primary')}"
+ title = f"{station.titles.get('secondary')} • {station.titles.get('primary')}"
+ return StreamMetadata(title=title, artist=None, image_url=station.image_url)
return None
async def _station_list(self, include_local: bool = False) -> list[Radio]:
- return [
- Radio(
- item_id=station.item_id,
- name=(
- station.network.short_title
- if station.network and station.network.short_title
- else "Unknown station"
- ),
- provider=self.domain,
- metadata=MediaItemMetadata(
- description=self._station_programme_display(station=station),
- images=(
- UniqueList(
- [
- MediaItemImage(
- type=ImageType.THUMB,
- provider=self.domain,
- path=station.network.logo_url,
- remotely_accessible=True,
- ),
- ]
- )
- if station.network and station.network.logo_url
- else None
- ),
- ),
- provider_mappings={
- ProviderMapping(
+ """Get list of stations as Radios."""
+ radio_list: list[Radio] = []
+ for station in await self.client.stations.get_stations(include_local=include_local):
+ if station and station.item_id:
+ station_info = await self._station_programme_display(station=station)
+ description = station_info.title if station_info else None
+ radio_list.append(
+ Radio(
item_id=station.item_id,
- provider_domain=self.domain,
- provider_instance=self.instance_id,
+ name=(
+ station.network.short_title
+ if station.network and station.network.short_title
+ else "Unknown station"
+ ),
+ provider=self.domain,
+ metadata=MediaItemMetadata(
+ description=description,
+ images=(
+ UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ provider=self.domain,
+ path=station.network.logo_url,
+ remotely_accessible=True,
+ ),
+ ]
+ )
+ if station.network and station.network.logo_url
+ else None
+ ),
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=station.item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
)
- },
- )
- for station in await self.client.stations.get_stations(include_local=include_local)
- if station and station.item_id
- ]
+ )
+ return radio_list
async def _get_category(
self, category_name: str