Fix parsing of HLS (sub)streams (#1727)
author Marcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 19 Oct 2024 18:51:07 +0000 (20:51 +0200)
committer Marcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 19 Oct 2024 20:31:52 +0000 (22:31 +0200)
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/ffmpeg.py
music_assistant/server/helpers/playlists.py
music_assistant/server/providers/apple_music/__init__.py

index bcb67231853a058d4a52345f61b8d68b7c665c6c..23fa5876f3cc09dc729b32104d078a4ab89bf477 100644 (file)
@@ -8,7 +8,6 @@ import os
 import re
 import struct
 import time
-from collections import deque
 from collections.abc import AsyncGenerator
 from io import BytesIO
 from typing import TYPE_CHECKING
@@ -44,7 +43,6 @@ from music_assistant.constants import (
 from .ffmpeg import FFMpeg, get_ffmpeg_stream
 from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
 from .process import AsyncProcess, check_output, communicate
-from .tags import parse_tags
 from .throttle_retry import BYPASS_THROTTLER
 from .util import TimedAsyncGenerator, create_tempfile, detect_charset
 
@@ -294,7 +292,7 @@ async def get_media_stream(
     try:
         await ffmpeg_proc.start()
         async for chunk in TimedAsyncGenerator(
-            ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 60
+            ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300
         ):
             # for radio streams we just yield all chunks directly
             if streamdetails.media_type == MediaType.RADIO:
@@ -580,10 +578,6 @@ async def get_hls_radio_stream(
     """Get radio audio stream from HTTP HLS playlist."""
     logger = LOGGER.getChild("hls_stream")
     logger.debug("Start streaming HLS stream for url %s", url)
-    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
-    prev_chunks: deque[str] = deque(maxlen=50)
-    has_playlist_metadata: bool | None = None
-    has_id3_metadata: bool | None = None
     # we simply select the best quality substream here
     # if we ever want to support adaptive stream selection based on bandwidth
     # we need to move the substream selection into the loop below and make it
@@ -591,76 +585,30 @@ async def get_hls_radio_stream(
     # the user wants the best quality possible at all times.
     playlist_item = await get_hls_substream(mass, url)
     substream_url = playlist_item.path
-    empty_loops = 0
-    while True:
+    loops = 50 if streamdetails.media_type == MediaType.RADIO else 1
+    while loops:
         logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
-        async with mass.http_session.get(
-            substream_url, headers=HTTP_HEADERS, timeout=timeout
-        ) as resp:
-            resp.raise_for_status()
-            raw_data = await resp.read()
-            encoding = resp.charset or await detect_charset(raw_data)
-            substream_m3u_data = raw_data.decode(encoding)
-        # get chunk-parts from the substream
-        hls_chunks = parse_m3u(substream_m3u_data)
-        chunk_seconds = 0
-        time_start = time.time()
-        for chunk_item in hls_chunks:
-            if chunk_item.path in prev_chunks:
-                continue
-            chunk_length = int(chunk_item.length) if chunk_item.length else 6
-            chunk_item_url = chunk_item.path
-            if not chunk_item_url.startswith("http"):
-                # path is relative, stitch it together
-                base_path = substream_url.rsplit("/", 1)[0]
-                chunk_item_url = base_path + "/" + chunk_item.path
-            # handle (optional) in-playlist (timed) metadata
-            if has_playlist_metadata is None:
-                has_playlist_metadata = chunk_item.title not in (None, "")
-                logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
-            if has_playlist_metadata and chunk_item.title != "no desc":
-                # bbc (and maybe others?) set the title to 'no desc'
-                cleaned_stream_title = clean_stream_title(chunk_item.title)
-                if cleaned_stream_title != streamdetails.stream_title:
-                    logger.log(
-                        VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
-                    )
-                    logger.log(
-                        VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
-                    )
-                    streamdetails.stream_title = cleaned_stream_title
-            logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
-            # prevent that we play this chunk again if we loop through
-            prev_chunks.append(chunk_item.path)
-            async with mass.http_session.get(
-                chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
-            ) as resp:
-                yield await resp.content.read()
-            chunk_seconds += chunk_length
-            # handle (optional) in-band (m3u) metadata
-            if has_id3_metadata is not None and has_playlist_metadata:
+        # We simply let ffmpeg deal with parsing the HLS playlist and stitching chunks together.
+        # However we do not feed the playlist URL to ffmpeg directly to give us the possibility
+        # to monitor the stream title and other metadata for radio streams in the future.
+        # Also, we've seen cases where ffmpeg sometimes chokes in a stream and aborts, which is not
+        # very useful for radio streams which you want to simply go on forever, so we need to loop
+        # and restart ffmpeg in case of an error.
+        input_format = AudioFormat(content_type=ContentType.UNKNOWN)
+        audio_format_detected = False
+        async for chunk in get_ffmpeg_stream(
+            audio_input=substream_url,
+            input_format=input_format,
+            output_format=AudioFormat(content_type=ContentType.WAV),
+        ):
+            yield chunk
+            if audio_format_detected:
                 continue
-            if has_id3_metadata in (None, True):
-                tags = await parse_tags(chunk_item_url)
-                has_id3_metadata = tags.title and tags.title not in chunk_item.path
-                logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
-
-        # end of playlist reached - we loop around to get the next playlist with chunks
-        # safeguard for an endless loop
-        # this may happen if we're simply going too fast for the live stream
-        # we already throttle it a bit but we may end up in a situation where something is wrong
-        # and we want to break out of this loop, hence this check
-        if chunk_seconds == 0:
-            empty_loops += 1
-            await asyncio.sleep(1)
-        else:
-            empty_loops = 0
-        if empty_loops == 50:
-            logger.warning("breaking out of endless loop")
-            break
-        # ensure that we're not going to fast - otherwise we get the same substream playlist
-        while (time.time() - time_start) < (chunk_seconds - 1):
-            await asyncio.sleep(0.5)
+            if input_format.content_type not in (ContentType.UNKNOWN, ContentType.WAV):
+                # we need to determine the audio format from the first chunk
+                streamdetails.audio_format = input_format
+                audio_format_detected = True
+        loops -= 1
 
 
 async def get_hls_substream(
@@ -679,15 +627,21 @@ async def get_hls_substream(
         raw_data = await resp.read()
         encoding = resp.charset or await detect_charset(raw_data)
         master_m3u_data = raw_data.decode(encoding)
-    if not allow_encrypted and "EXT-X-KEY:METHOD=AES-128" in master_m3u_data:
-        # for now we don't support encrypted HLS streams
+    if not allow_encrypted and "EXT-X-KEY:METHOD=" in master_m3u_data:
+        # for now we do not (yet) support encrypted HLS streams
         raise InvalidDataError("HLS stream is encrypted, not supported")
     substreams = parse_m3u(master_m3u_data)
-    if any(x for x in substreams if x.length or x.key):
-        # this is already a substream!
-        return PlaylistItem(
-            path=url,
-        )
+    # There is a chance that we did not get a master playlist with subplaylists
+    # but just a single master/sub playlist with the actual audio stream(s)
+    # so we need to detect if the playlist children contain audio streams or
+    # sub-playlists.
+    if any(
+        x
+        for x in substreams
+        if (x.length or x.path.endswith((".mp4", ".aac")))
+        and not x.path.endswith((".m3u", ".m3u8"))
+    ):
+        return PlaylistItem(path=url, key=substreams[0].key)
     # sort substreams on best quality (highest bandwidth) when available
     if any(x for x in substreams if x.stream_info):
         substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
index de0b4e75e13f7f7280e28e516c13a03f3563358c..74b1c127ac0c5bfe1db894cbc6fcd3025ef69d10 100644 (file)
@@ -122,7 +122,7 @@ class FFMpeg(AsyncProcess):
         generator_exhausted = False
         audio_received = False
         try:
-            async for chunk in TimedAsyncGenerator(self.audio_input, 30):
+            async for chunk in TimedAsyncGenerator(self.audio_input, 300):
                 audio_received = True
                 await self.write(chunk)
             generator_exhausted = True
@@ -169,7 +169,7 @@ async def get_ffmpeg_stream(
     ) as ffmpeg_proc:
         # read final chunks from stdout
         iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
-        async for chunk in TimedAsyncGenerator(iterator, 60):
+        async for chunk in iterator:
             yield chunk
 
 
index d7e780ccbe1409ffae9472b68fff0906a74d108d..e3cef70234e89828c7304480fae54e8aa32dc48a 100644 (file)
@@ -145,7 +145,9 @@ def parse_pls(pls_data: str) -> list[PlaylistItem]:
     return playlist
 
 
-async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]:
+async def fetch_playlist(
+    mass: MusicAssistant, url: str, raise_on_hls: bool = True
+) -> list[PlaylistItem]:
     """Parse an online m3u or pls playlist."""
     try:
         async with mass.http_session.get(url, allow_redirects=True, timeout=5) as resp:
@@ -164,8 +166,10 @@ async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]:
         msg = f"Error while fetching playlist {url}"
         raise InvalidDataError(msg) from err
 
-    if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data:
-        raise IsHLSPlaylist(encrypted="#EXT-X-KEY:" in playlist_data)
+    if raise_on_hls and ("#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data):
+        exc = IsHLSPlaylist()
+        exc.encrypted = "#EXT-X-KEY:" in playlist_data
+        raise exc
 
     if url.endswith((".m3u", ".m3u8")):
         playlist = parse_m3u(playlist_data)
index 79863152c024aa896c5cd093c60a3021e33e5920..8ae48b40f818f44557bb2e6aa0b1f9a8ee1488d9 100644 (file)
@@ -39,7 +39,7 @@ from music_assistant.common.models.media_items import (
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_PASSWORD
 from music_assistant.server.helpers.app_vars import app_var
-from music_assistant.server.helpers.audio import get_hls_substream
+from music_assistant.server.helpers.playlists import fetch_playlist
 from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
 from music_assistant.server.models.music_provider import MusicProvider
 
@@ -721,8 +721,14 @@ class AppleMusicProvider(MusicProvider):
         ctrp256_urls = [asset["URL"] for asset in stream_assets if asset["flavor"] == "28:ctrp256"]
         if len(ctrp256_urls) == 0:
             raise MediaNotFoundError("No ctrp256 URL found for song.")
-        playlist_item = await get_hls_substream(self.mass, ctrp256_urls[0])
-        track_url = playlist_item.path
+        playlist_url = ctrp256_urls[0]
+        playlist_items = await fetch_playlist(self.mass, ctrp256_urls[0], raise_on_hls=False)
+        # Apple returns a HLS (substream) playlist but instead of chunks,
+        # each item is just the whole file. So we simply grab the first playlist item.
+        playlist_item = playlist_items[0]
+        # path is relative, stitch it together
+        base_path = playlist_url.rsplit("/", 1)[0]
+        track_url = base_path + "/" + playlist_items[0].path
         key = playlist_item.key
         return (track_url, key)