--- /dev/null
+"""Helpers for parsing playlists."""
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import TYPE_CHECKING, List
+
+import aiohttp
+
+from music_assistant.models.errors import InvalidDataError
+
+if TYPE_CHECKING:
+ from music_assistant.mass import MusicAssistant
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+async def parse_m3u(m3u_data: str) -> List[str]:
+ """Parse (only) filenames/urls from m3u playlist file."""
+ m3u_lines = m3u_data.splitlines()
+ lines = []
+ for line in m3u_lines:
+ line = line.strip()
+ if line.startswith("#"):
+ # ignore metadata
+ continue
+ if len(line) != 0:
+ # Get uri/path from all other, non-blank lines
+ lines.append(line)
+
+ return lines
+
+
+async def parse_pls(pls_data: str) -> List[str]:
+ """Parse (only) filenames/urls from pls playlist file."""
+ pls_lines = pls_data.splitlines()
+ lines = []
+ for line in pls_lines:
+ line = line.strip()
+ if not line.startswith("File"):
+ # ignore metadata lines
+ continue
+ if "=" in line:
+ # Get uri/path from all other, non-blank lines
+ lines.append(line.split("=")[1])
+
+ return lines
+
+
+async def fetch_playlist(mass: MusicAssistant, url: str) -> List[str]:
+ """Parse an online m3u or pls playlist."""
+
+ try:
+ async with mass.http_session.get(url, timeout=5) as resp:
+ charset = resp.charset or "utf-8"
+ try:
+ playlist_data = (await resp.content.read(64 * 1024)).decode(charset)
+ except ValueError as err:
+ raise InvalidDataError(f"Could not decode playlist {url}") from err
+ except asyncio.TimeoutError as err:
+ raise InvalidDataError(f"Timeout while fetching playlist {url}") from err
+ except aiohttp.client_exceptions.ClientError as err:
+ raise InvalidDataError(f"Error while fetching playlist {url}") from err
+
+ if url.endswith(".m3u") or url.endswith(".m3u8"):
+ playlist = await parse_m3u(playlist_data)
+ else:
+ playlist = await parse_pls(playlist_data)
+
+ if not playlist:
+ raise InvalidDataError(f"Empty playlist {url}")
+
+ return playlist
"""Tune-In musicprovider support for MusicAssistant."""
from __future__ import annotations
+from time import time
from typing import AsyncGenerator, List, Optional
from asyncio_throttle import Throttler
from music_assistant.helpers.audio import get_radio_stream
+from music_assistant.helpers.playlists import fetch_playlist
from music_assistant.helpers.util import create_sort_name
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed, MediaNotFoundError
item_id, media_type = item_id.split("--", 1)
stream_info = await self.__get_data("Tune.ashx", id=item_id)
for stream in stream_info["body"]:
- if stream["media_type"] == media_type:
- return StreamDetails(
- provider=self.type,
- item_id=item_id,
- content_type=ContentType(stream["media_type"]),
- media_type=MediaType.RADIO,
- data=stream["url"],
- )
+
+ if stream["media_type"] != media_type:
+ continue
+ # check if the radio stream is not a playlist
+ url = stream["url"]
+ if url.endswith("m3u8") or url.endswith("m3u") or url.endswith("pls"):
+ playlist = await fetch_playlist(self.mass, url)
+ url = playlist[0]
+ return StreamDetails(
+ provider=self.type,
+ item_id=item_id,
+ content_type=ContentType(stream["media_type"]),
+ media_type=MediaType.RADIO,
+ data=url,
+ expires=time() + 24 * 3600,
+ )
raise MediaNotFoundError(f"Unable to retrieve stream details for {item_id}")
async def get_audio_stream(
get_http_stream,
get_radio_stream,
)
+from music_assistant.helpers.playlists import fetch_playlist
from music_assistant.helpers.tags import AudioTags, parse_tags
from music_assistant.models.config import MusicProviderConfig
from music_assistant.models.enums import (
self, item_id_or_url: str, force_refresh: bool = False
) -> Tuple[str, str, AudioTags]:
"""Retrieve (cached) mediainfo for url."""
- if "?" in item_id_or_url or "&" in item_id_or_url:
+ # check if the radio stream is not a playlist
+ if (
+ item_id_or_url.endswith("m3u8")
+ or item_id_or_url.endswith("m3u")
+ or item_id_or_url.endswith("pls")
+ ):
+ playlist = await fetch_playlist(self.mass, item_id_or_url)
+ url = playlist[0]
+ item_id = item_id_or_url
+ self._full_url[item_id] = url
+ elif "?" in item_id_or_url or "&" in item_id_or_url:
# store the 'real' full url to be picked up later
# this makes sure that we're not storing any temporary data like auth keys etc
# a request for an url mediaitem always passes here first before streamdetails
sample_rate=media_info.sample_rate,
bit_depth=media_info.bits_per_sample,
direct=None if is_radio else url,
+ data=url,
)
async def get_audio_stream(
):
yield chunk
else:
- # regular stream url (without icy meta and reconnect)
+ # regular stream url (without icy meta)
async for chunk in get_http_stream(
self.mass, streamdetails.data, streamdetails, seek_position
):