)
for stream in stream_info["body"]:
yield await self._parse_radio(item, stream, folder)
+ elif item_type == "link" and item.get("item") == "url":
+ # custom url
+ yield await self._parse_radio(item)
elif item_type == "link":
# stations are in sublevel (new style)
if sublevel := await self.__get_data(item["URL"], render="json"):
async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get radio station details."""
- prov_radio_id, media_type = prov_radio_id.split("--", 1)
- params = {"c": "composite", "detail": "listing", "id": prov_radio_id}
- result = await self.__get_data("Describe.ashx", **params)
- if result and result.get("body") and result["body"][0].get("children"):
- item = result["body"][0]["children"][0]
- stream_info = await self.__get_data("Tune.ashx", id=prov_radio_id)
- for stream in stream_info["body"]:
- if stream["media_type"] != media_type:
- continue
- return await self._parse_radio(item, stream)
+ if not prov_radio_id.startswith("http"):
+ prov_radio_id, media_type = prov_radio_id.split("--", 1)
+ params = {"c": "composite", "detail": "listing", "id": prov_radio_id}
+ result = await self.__get_data("Describe.ashx", **params)
+ if result and result.get("body") and result["body"][0].get("children"):
+ item = result["body"][0]["children"][0]
+ stream_info = await self.__get_data("Tune.ashx", id=prov_radio_id)
+ for stream in stream_info["body"]:
+ if stream["media_type"] != media_type:
+ continue
+ return await self._parse_radio(item, stream)
+ # fallback - e.g. for handle custom urls ...
+ async for radio in self.get_library_radios():
+ if radio.item_id == prov_radio_id:
+ return radio
return None
async def _parse_radio(
- self, details: dict, stream: dict, folder: Optional[str] = None
+ self, details: dict, stream: Optional[dict] = None, folder: Optional[str] = None
) -> Radio:
"""Parse Radio object from json obj returned from api."""
if "name" in details:
if " | " in name:
name = name.split(" | ")[1]
name = name.split(" (")[0]
- item_id = f'{details["preset_id"]}--{stream["media_type"]}'
- radio = Radio(item_id=item_id, provider=self.type, name=name)
- if stream["media_type"] == "aac":
- quality = MediaQuality.LOSSY_AAC
- elif stream["media_type"] == "ogg":
- quality = MediaQuality.LOSSY_OGG
+
+ if stream is None:
+ # custom url (no stream object present)
+ url = details["URL"]
+ item_id = url
+ # TODO: parse header of stream for audio quality details?
+ quality = MediaQuality.UNKNOWN
else:
- quality = MediaQuality.LOSSY_MP3
+ url = stream["url"]
+ item_id = f'{details["preset_id"]}--{stream["media_type"]}'
+ if stream["media_type"] == "aac":
+ quality = MediaQuality.LOSSY_AAC
+ elif stream["media_type"] == "ogg":
+ quality = MediaQuality.LOSSY_OGG
+ else:
+ quality = MediaQuality.LOSSY_MP3
+
+ radio = Radio(item_id=item_id, provider=self.type, name=name)
radio.add_provider_id(
MediaItemProviderId(
item_id=item_id,
prov_type=self.type,
prov_id=self.id,
quality=quality,
- details=stream["url"],
+ details=url,
)
)
# preset number is used for sorting (not present at stream time)
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a radio station."""
+ if item_id.startswith("http"):
+ # custom url
+ return StreamDetails(
+ provider=self.type,
+ item_id=item_id,
+ content_type=ContentType.UNKNOWN,
+ media_type=MediaType.RADIO,
+ data=item_id,
+ )
item_id, media_type = item_id.split("--", 1)
stream_info = await self.__get_data("Tune.ashx", id=item_id)
for stream in stream_info["body"]:
item_id=item_id,
content_type=ContentType(stream["media_type"]),
media_type=MediaType.RADIO,
- data=stream,
+ data=stream["url"],
)
raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}")
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
async for chunk in get_radio_stream(
- self.mass, streamdetails.data["url"], streamdetails
+ self.mass, streamdetails.data, streamdetails
):
yield chunk