async def get_library_radios(self) -> List[Radio]:
"""Retrieve library/subscribed radio stations from the provider."""
+ result = []
params = {"c": "presets"}
- result = await self._get_data("Browse.ashx", params)
- if result and "body" in result:
- return [
- await self._parse_radio(item)
- for item in result["body"]
- if item.get("type", "") == "audio"
- ]
- return []
+ radios = await self._get_data("Browse.ashx", params)
+ if radios and "body" in radios:
+ for radio in radios["body"]:
+ if radio.get("type", "") != "audio":
+ continue
+ # each radio station can have multiple streams add each one as different quality
+ params = {"id": radio["preset_id"]}
+ stream_info = await self._get_data("Tune.ashx", params)
+ for stream in stream_info["body"]:
+ result.append(await self._parse_radio(radio, stream))
+ return result
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
prov_radio_id = prov_radio_id.split("--")[0]
- radio = None
+ media_type = prov_radio_id.split("--")[1]
params = {"c": "composite", "detail": "listing", "id": prov_radio_id}
result = await self._get_data("Describe.ashx", params)
if result and result.get("body") and result["body"][0].get("children"):
item = result["body"][0]["children"][0]
- radio = await self._parse_radio(item)
- return radio
+ stream_info = await self._get_data("Tune.ashx", {"id": prov_radio_id})
+ for stream in stream_info["body"]:
+ if stream["media_type"] != media_type:
+ continue
+ return await self._parse_radio(item, stream)
+ return None
- async def _parse_radio(self, details: dict) -> Radio:
+ async def _parse_radio(self, details: dict, stream: dict) -> Radio:
"""Parse Radio object from json obj returned from api."""
if "name" in details:
name = details["name"]
if " | " in name:
name = name.split(" | ")[1]
name = name.split(" (")[0]
- radio = Radio(item_id=details["preset_id"], provider=self.id, name=name)
- # parse stream urls and format
- stream_info = await self._get_stream_urls(radio.item_id)
- for stream in stream_info["body"]:
- if stream["media_type"] == "aac":
- quality = MediaQuality.LOSSY_AAC
- elif stream["media_type"] == "ogg":
- quality = MediaQuality.LOSSY_OGG
- else:
- quality = MediaQuality.LOSSY_MP3
- radio.add_provider_id(
- MediaItemProviderId(
- provider=self.id,
- item_id=f'{details["preset_id"]}--{stream["media_type"]}',
- quality=quality,
- details=stream["url"],
- )
+ item_id = f'{details["preset_id"]}--{stream["media_type"]}'
+ radio = Radio(item_id=item_id, provider=self.id, name=name)
+ if stream["media_type"] == "aac":
+ quality = MediaQuality.LOSSY_AAC
+ elif stream["media_type"] == "ogg":
+ quality = MediaQuality.LOSSY_OGG
+ else:
+ quality = MediaQuality.LOSSY_MP3
+ radio.add_provider_id(
+ MediaItemProviderId(
+ provider=self.id,
+ item_id=item_id,
+ quality=quality,
+ details=stream["url"],
)
+ )
# image
if "image" in details:
radio.metadata["image"] = details["image"]
radio.metadata["image"] = details["logo"]
return radio
- async def _get_stream_urls(self, radio_id):
- """Return the stream urls for the given radio id."""
- radio_id = radio_id.split("--")[0]
- params = {"id": radio_id}
- res = await self._get_data("Tune.ashx", params)
- return res
-
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a radio station."""
- radio_id = item_id.split("--")[0]
- if len(item_id.split("--")) > 1:
- media_type = item_id.split("--")[1]
- else:
- media_type = ""
- stream_info = await self._get_stream_urls(radio_id)
+ prov_radio_id = item_id.split("--")[0]
+ media_type = prov_radio_id.split("--")[1]
+ stream_info = await self._get_data("Tune.ashx", {"id": prov_radio_id})
for stream in stream_info["body"]:
- if stream["media_type"] == media_type or not media_type:
+ if stream["media_type"] == media_type:
return StreamDetails(
type=StreamType.URL,
item_id=item_id,