Introduce a HLS radio streams parser (for the BBC radio streams) (#1633)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 30 Aug 2024 20:08:01 +0000 (22:08 +0200)
committerGitHub <noreply@github.com>
Fri, 30 Aug 2024 20:08:01 +0000 (22:08 +0200)
Introduce a HLS radio streams parser

music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py

index 1ef2e07d5917fbf2b9781f990ce046417cea6961..39d51b3e898dc42eeb00b9859a658c8764d54a6f 100644 (file)
@@ -52,8 +52,9 @@ from music_assistant.server.helpers.audio import (
     crossfade_pcm_parts,
     get_chunksize,
     get_ffmpeg_stream,
+    get_hls_radio_stream,
     get_hls_substream,
-    get_icy_stream,
+    get_icy_radio_stream,
     get_player_filter_params,
     get_silence,
     get_stream_details,
@@ -817,19 +818,16 @@ class StreamsController(CoreController):
                 seek_position=streamdetails.seek_position,
             )
         elif streamdetails.stream_type == StreamType.ICY:
-            audio_source = get_icy_stream(self.mass, streamdetails.path, streamdetails)
+            audio_source = get_icy_radio_stream(self.mass, streamdetails.path, streamdetails)
         elif streamdetails.stream_type == StreamType.HLS:
-            # we simply select the best quality substream here
-            # if we ever want to support adaptive stream selection based on bandwidth
-            # we need to move the substream selection into the loop below and make it
-            # bandwidth aware. For now we just assume domestic high bandwidth where
-            # the user wants the best quality possible at all times.
-            substream = await get_hls_substream(self.mass, streamdetails.path)
-            audio_source = substream.path
             if streamdetails.media_type == MediaType.RADIO:
-                # ffmpeg sometimes has trouble with HLS radio streams stopping
-                # abruptly for no reason so this is a workaround to keep the stream alive
-                extra_input_args += ["-reconnect_at_eof", "1"]
+                # Especially the BBC streams struggle when they're played directly
+                # with ffmpeg, so we use our own HLS stream parser/logic
+                audio_source = get_hls_radio_stream(self.mass, streamdetails.path, streamdetails)
+            else:
+                # normal tracks we just let ffmpeg deal with it
+                substream = await get_hls_substream(self.mass, streamdetails.path)
+                audio_source = substream.path
         elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
             audio_source = streamdetails.path
             extra_input_args += ["-decryption_key", streamdetails.decryption_key]
index 905683adb201691a34f7dcecd251206aef00d319..13b4417cb6ebcff7e663a3db1db1d441ebe636c7 100644 (file)
@@ -7,6 +7,7 @@ import logging
 import os
 import re
 import struct
+import time
 from collections import deque
 from collections.abc import AsyncGenerator
 from contextlib import suppress
@@ -51,6 +52,7 @@ from music_assistant.server.helpers.playlists import (
     fetch_playlist,
     parse_m3u,
 )
+from music_assistant.server.helpers.tags import parse_tags
 from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER
 
 from .process import AsyncProcess, check_output, communicate
@@ -522,7 +524,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
     return result
 
 
-async def get_icy_stream(
+async def get_icy_radio_stream(
     mass: MusicAssistant, url: str, streamdetails: StreamDetails
 ) -> AsyncGenerator[bytes, None]:
     """Get (radio) audio stream from HTTP, including ICY metadata retrieval."""
@@ -559,6 +561,96 @@ async def get_icy_stream(
                 streamdetails.stream_title = cleaned_stream_title
 
 
+async def get_hls_radio_stream(
+    mass: MusicAssistant,
+    url: str,
+    streamdetails: StreamDetails,
+) -> AsyncGenerator[bytes, None]:
+    """Get radio audio stream from HTTP HLS playlist."""
+    logger = LOGGER.getChild("hls_stream")
+    logger.debug("Start streaming HLS stream for url %s", url)
+    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+    prev_chunks: deque[str] = deque(maxlen=50)
+    has_playlist_metadata: bool | None = None
+    has_id3_metadata: bool | None = None
+    # we simply select the best quality substream here
+    # if we ever want to support adaptive stream selection based on bandwidth
+    # we need to move the substream selection into the loop below and make it
+    # bandwidth aware. For now we just assume domestic high bandwidth where
+    # the user wants the best quality possible at all times.
+    playlist_item = await get_hls_substream(mass, url)
+    substream_url = playlist_item.path
+    empty_loops = 0
+    while True:
+        logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
+        async with mass.http_session.get(
+            substream_url, headers=HTTP_HEADERS, timeout=timeout
+        ) as resp:
+            resp.raise_for_status()
+            charset = resp.charset or "utf-8"
+            substream_m3u_data = await resp.text(charset)
+        # get chunk-parts from the substream
+        hls_chunks = parse_m3u(substream_m3u_data)
+        chunk_seconds = 0
+        time_start = time.time()
+        for chunk_item in hls_chunks:
+            if chunk_item.path in prev_chunks:
+                continue
+            chunk_length = int(chunk_item.length) if chunk_item.length else 6
+            chunk_item_url = chunk_item.path
+            if not chunk_item_url.startswith("http"):
+                # path is relative, stitch it together
+                base_path = substream_url.rsplit("/", 1)[0]
+                chunk_item_url = base_path + "/" + chunk_item.path
+            # handle (optional) in-playlist (timed) metadata
+            if has_playlist_metadata is None:
+                has_playlist_metadata = chunk_item.title not in (None, "")
+                logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
+            if has_playlist_metadata and chunk_item.title != "no desc":
+                # bbc (and maybe others?) set the title to 'no desc'
+                cleaned_stream_title = clean_stream_title(chunk_item.title)
+                if cleaned_stream_title != streamdetails.stream_title:
+                    logger.log(
+                        VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
+                    )
+                    logger.log(
+                        VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
+                    )
+                    streamdetails.stream_title = cleaned_stream_title
+            logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
+            # prevent that we play this chunk again if we loop through
+            prev_chunks.append(chunk_item.path)
+            async with mass.http_session.get(
+                chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
+            ) as resp:
+                yield await resp.content.read()
+            chunk_seconds += chunk_length
+            # handle (optional) in-band (m3u) metadata
+            if has_id3_metadata is not None and has_playlist_metadata:
+                continue
+            if has_id3_metadata in (None, True):
+                tags = await parse_tags(chunk_item_url)
+                has_id3_metadata = tags.title and tags.title not in chunk_item.path
+                logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
+
+        # end of playlist reached - we loop around to get the next playlist with chunks
+        # safeguard for an endless loop
+        # this may happen if we're simply going too fast for the live stream
+        # we already throttle it a bit but we may end up in a situation where something is wrong
+        # and we want to break out of this loop, hence this check
+        if chunk_seconds == 0:
+            empty_loops += 1
+            await asyncio.sleep(1)
+        else:
+            empty_loops = 0
+        if empty_loops == 50:
+            logger.warning("breaking out of endless loop")
+            break
+        # ensure that we're not going to fast - otherwise we get the same substream playlist
+        while (time.time() - time_start) < (chunk_seconds - 1):
+            await asyncio.sleep(0.5)
+
+
 async def get_hls_substream(
     mass: MusicAssistant,
     url: str,