Several tweaks for radio streams and YTM (#1187)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 28 Mar 2024 11:48:36 +0000 (12:48 +0100)
committerGitHub <noreply@github.com>
Thu, 28 Mar 2024 11:48:36 +0000 (12:48 +0100)
* Fixes for radio streams playback and metadata

* some fixes for buffering and caching streamdetails

* Update audio.py

music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/providers/ytmusic/__init__.py

index 42e32921de3ac03e6030a0e9426d141a3a1f0a4d..a2d565f708ed4801dcb3951cb09f6cf9e57b189c 100644 (file)
@@ -828,10 +828,6 @@ class StreamsController(CoreController):
                 queue.queue_id, CONF_CROSSFADE_DURATION, 8
             )
             crossfade_size = int(pcm_sample_size * crossfade_duration)
-            buffer_size = int(pcm_sample_size * 2)  # 2 seconds
-            if use_crossfade:
-                # buffer size needs to be big enough to include the crossfade part
-                buffer_size += crossfade_size
             bytes_written = 0
             buffer = b""
             # handle incoming audio chunks
@@ -839,9 +835,22 @@ class StreamsController(CoreController):
                 queue_track.streamdetails,
                 pcm_format=pcm_format,
                 # strip silence from begin/end if track is being crossfaded
-                strip_silence_begin=use_crossfade,
+                strip_silence_begin=use_crossfade and bytes_written > 0,
                 strip_silence_end=use_crossfade,
             ):
+                # required buffer size is a bit dynamic,
+                # it needs to be small when the flow stream starts
+                seconds_streamed = int(bytes_written / pcm_sample_size)
+                if not use_crossfade or seconds_streamed < 5:
+                    buffer_size = pcm_sample_size
+                elif seconds_streamed < 10:
+                    buffer_size = pcm_sample_size * 2
+                elif use_crossfade and seconds_streamed < 20:
+                    buffer_size = pcm_sample_size * 5
+                else:
+                    buffer_size = crossfade_size + pcm_sample_size * 2
+                    # buffer size needs to be big enough to include the crossfade part
+
                 # ALWAYS APPEND CHUNK TO BUFFER
                 buffer += chunk
                 del chunk
@@ -970,13 +979,12 @@ class StreamsController(CoreController):
         is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
         if is_radio or streamdetails.seek_position:
             strip_silence_begin = False
-        # pcm_sample_size = chunk size = 1 second of pcm audio
-        chunk_size = pcm_format.pcm_sample_size
-        expected_chunks = int(
-            ((streamdetails.duration or 0) * pcm_format.pcm_sample_size) / chunk_size
-        )
-        if expected_chunks < 10:
+        if is_radio or streamdetails.duration < 30:
             strip_silence_end = False
+        # pcm_sample_size = chunk size = 1 second of pcm audio
+        pcm_sample_size = pcm_format.pcm_sample_size
+        buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size
+        buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size
 
         # collect all arguments for ffmpeg
         filter_params = []
@@ -1032,10 +1040,21 @@ class StreamsController(CoreController):
             stderr_data = ""
             async for line in ffmpeg_proc.iter_stderr():
                 line = line.decode().strip()  # noqa: PLW2901
+                # if streamdetails contenttype is uinknown, try pars eit from the ffmpeg log output
+                # this has no actual usecase, other than displaying the correct codec in the UI
+                if (
+                    streamdetails.audio_format.content_type == ContentType.UNKNOWN
+                    and line.startswith("Stream #0:0: Audio: ")
+                ):
+                    streamdetails.audio_format.content_type = ContentType.try_parse(
+                        line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+                    )
                 if stderr_data or "loudnorm" in line:
                     stderr_data += line
+                elif "HTTP error" in line:
+                    logger.warning(line)
                 elif line:
-                    self.logger.log(VERBOSE_LOG_LEVEL, line)
+                    logger.log(VERBOSE_LOG_LEVEL, line)
                 del line
 
             # if we reach this point, the process is finished (finish or aborted)
