From: Marcel van der Veldt Date: Thu, 28 Mar 2024 11:48:36 +0000 (+0100) Subject: Several tweaks for radio streams and YTM (#1187) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=80d19c0bd14ab3dceaa1dde2e900bc8c9f0a15ed;p=music-assistant-server.git Several tweaks for radio streams and YTM (#1187) * Fixes for radio streams playback and metadata * some fixes for buffering and caching streamdetails * Update audio.py --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 42e32921..a2d565f7 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -828,10 +828,6 @@ class StreamsController(CoreController): queue.queue_id, CONF_CROSSFADE_DURATION, 8 ) crossfade_size = int(pcm_sample_size * crossfade_duration) - buffer_size = int(pcm_sample_size * 2) # 2 seconds - if use_crossfade: - # buffer size needs to be big enough to include the crossfade part - buffer_size += crossfade_size bytes_written = 0 buffer = b"" # handle incoming audio chunks @@ -839,9 +835,22 @@ class StreamsController(CoreController): queue_track.streamdetails, pcm_format=pcm_format, # strip silence from begin/end if track is being crossfaded - strip_silence_begin=use_crossfade, + strip_silence_begin=use_crossfade and bytes_written > 0, strip_silence_end=use_crossfade, ): + # required buffer size is a bit dynamic, + # it needs to be small when the flow stream starts + seconds_streamed = int(bytes_written / pcm_sample_size) + if not use_crossfade or seconds_streamed < 5: + buffer_size = pcm_sample_size + elif seconds_streamed < 10: + buffer_size = pcm_sample_size * 2 + elif use_crossfade and seconds_streamed < 20: + buffer_size = pcm_sample_size * 5 + else: + buffer_size = crossfade_size + pcm_sample_size * 2 + # buffer size needs to be big enough to include the crossfade part + # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk del chunk @@ -970,13 +979,12 @@ class StreamsController(CoreController): is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio or streamdetails.seek_position: strip_silence_begin = False - # pcm_sample_size = chunk size = 1 second of pcm audio - chunk_size = pcm_format.pcm_sample_size - expected_chunks = int( - ((streamdetails.duration or 0) * pcm_format.pcm_sample_size) / chunk_size - ) - if expected_chunks < 10: + if is_radio or streamdetails.duration < 30: strip_silence_end = False + # pcm_sample_size = chunk size = 1 second of pcm audio + pcm_sample_size = pcm_format.pcm_sample_size + buffer_size_begin = pcm_sample_size * 2 if strip_silence_begin else pcm_sample_size + buffer_size_end = pcm_sample_size * 5 if strip_silence_end else pcm_sample_size # collect all arguments for ffmpeg filter_params = [] @@ -1032,10 +1040,21 @@ class StreamsController(CoreController): stderr_data = "" async for line in ffmpeg_proc.iter_stderr(): line = line.decode().strip() # noqa: PLW2901 + # if streamdetails contenttype is uinknown, try pars eit from the ffmpeg log output + # this has no actual usecase, other than displaying the correct codec in the UI + if ( + streamdetails.audio_format.content_type == ContentType.UNKNOWN + and line.startswith("Stream #0:0: Audio: ") + ): + streamdetails.audio_format.content_type = ContentType.try_parse( + line.split("Stream #0:0: Audio: ")[1].split(" ")[0] + ) if stderr_data or "loudnorm" in line: stderr_data += line + elif "HTTP error" in line: + logger.warning(line) elif line: - self.logger.log(VERBOSE_LOG_LEVEL, line) + logger.log(VERBOSE_LOG_LEVEL, line) del line # if we reach this point, the process is finished (finish or aborted) @@ -1094,37 +1113,41 @@ class StreamsController(CoreController): self.mass.create_task(log_reader(ffmpeg_proc, state_data)) # get pcm chunks from stdout - # we always stay one chunk behind to properly detect end of chunks + # we always stay buffer_size of bytes behind # so we can strip silence at the beginning and end of a track - prev_chunk = b"" + buffer = b"" chunk_num = 0 - async for chunk in ffmpeg_proc.iter_chunked(chunk_size): + async for chunk in ffmpeg_proc.iter_chunked(pcm_sample_size): chunk_num += 1 + required_buffer = buffer_size_begin if chunk_num < 10 else buffer_size_end + buffer += chunk + del chunk + + if len(buffer) < required_buffer: + # buffer is not full enough, move on + continue + if strip_silence_begin and chunk_num == 2: # first 2 chunks received, strip silence of beginning stripped_audio = await strip_silence( self.mass, - prev_chunk + chunk, + buffer, sample_rate=pcm_format.sample_rate, bit_depth=pcm_format.bit_depth, ) yield stripped_audio state_data["bytes_sent"] += len(stripped_audio) - prev_chunk = b"" + buffer = b"" del stripped_audio continue - if strip_silence_end and chunk_num >= (expected_chunks - 6): - # last part of the track, collect multiple chunks to strip silence later - prev_chunk += chunk - continue - - # middle part of the track, send previous chunk and collect current chunk - if prev_chunk: - yield prev_chunk - state_data["bytes_sent"] += len(prev_chunk) - # collect this chunk for next round - prev_chunk = chunk + #### OTHER: enough data in buffer, feed to output + while len(buffer) > required_buffer: + subchunk = buffer[:pcm_sample_size] + buffer = buffer[pcm_sample_size:] + state_data["bytes_sent"] += len(subchunk) + yield subchunk + del subchunk # if we did not receive any data, something went (terribly) wrong # raise here to prevent an (endless) loop elsewhere @@ -1132,23 +1155,23 @@ class StreamsController(CoreController): raise AudioError(f"stream error on {streamdetails.uri}") # all chunks received, strip silence of last part if needed and yield remaining bytes - if strip_silence_end and prev_chunk: + if strip_silence_end: final_chunk = await strip_silence( self.mass, - prev_chunk, + buffer, sample_rate=pcm_format.sample_rate, bit_depth=pcm_format.bit_depth, reverse=True, ) else: - final_chunk = prev_chunk + final_chunk = buffer # yield final chunk to output (as one big chunk) yield final_chunk state_data["bytes_sent"] += len(final_chunk) state_data["finished"].set() del final_chunk - del prev_chunk + del buffer def _log_request(self, request: web.Request) -> None: """Log request.""" diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index f76a54e2..a54491fa 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -7,6 +7,7 @@ import logging import os import re import struct +from collections import deque from io import BytesIO from time import time from typing import TYPE_CHECKING @@ -43,6 +44,7 @@ from music_assistant.server.helpers.playlists import ( fetch_playlist, parse_m3u, ) +from music_assistant.server.helpers.tags import parse_tags from .process import AsyncProcess, check_output from .util import create_tempfile @@ -199,7 +201,7 @@ async def get_stream_details( Do not try to request streamdetails in advance as this is expiring data. param media_item: The QueueItem for which to request the streamdetails for. """ - if queue_item.streamdetails and (time() < queue_item.streamdetails.expires): + if queue_item.streamdetails and (time() + 60) < queue_item.streamdetails.expires: LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}") # we already have (fresh) streamdetails stored on the queueitem, use these. # this happens for example while seeking in a track. @@ -224,23 +226,20 @@ async def get_stream_details( item_key = f"{music_prov.lookup_key}/{prov_media.item_id}" cache_key = f"cached_streamdetails_{item_key}" if cache := await mass.cache.get(cache_key): - LOGGER.debug(f"Using cached streamdetails for {item_key}") - streamdetails = StreamDetails.from_dict(cache) - break + if time() + 60 < cache["expires"]: + LOGGER.debug(f"Using cached streamdetails for {item_key}") + streamdetails = StreamDetails.from_dict(cache) # get streamdetails from provider try: streamdetails: StreamDetails = await music_prov.get_stream_details( prov_media.item_id ) - # store streamdetails in cache - expiration = streamdetails.expires - time() - if expiration > 300: - await mass.cache.set( - cache_key, streamdetails.to_dict(), expiration=expiration - 60 - ) except MusicAssistantError as err: LOGGER.warning(str(err)) else: + # store streamdetails in cache + expiration = streamdetails.expires - time() + await mass.cache.set(cache_key, streamdetails.to_dict(), expiration=expiration - 60) break else: raise MediaNotFoundError(f"Unable to retrieve streamdetails for {queue_item}") @@ -323,7 +322,9 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= return file.getvalue() -async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]: +async def resolve_radio_stream( + mass: MusicAssistant, url: str, use_get: bool = False +) -> tuple[str, bool, bool]: """ Resolve a streaming radio URL. @@ -336,7 +337,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo - bool uf the URL represents a HLS stream/playlist. """ base_url = url.split("?")[0] - cache_key = f"resolved_radio_{url}" + cache_key = f"RADIO_RESOLVED_{url}" if cache := await mass.cache.get(cache_key): return cache is_hls = False @@ -344,12 +345,16 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo resolved_url = url timeout = ClientTimeout(total=0, connect=10, sock_read=5) try: - async with mass.http_session.head( - url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout + method = "GET" if use_get else "HEAD" + async with mass.http_session.request( + method, url, headers=VLC_HEADERS_ICY, allow_redirects=True, timeout=timeout ) as resp: resolved_url = str(resp.real_url) headers = resp.headers - supports_icy = int(headers.get("icy-metaint", "0")) > 0 + resp.raise_for_status() + if not resp.headers: + raise InvalidDataError("no headers found") + supports_icy = headers.get("icy-name") is not None or "Icecast" in headers.get("server", "") is_hls = headers.get("content-type") in HLS_CONTENT_TYPES if ( base_url.endswith((".m3u", ".m3u8", ".pls")) @@ -367,11 +372,14 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo is_hls = True except (ClientResponseError, InvalidDataError) as err: + if not use_get: + return await resolve_radio_stream(mass, resolved_url, True) LOGGER.warning("Error while parsing radio URL %s: %s", url, err) return (resolved_url, supports_icy, is_hls) result = (resolved_url, supports_icy, is_hls) - await mass.cache.set(cache_key, result, expiration=86400) + cache_expiration = 24 * 3600 if url == resolved_url else 600 + await mass.cache.set(cache_key, result, expiration=cache_expiration) return result @@ -426,6 +434,83 @@ async def get_icy_stream( async def get_hls_stream( mass: MusicAssistant, url: str, streamdetails: StreamDetails +) -> AsyncGenerator[bytes, None]: + """Get audio stream from HTTP HLS stream.""" + logger = LOGGER.getChild("hls_stream") + timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) + # fetch master playlist and select (best) child playlist + # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 + async with mass.http_session.get(url, headers=VLC_HEADERS, timeout=timeout) as resp: + charset = resp.charset or "utf-8" + master_m3u_data = await resp.text(charset) + substreams = parse_m3u(master_m3u_data) + if any(x for x in substreams if x.path.endswith(".ts")) or not all( + x for x in substreams if x.stream_info is not None + ): + # the url we got is already a substream + substream_url = url + else: + # sort substreams on best quality (highest bandwidth) + substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True) + substream = substreams[0] + substream_url = substream.path + if not substream_url.startswith("http"): + # path is relative, stitch it together + base_path = url.rsplit("/", 1)[0] + substream_url = base_path + "/" + substream.path + + logger.debug( + "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url + ) + + if streamdetails.audio_format.content_type == ContentType.UNKNOWN: + streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC) + + prev_chunks: deque[str] = deque(maxlen=30) + has_playlist_metadata: bool | None = None + has_id3_metadata: bool | None = None + while True: + async with mass.http_session.get( + substream_url, headers=VLC_HEADERS, timeout=timeout + ) as resp: + charset = resp.charset or "utf-8" + substream_m3u_data = await resp.text(charset) + # get chunk-parts from the substream + hls_chunks = parse_m3u(substream_m3u_data) + for chunk_item in hls_chunks: + if chunk_item.path in prev_chunks: + continue + chunk_item_url = chunk_item.path + if not chunk_item_url.startswith("http"): + # path is relative, stitch it together + base_path = substream_url.rsplit("/", 1)[0] + chunk_item_url = base_path + "/" + chunk_item.path + # handle (optional) in-playlist (timed) metadata + if has_playlist_metadata is None: + has_playlist_metadata = chunk_item.title is not None + logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata) + if has_playlist_metadata and chunk_item.title != "no desc": + # bbc (and maybe others?) set the title to 'no desc' + streamdetails.stream_title = chunk_item.title + logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item) + # prevent that we play this chunk again if we loop through + prev_chunks.append(chunk_item.path) + async with mass.http_session.get( + chunk_item_url, headers=VLC_HEADERS, timeout=timeout + ) as resp: + async for chunk in resp.content.iter_any(): + yield chunk + # handle (optional) in-band (m3u) metadata + if has_id3_metadata is not None and has_playlist_metadata: + continue + if has_id3_metadata in (None, True): + tags = await parse_tags(chunk_item_url) + has_id3_metadata = tags.title and tags.title not in chunk_item.path + logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata) + + +async def get_hls_stream_org( + mass: MusicAssistant, url: str, streamdetails: StreamDetails ) -> AsyncGenerator[bytes, None]: """Get audio stream from HTTP HLS stream.""" timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) @@ -818,8 +903,6 @@ def get_ffmpeg_args( "-ignore_unknown", "-protocol_whitelist", "file,http,https,tcp,tls,crypto,pipe,data,fd", - "-filter_complex_threads", - "1", ] # collect input args input_args = [] diff --git a/music_assistant/server/providers/ytmusic/__init__.py b/music_assistant/server/providers/ytmusic/__init__.py index dec528f8..e54b366e 100644 --- a/music_assistant/server/providers/ytmusic/__init__.py +++ b/music_assistant/server/providers/ytmusic/__init__.py @@ -542,6 +542,8 @@ class YoutubeMusicProvider(MusicProvider): stream_details.expires = time() + int( track_obj["streamingData"].get("expiresInSeconds") ) + else: + stream_details.expires = time() + 600 if stream_format.get("audioChannels") and str(stream_format.get("audioChannels")).isdigit(): stream_details.audio_format.channels = int(stream_format.get("audioChannels")) if stream_format.get("audioSampleRate") and stream_format.get("audioSampleRate").isdigit():