From 0e7ec4017f362a16283d739bc73a52e15a54caed Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 30 Aug 2024 22:08:01 +0200 Subject: [PATCH] Introduce a HLS radio streams parser (for the BBC radio streams) (#1633) Introduce a HLS radio streams parser --- music_assistant/server/controllers/streams.py | 22 ++--- music_assistant/server/helpers/audio.py | 94 ++++++++++++++++++- 2 files changed, 103 insertions(+), 13 deletions(-) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 1ef2e07d..39d51b3e 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -52,8 +52,9 @@ from music_assistant.server.helpers.audio import ( crossfade_pcm_parts, get_chunksize, get_ffmpeg_stream, + get_hls_radio_stream, get_hls_substream, - get_icy_stream, + get_icy_radio_stream, get_player_filter_params, get_silence, get_stream_details, @@ -817,19 +818,16 @@ class StreamsController(CoreController): seek_position=streamdetails.seek_position, ) elif streamdetails.stream_type == StreamType.ICY: - audio_source = get_icy_stream(self.mass, streamdetails.path, streamdetails) + audio_source = get_icy_radio_stream(self.mass, streamdetails.path, streamdetails) elif streamdetails.stream_type == StreamType.HLS: - # we simply select the best quality substream here - # if we ever want to support adaptive stream selection based on bandwidth - # we need to move the substream selection into the loop below and make it - # bandwidth aware. For now we just assume domestic high bandwidth where - # the user wants the best quality possible at all times. 
- substream = await get_hls_substream(self.mass, streamdetails.path) - audio_source = substream.path if streamdetails.media_type == MediaType.RADIO: - # ffmpeg sometimes has trouble with HLS radio streams stopping - # abruptly for no reason so this is a workaround to keep the stream alive - extra_input_args += ["-reconnect_at_eof", "1"] + # Especially the BBC streams struggle when they're played directly + # with ffmpeg, so we use our own HLS stream parser/logic + audio_source = get_hls_radio_stream(self.mass, streamdetails.path, streamdetails) + else: + # normal tracks we just let ffmpeg deal with it + substream = await get_hls_substream(self.mass, streamdetails.path) + audio_source = substream.path elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP: audio_source = streamdetails.path extra_input_args += ["-decryption_key", streamdetails.decryption_key] diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 905683ad..13b4417c 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -7,6 +7,7 @@ import logging import os import re import struct +import time from collections import deque from collections.abc import AsyncGenerator from contextlib import suppress @@ -51,6 +52,7 @@ from music_assistant.server.helpers.playlists import ( fetch_playlist, parse_m3u, ) +from music_assistant.server.helpers.tags import parse_tags from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER from .process import AsyncProcess, check_output, communicate @@ -522,7 +524,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo return result -async def get_icy_stream( +async def get_icy_radio_stream( mass: MusicAssistant, url: str, streamdetails: StreamDetails ) -> AsyncGenerator[bytes, None]: """Get (radio) audio stream from HTTP, including ICY metadata retrieval.""" @@ -559,6 +561,96 @@ async def get_icy_stream( streamdetails.stream_title = 
def _hls_chunk_duration(raw_length: object, default: float = 6.0) -> float:
    """Return a playlist chunk duration in seconds, or *default* when unusable.

    Accepts integer and decimal durations (as numbers or strings). Missing,
    unparsable, non-positive or non-finite values fall back to *default*.
    """
    if not raw_length:
        return default
    try:
        duration = float(raw_length)
    except (TypeError, ValueError):
        return default
    # NaN fails both comparisons, so it also falls back to the default
    return duration if 0 < duration < float("inf") else default


async def get_hls_radio_stream(
    mass: MusicAssistant,
    url: str,
    streamdetails: StreamDetails,
) -> AsyncGenerator[bytes, None]:
    """Get radio audio stream from HTTP HLS playlist.

    Selects a substream via get_hls_substream, then keeps re-fetching that
    substream playlist and yields the raw bytes of every chunk that was not
    played before. The title found in the playlist (if any) is written to
    streamdetails.stream_title.

    :param mass: MusicAssistant instance; its http_session is used for all requests.
    :param url: URL of the HLS (master) playlist.
    :param streamdetails: StreamDetails whose stream_title is updated while streaming.
    :raises aiohttp.ClientResponseError: if the substream playlist request fails.

    The generator ends after 50 consecutive playlist refreshes without a new chunk.
    """
    # function-scope import keeps this block self-contained
    from urllib.parse import urljoin

    logger = LOGGER.getChild("hls_stream")
    logger.debug("Start streaming HLS stream for url %s", url)
    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
    # NOTE(review): only the 50 most recent chunk paths are remembered, so a
    # playlist that lists more than 50 chunks would replay its oldest entries
    # on every refresh.
    prev_chunks: deque[str] = deque(maxlen=50)
    has_playlist_metadata: bool | None = None
    has_id3_metadata: bool | None = None
    # we simply select the best quality substream here
    # if we ever want to support adaptive stream selection based on bandwidth
    # we need to move the substream selection into the loop below and make it
    # bandwidth aware. For now we just assume domestic high bandwidth where
    # the user wants the best quality possible at all times.
    playlist_item = await get_hls_substream(mass, url)
    substream_url = playlist_item.path
    empty_loops = 0
    while True:
        logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
        async with mass.http_session.get(
            substream_url, headers=HTTP_HEADERS, timeout=timeout
        ) as playlist_resp:
            playlist_resp.raise_for_status()
            charset = playlist_resp.charset or "utf-8"
            substream_m3u_data = await playlist_resp.text(charset)
        # get chunk-parts from the substream
        hls_chunks = parse_m3u(substream_m3u_data)
        chunk_seconds = 0.0
        # monotonic clock: the throttle below must not be affected by wall-clock jumps
        time_start = time.monotonic()
        for chunk_item in hls_chunks:
            if chunk_item.path in prev_chunks:
                continue
            chunk_length = _hls_chunk_duration(chunk_item.length)
            # resolves relative and root-relative paths, leaves absolute URLs untouched
            chunk_item_url = urljoin(substream_url, chunk_item.path)
            # handle (optional) in-playlist (timed) metadata
            if has_playlist_metadata is None:
                has_playlist_metadata = chunk_item.title not in (None, "")
                logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
            # bbc (and maybe others?) set the title to 'no desc'
            # an empty title on a later chunk keeps the previous stream title
            if has_playlist_metadata and chunk_item.title and chunk_item.title != "no desc":
                cleaned_stream_title = clean_stream_title(chunk_item.title)
                if cleaned_stream_title != streamdetails.stream_title:
                    logger.log(
                        VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
                    )
                    logger.log(
                        VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
                    )
                    streamdetails.stream_title = cleaned_stream_title
            logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
            # prevent that we play this chunk again if we loop through
            prev_chunks.append(chunk_item.path)
            async with mass.http_session.get(
                chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
            ) as chunk_resp:
                if chunk_resp.status >= 400:
                    # never hand an HTTP error body to the consumer as if it were audio
                    logger.warning(
                        "Skipping HLS chunk %s (HTTP status %s)",
                        chunk_item_url,
                        chunk_resp.status,
                    )
                    continue
                chunk_data = await chunk_resp.content.read()
            # yield only after the response is released, so the connection is
            # not held open while the consumer processes the chunk
            yield chunk_data
            chunk_seconds += chunk_length
            # handle (optional) in-band (ID3) metadata
            if has_id3_metadata is not None and has_playlist_metadata:
                continue
            if has_id3_metadata in (None, True):
                # NOTE(review): the tags are only used to detect ID3 support;
                # tags.title is never written to streamdetails - confirm intent
                tags = await parse_tags(chunk_item_url)
                has_id3_metadata = bool(tags.title and tags.title not in chunk_item.path)
                logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)

        # end of playlist reached - we loop around to get the next playlist with chunks
        # safeguard for an endless loop
        # this may happen if we're simply going too fast for the live stream
        # we already throttle it a bit but we may end up in a situation where something is wrong
        # and we want to break out of this loop, hence this check
        if chunk_seconds == 0:
            empty_loops += 1
            await asyncio.sleep(1)
        else:
            empty_loops = 0
        if empty_loops == 50:
            logger.warning("breaking out of endless loop")
            break
        # ensure that we're not going too fast - otherwise we get the same substream playlist
        while (time.monotonic() - time_start) < (chunk_seconds - 1):
            await asyncio.sleep(0.5)