@@ -1094,37 +1113,41 @@ class StreamsController(CoreController):
             self.mass.create_task(log_reader(ffmpeg_proc, state_data))
 
             # get pcm chunks from stdout
-            # we always stay one chunk behind to properly detect end of chunks
+            # we always stay buffer_size of bytes behind
             # so we can strip silence at the beginning and end of a track
-            prev_chunk = b""
+            buffer = b""
             chunk_num = 0
-            async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
+            async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size):
                 chunk_num += 1
+                required_buffer = buffer_size_begin if chunk_num < 10 else buffer_size_end
+                buffer += chunk
+                del chunk
+
+                if len(buffer) < required_buffer:
+                    # buffer is not full enough, move on
+                    continue
+
                 if strip_silence_begin and chunk_num == 2:
                     # first 2 chunks received, strip silence of beginning
                     stripped_audio = await strip_silence(
                         self.mass,
-                        prev_chunk + chunk,
+                        buffer,
                         sample_rate=pcm_format.sample_rate,
                         bit_depth=pcm_format.bit_depth,
                     )
                     yield stripped_audio
                     state_data["bytes_sent"] += len(stripped_audio)
-                    prev_chunk = b""
+                    buffer = b""
                     del stripped_audio
                     continue
-                if strip_silence_end and chunk_num >= (expected_chunks - 6):
-                    # last part of the track, collect multiple chunks to strip silence later
-                    prev_chunk += chunk
-                    continue
-
-                # middle part of the track, send previous chunk and collect current chunk
-                if prev_chunk:
-                    yield prev_chunk
-                    state_data["bytes_sent"] += len(prev_chunk)
 
-                # collect this chunk for next round
-                prev_chunk = chunk
+                #### OTHER: enough data in buffer, feed to output
+                while len(buffer) > required_buffer:
+                    subchunk = buffer[:pcm_sample_size]
+                    buffer = buffer[pcm_sample_size:]
+                    state_data["bytes_sent"] += len(subchunk)
+                    yield subchunk
+                    del subchunk
 
             # if we did not receive any data, something went (terribly) wrong
             # raise here to prevent an (endless) loop elsewhere
@@ -1132,23 +1155,23 @@ class StreamsController(CoreController):
                 raise AudioError(f"stream error on {streamdetails.uri}")
 
             # all chunks received, strip silence of last part if needed and yield remaining bytes
-            if strip_silence_end and prev_chunk:
+            if strip_silence_end:
                 final_chunk = await strip_silence(
                     self.mass,
-                    prev_chunk,
+                    buffer,
                     sample_rate=pcm_format.sample_rate,
                     bit_depth=pcm_format.bit_depth,
                     reverse=True,
                 )
             else:
-                final_chunk = prev_chunk
+                final_chunk = buffer
 
             # yield final chunk to output (as one big chunk)
             yield final_chunk
             state_data["bytes_sent"] += len(final_chunk)
             state_data["finished"].set()
             del final_chunk
-            del prev_chunk
+            del buffer
 
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
index f76a54e2c01f14a0b9bb35a6f576ed4180e512e4..a54491fa7388ab4198f1655982b79bd018a8d486 100644 (file)
@@ -7,6 +7,7 @@ import logging
 import os
 import re
 import struct
+from collections import deque
 from io import BytesIO
 from time import time
 from typing import TYPE_CHECKING
@@ -43,6 +44,7 @@ from music_assistant.server.helpers.playlists import (
     fetch_playlist,
     parse_m3u,
 )
+from music_assistant.server.helpers.tags import parse_tags
 
 from .process import AsyncProcess, check_output
 from .util import create_tempfile
@@ -199,7 +201,7 @@ async def get_stream_details(
     Do not try to request streamdetails in advance as this is expiring data.
         param media_item: The QueueItem for which to request the streamdetails for.
     """
-    if queue_item.streamdetails and (time() < queue_item.streamdetails.expires):
+    if queue_item.streamdetails and (time() + 60) < queue_item.streamdetails.expires:
         LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
         # we already have (fresh) streamdetails stored on the queueitem, use these.
         # this happens for example while seeking in a track.
@@ -224,23 +226,20 @@ async def get_stream_details(
             item_key = f"{music_prov.lookup_key}/{prov_media.item_id}"
             cache_key = f"cached_streamdetails_{item_key}"
             if cache := await mass.cache.get(cache_key):
-                LOGGER.debug(f"Using cached streamdetails for {item_key}")
-                streamdetails = StreamDetails.from_dict(cache)
-                break
+                if time() + 60 < cache["expires"]:
+                    LOGGER.debug(f"Using cached streamdetails for {item_key}")
+                    streamdetails = StreamDetails.from_dict(cache)
             # get streamdetails from provider
             try:
                 streamdetails: StreamDetails = await music_prov.get_stream_details(
                     prov_media.item_id
                 )
-                # store streamdetails in cache
-                expiration = streamdetails.expires - time()
-                if expiration > 300:
-                    await mass.cache.set(
-                        cache_key, streamdetails.to_dict(), expiration=expiration - 60
-                    )
             except MusicAssistantError as err:
                 LOGGER.warning(str(err))
             else:
+                # store streamdetails in cache
+                expiration = streamdetails.expires - time()
+                await mass.cache.set(cache_key, streamdetails.to_dict(), expiration=expiration - 60)
                 break
         else:
             raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}")
@@ -323,7 +322,9 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
     return file.getvalue()
 
 
-async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
+async def resolve_radio_stream(
+    mass: MusicAssistant, url: str, use_get: bool = False
+) -> tuple[str, bool, bool]:
     """
     Resolve a streaming radio URL.
 
@@ -336,7 +337,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
     - bool uf the URL represents a HLS stream/playlist.
     """
     base_url = url.split("?")[0]
-    cache_key = f"resolved_radio_{url}"
+    cache_key = f"RADIO_RESOLVED_{url}"
     if cache := await mass.cache.get(cache_key):
         return cache
     is_hls = False
@@ -344,12 +345,16 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
     resolved_url = url
     timeout = ClientTimeout(total=0, connect=10, sock_read=5)
     try:
-        async with mass.http_session.head(
-            url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
+        method = "GET" if use_get else "HEAD"
+        async with mass.http_session.request(
+            method, url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout
         ) as resp:
             resolved_url = str(resp.real_url)
             headers = resp.headers
-        supports_icy = int(headers.get("icy-metaint", "0")) > 0
+            resp.raise_for_status()
+            if not resp.headers:
+                raise InvalidDataError("no headers found")
+        supports_icy = headers.get("icy-name") is not None or "Icecast" in headers.get("server", "")
         is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
         if (
             base_url.endswith((".m3u", ".m3u8", ".pls"))
@@ -367,11 +372,14 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
                 is_hls = True
 
     except (ClientResponseError, InvalidDataError) as err:
+        if not use_get:
+            return await resolve_radio_stream(mass, resolved_url, True)
         LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
         return (resolved_url, supports_icy, is_hls)
 
     result = (resolved_url, supports_icy, is_hls)
-    await mass.cache.set(cache_key, result, expiration=86400)
+    cache_expiration = 24 * 3600 if url == resolved_url else 600
+    await mass.cache.set(cache_key, result, expiration=cache_expiration)
     return result
 
 
@@ -426,6 +434,83 @@ async def get_icy_stream(
 
 async def get_hls_stream(
     mass: MusicAssistant, url: str, streamdetails: StreamDetails
+) -> AsyncGenerator[bytes, None]:
+    """Get audio stream from HTTP HLS stream."""
+    logger = LOGGER.getChild("hls_stream")
+    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+    # fetch master playlist and select (best) child playlist
+    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+    async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp:
+        charset = resp.charset or "utf-8"
+        master_m3u_data = await resp.text(charset)
+    substreams = parse_m3u(master_m3u_data)
+    if any(x for x in substreams if x.path.endswith(".ts")) or not all(
+        x for x in substreams if x.stream_info is not None
+    ):
+        # the url we got is already a substream
+        substream_url = url
+    else:
+        # sort substreams on best quality (highest bandwidth)
+        substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
+        substream = substreams[0]
+        substream_url = substream.path
+        if not substream_url.startswith("http"):
+            # path is relative, stitch it together
+            base_path = url.rsplit("/", 1)[0]
+            substream_url = base_path + "/" + substream.path
+
+    logger.debug(
+        "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
+    )
+
+    if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
+        streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
+
+    prev_chunks: deque[str] = deque(maxlen=30)
+    has_playlist_metadata: bool | None = None
+    has_id3_metadata: bool | None = None
+    while True:
+        async with mass.http_session.get(
+            substream_url, headers=VLC_HEADERS, timeout=timeout
+        ) as resp:
+            charset = resp.charset or "utf-8"
+            substream_m3u_data = await resp.text(charset)
+        # get chunk-parts from the substream
+        hls_chunks = parse_m3u(substream_m3u_data)
+        for chunk_item in hls_chunks:
+            if chunk_item.path in prev_chunks:
+                continue
+            chunk_item_url = chunk_item.path
+            if not chunk_item_url.startswith("http"):
+                # path is relative, stitch it together
+                base_path = substream_url.rsplit("/", 1)[0]
+                chunk_item_url = base_path + "/" + chunk_item.path
+            # handle (optional) in-playlist (timed) metadata
+            if has_playlist_metadata is None:
+                has_playlist_metadata = chunk_item.title is not None
+                logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
+            if has_playlist_metadata and chunk_item.title != "no desc":
+                # bbc (and maybe others?) set the title to 'no desc'
+                streamdetails.stream_title = chunk_item.title
+            logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
+            # prevent that we play this chunk again if we loop through
+            prev_chunks.append(chunk_item.path)
+            async with mass.http_session.get(
+                chunk_item_url, headers=VLC_HEADERS, timeout=timeout
+            ) as resp:
+                async for chunk in resp.content.iter_any():
+                    yield chunk
+            # handle (optional) in-band (m3u) metadata
+            if has_id3_metadata is not None and has_playlist_metadata:
+                continue
+            if has_id3_metadata in (None, True):
+                tags = await parse_tags(chunk_item_url)
+                has_id3_metadata = tags.title and tags.title not in chunk_item.path
+                logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
+
+
+async def get_hls_stream_org(
+    mass: MusicAssistant, url: str, streamdetails: StreamDetails
 ) -> AsyncGenerator[bytes, None]:
     """Get audio stream from HTTP HLS stream."""
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
@@ -818,8 +903,6 @@ def get_ffmpeg_args(
         "-ignore_unknown",
         "-protocol_whitelist",
         "file,http,https,tcp,tls,crypto,pipe,data,fd",
-        "-filter_complex_threads",
-        "1",
     ]
     # collect input args
     input_args = []
index dec528f85d4d6c86030d9900a00387e9184ac931..e54b366e977cec26f67754712e8e8694de0454c6 100644 (file)
@@ -542,6 +542,8 @@ class YoutubeMusicProvider(MusicProvider):
             stream_details.expires = time() + int(
                 track_obj["streamingData"].get("expiresInSeconds")
             )
+        else:
+            stream_details.expires = time() + 600
         if stream_format.get("audioChannels") and str(stream_format.get("audioChannels")).isdigit():
             stream_details.audio_format.channels = int(stream_format.get("audioChannels"))
         if stream_format.get("audioSampleRate") and stream_format.get("audioSampleRate").isdigit():