From a5b9f5e9afa253211728a77a9a19eccd74d827aa Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 12 Apr 2024 21:56:20 +0200 Subject: [PATCH] Fix seeking in HLS streams (e.g. soundcloud) (#1221) --- music_assistant/server/controllers/streams.py | 133 +++++++------- music_assistant/server/helpers/audio.py | 173 ++++++++++++------ .../server/providers/soundcloud/__init__.py | 6 +- 3 files changed, 184 insertions(+), 128 deletions(-) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index b07dd5e0..e28a2c31 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -710,6 +710,7 @@ class StreamsController(CoreController): streamdetails.seek_position = 0 # collect all arguments for ffmpeg filter_params = [] + extra_input_args = [] if streamdetails.target_loudness is not None: # add loudnorm filters filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11" @@ -730,89 +731,87 @@ class StreamsController(CoreController): streamdetails, seek_position=streamdetails.seek_position, ) - elif streamdetails.stream_type == StreamType.HLS: - audio_source = get_hls_stream(self.mass, streamdetails.path, streamdetails) elif streamdetails.stream_type == StreamType.ICY: audio_source = get_icy_stream(self.mass, streamdetails.path, streamdetails) + elif streamdetails.stream_type == StreamType.HLS: + audio_source = get_hls_stream( + self.mass, streamdetails.path, streamdetails, streamdetails.seek_position + ) else: audio_source = streamdetails.path - extra_input_args = [] - if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM: - extra_input_args += ["-ss", str(int(streamdetails.seek_position))] + if streamdetails.seek_position: + extra_input_args += ["-ss", str(int(streamdetails.seek_position))] logger.debug("start media stream for: %s", streamdetails.uri) bytes_sent = 0 finished = False - async with FFMpeg( - 
audio_input=audio_source, - input_format=streamdetails.audio_format, - output_format=pcm_format, - filter_params=filter_params, - extra_input_args=[ - *extra_input_args, - # we criple ffmpeg a bit on purpose with the filter_threads - # option so it doesn't consume all cpu when calculating loudnorm - "-filter_threads", - "2", - ], - collect_log_history=True, - logger=logger, - ) as ffmpeg_proc: - try: + try: + async with FFMpeg( + audio_input=audio_source, + input_format=streamdetails.audio_format, + output_format=pcm_format, + filter_params=filter_params, + extra_input_args=[ + *extra_input_args, + # we cripple ffmpeg a bit on purpose with the filter_threads + # option so it doesn't consume all cpu when calculating loudnorm + "-filter_threads", + "2", + ], + collect_log_history=True, + logger=logger, + ) as ffmpeg_proc: async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size): bytes_sent += len(chunk) yield chunk del chunk finished = True - finally: - if finished: - await ffmpeg_proc.wait() - else: - await ffmpeg_proc.close() - - # try to determine how many seconds we've streamed - seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - logger.debug( - "stream %s (with code %s) for %s - seconds streamed: %s", - "finished" if finished else "aborted", - ffmpeg_proc.returncode, - streamdetails.uri, - seconds_streamed, - ) - if seconds_streamed: - streamdetails.seconds_streamed = seconds_streamed - # store accurate duration - if finished and not streamdetails.seek_position and seconds_streamed: - streamdetails.duration = seconds_streamed - - # parse loudnorm data if we have that collected - if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): - required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 - if finished or (seconds_streamed >= required_seconds): - logger.debug( - "Loudness measurement for %s: %s", - streamdetails.uri, - loudness_details, - ) - streamdetails.loudness = 
loudness_details - self.mass.create_task( - self.mass.music.set_track_loudness( - streamdetails.item_id, streamdetails.provider, loudness_details - ) - ) - # report playback - if finished or seconds_streamed > 30: + finally: + if finished and not ffmpeg_proc.closed: + await asyncio.wait_for(ffmpeg_proc.wait(), 60) + elif not ffmpeg_proc.closed: + await ffmpeg_proc.close() + + # try to determine how many seconds we've streamed + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 + logger.debug( + "stream %s (with code %s) for %s - seconds streamed: %s", + "finished" if finished else "aborted", + ffmpeg_proc.returncode, + streamdetails.uri, + seconds_streamed, + ) + streamdetails.seconds_streamed = seconds_streamed + # store accurate duration + if finished and not streamdetails.seek_position and seconds_streamed: + streamdetails.duration = seconds_streamed + + # parse loudnorm data if we have that collected + if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): + required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 + if finished or (seconds_streamed >= required_seconds): + logger.debug( + "Loudness measurement for %s: %s", + streamdetails.uri, + loudness_details, + ) + streamdetails.loudness = loudness_details self.mass.create_task( - self.mass.music.mark_item_played( - streamdetails.media_type, - streamdetails.item_id, - streamdetails.provider, + self.mass.music.set_track_loudness( + streamdetails.item_id, streamdetails.provider, loudness_details ) ) - if music_prov := self.mass.get_provider(streamdetails.provider): - self.mass.create_task( - music_prov.on_streamed(streamdetails, seconds_streamed) - ) + # report playback + if finished or seconds_streamed > 30: + self.mass.create_task( + self.mass.music.mark_item_played( + streamdetails.media_type, + streamdetails.item_id, + streamdetails.provider, + ) + ) + if music_prov := self.mass.get_provider(streamdetails.provider): + 
self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) def _log_request(self, request: web.Request) -> None: """Log request.""" diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 648cb3d1..2d4c0db8 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -7,6 +7,7 @@ import logging import os import re import struct +import time from collections import deque from collections.abc import AsyncGenerator from contextlib import suppress @@ -125,8 +126,6 @@ class FFMpeg(AsyncProcess): """Close/terminate the process and wait for exit.""" if self._stdin_task and not self._stdin_task.done(): self._stdin_task.cancel() - with suppress(asyncio.CancelledError): - await self._stdin_task # make sure the stdin generator is also properly closed # by propagating a cancellederror within with suppress(RuntimeError): @@ -157,20 +156,27 @@ class FFMpeg(AsyncProcess): async def _log_reader_task(self) -> None: """Read ffmpeg log from stderr.""" + decode_errors = 0 async for line in self.iter_stderr(): if self.collect_log_history: self.log_history.append(line) if "error" in line or "warning" in line: - self.logger.warning(line) + self.logger.debug(line) elif "critical" in line: - self.logger.critical(line) + self.logger.warning(line) else: self.logger.log(VERBOSE_LOG_LEVEL, line) + if "Invalid data found when processing input" in line: + decode_errors += 1 + if decode_errors >= 50: + self.logger.error(line) + await super().close(True) + # if streamdetails contenttype is unknown, try parse it from the ffmpeg log - if line.startswith("Stream #0:0: Audio: "): + if line.startswith("Stream #") and ": Audio: " in line: if self.input_format.content_type == ContentType.UNKNOWN: - content_type_raw = line.split("Stream #0:0: Audio: ")[1].split(" ")[0] + content_type_raw = line.split(": Audio: ")[1].split(" ")[0] content_type = ContentType.try_parse(content_type_raw) self.logger.info( 
"Detected (input) content type: %s (%s)", content_type, content_type_raw @@ -182,10 +188,17 @@ class FFMpeg(AsyncProcess): """Feed stdin with audio chunks from an AsyncGenerator.""" if TYPE_CHECKING: self.audio_input: AsyncGenerator[bytes, None] - async for chunk in self.audio_input: - await self.write(chunk) - # write EOF once we've reached the end of the input stream - await self.write_eof() + try: + async for chunk in self.audio_input: + await self.write(chunk) + # write EOF once we've reached the end of the input stream + await self.write_eof() + except Exception as err: + # make sure we dont swallow any exceptions and we bail out + # once our audio source fails. + if isinstance(err, asyncio.CancelledError): + self.logger.exception(err) + await self.close(True) async def crossfade_pcm_parts( @@ -323,6 +336,9 @@ async def get_stream_details( Do not try to request streamdetails in advance as this is expiring data. param media_item: The QueueItem for which to request the streamdetails for. """ + if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration): + LOGGER.warning("seeking is not possible on duration-less streams!") + seek_position = 0 if queue_item.streamdetails and seek_position: LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}") # we already have (fresh?) streamdetails stored on the queueitem, use these. 
@@ -367,7 +383,6 @@ async def get_stream_details( streamdetails.stream_type = StreamType.HLS elif is_icy: streamdetails.stream_type = StreamType.ICY - # set queue_id on the streamdetails so we know what is being streamed streamdetails.queue_id = queue_item.queue_id # handle skip/fade_in details @@ -533,53 +548,47 @@ async def get_icy_stream( async def get_hls_stream( - mass: MusicAssistant, url: str, streamdetails: StreamDetails + mass: MusicAssistant, + url: str, + streamdetails: StreamDetails, + seek_position: int = 0, ) -> AsyncGenerator[bytes, None]: """Get audio stream from HTTP HLS stream.""" logger = LOGGER.getChild("hls_stream") + logger.debug("Start streaming HLS stream for url %s", url) timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) - # fetch master playlist and select (best) child playlist - # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 - async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp: - charset = resp.charset or "utf-8" - master_m3u_data = await resp.text(charset) - substreams = parse_m3u(master_m3u_data) - if any(x for x in substreams if x.path.endswith(".ts")) or all( - x for x in substreams if (x.stream_info or x.length) - ): - # the url we got is already a substream - substream_url = url - else: - # sort substreams on best quality (highest bandwidth) - substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True) - substream = substreams[0] - substream_url = substream.path - if not substream_url.startswith("http"): - # path is relative, stitch it together - base_path = url.rsplit("/", 1)[0] - substream_url = base_path + "/" + substream.path - - logger.debug( - "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url - ) - - if streamdetails.audio_format.content_type == ContentType.UNKNOWN: - streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC) - - prev_chunks: deque[str] = 
deque(maxlen=30) + prev_chunks: deque[str] = deque(maxlen=50) has_playlist_metadata: bool | None = None has_id3_metadata: bool | None = None + is_live_stream = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration + # we simply select the best quality substream here + # if we ever want to support adaptive stream selection based on bandwidth + # we need to move the substream selection into the loop below and make it + # bandwidth aware. For now we just assume domestic high bandwidth where + # the user wants the best quality possible at all times. + substream_url = await get_hls_substream(mass, url) + seconds_skipped = 0 + empty_loops = 0 while True: + logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url) async with mass.http_session.get( substream_url, headers=HTTP_HEADERS, timeout=timeout ) as resp: + resp.raise_for_status() charset = resp.charset or "utf-8" substream_m3u_data = await resp.text(charset) # get chunk-parts from the substream hls_chunks = parse_m3u(substream_m3u_data) + chunk_seconds = 0 + time_start = time.time() for chunk_item in hls_chunks: if chunk_item.path in prev_chunks: continue + chunk_length = int(chunk_item.length) if chunk_item.length else 6 + # try to support seeking here + if seek_position and (seconds_skipped + chunk_length) < seek_position: + seconds_skipped += chunk_length + continue chunk_item_url = chunk_item.path if not chunk_item_url.startswith("http"): # path is relative, stitch it together @@ -598,8 +607,8 @@ async def get_hls_stream( async with mass.http_session.get( chunk_item_url, headers=HTTP_HEADERS, timeout=timeout ) as resp: - async for chunk in resp.content.iter_any(): - yield chunk + yield await resp.content.read() + chunk_seconds += chunk_length # handle (optional) in-band (m3u) metadata if has_id3_metadata is not None and has_playlist_metadata: continue @@ -607,6 +616,52 @@ async def get_hls_stream( tags = await parse_tags(chunk_item_url) has_id3_metadata = 
tags.title and tags.title not in chunk_item.path logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata) + # end of stream reached - for non livestreams, we are ready and should return + # for livestreams we loop around to get the next playlist with chunks + if not is_live_stream: + return + # safeguard for an endless loop + # this may happen if we're simply going too fast for the live stream + # we already throttle it a bit but we may end up in a situation where something is wrong + # and we want to break out of this loop, hence this check + if chunk_seconds == 0: + empty_loops += 1 + await asyncio.sleep(1) + else: + empty_loops = 0 + if empty_loops == 50: + logger.warning("breaking out of endless loop") + break + # ensure that we're not going too fast - otherwise we get the same substream playlist + while (time.time() - time_start) < (chunk_seconds - 1): + await asyncio.sleep(0.5) + + +async def get_hls_substream( + mass: MusicAssistant, + url: str, +) -> str: + """Select the (highest quality) HLS substream for given HLS playlist/URL.""" + timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) + # fetch master playlist and select (best) child playlist + # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10 + async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp: + resp.raise_for_status() + charset = resp.charset or "utf-8" + master_m3u_data = await resp.text(charset) + substreams = parse_m3u(master_m3u_data) + if any(x for x in substreams if x.length): + # the url we got is already a substream + return url + # sort substreams on best quality (highest bandwidth) + substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True) + substream = substreams[0] + substream_url = substream.path + if not substream_url.startswith("http"): + # path is relative, stitch it together + base_path = url.rsplit("/", 1)[0] + substream_url = base_path + "/" + 
substream.path + return substream_url async def get_http_stream( @@ -908,7 +963,7 @@ def get_ffmpeg_args( "-nostats", "-ignore_unknown", "-protocol_whitelist", - "file,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp", + "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp", ] # collect input args input_args = [] @@ -947,17 +1002,9 @@ def get_ffmpeg_args( "-i", input_path, ] - elif input_format.content_type in ( - ContentType.UNKNOWN, - ContentType.M4A, - ContentType.M4B, - ContentType.MP4, - ): - # let ffmpeg guess/auto detect the content type - input_args += ["-i", input_path] else: - # use explicit format identifier for all other - input_args += ["-f", input_format.content_type.value, "-i", input_path] + # let ffmpeg auto detect the content type from the metadata/headers + input_args += ["-i", input_path] # collect output args output_args = [] @@ -965,8 +1012,7 @@ def get_ffmpeg_args( # devnull stream output_args = ["-f", "null", "-"] elif output_format.content_type == ContentType.UNKNOWN: - # use wav so we at least have some headers for the rest of the chain - output_args = ["-f", "wav", output_path] + raise RuntimeError("Invalid output format specified") else: if output_format.content_type.is_pcm(): output_args += ["-acodec", output_format.content_type.name.lower()] @@ -981,18 +1027,27 @@ def get_ffmpeg_args( output_path, ] + # edge case: source file is not stereo - downmix to stereo + if input_format.channels > 2 and output_format.channels == 2: + filter_params = [ + "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE", + *filter_params, + ] + # determine if we need to do resampling if ( input_format.sample_rate != output_format.sample_rate or input_format.bit_depth != output_format.bit_depth ): # prefer resampling with libsoxr due to its high quality - resample_filter = f'aresample=resampler={"soxr" if libsoxr_support else "swr"}' + if libsoxr_support: + resample_filter = "aresample=resampler=soxr:precision=28" + else: + 
resample_filter = "aresample=resampler=swr" if output_format.bit_depth < input_format.bit_depth: # apply dithering when going down to 16 bits resample_filter += ":osf=s16:dither_method=triangular_hp" - if not output_format.content_type.is_pcm(): - # specify sample rate if output format is not pcm + if input_format.sample_rate != output_format.sample_rate: resample_filter += f":osr={output_format.sample_rate}" filter_params.append(resample_filter) diff --git a/music_assistant/server/providers/soundcloud/__init__.py b/music_assistant/server/providers/soundcloud/__init__.py index 663d2746..cc1b3a08 100644 --- a/music_assistant/server/providers/soundcloud/__init__.py +++ b/music_assistant/server/providers/soundcloud/__init__.py @@ -301,7 +301,7 @@ class SoundcloudMusicProvider(MusicProvider): async def get_stream_details(self, item_id: str) -> StreamDetails: """Return the content details for the given track when it will be streamed.""" - url = await self._soundcloud.get_stream_url(track_id=item_id) + url: str = await self._soundcloud.get_stream_url(track_id=item_id) return StreamDetails( provider=self.instance_id, item_id=item_id, @@ -310,7 +310,9 @@ class SoundcloudMusicProvider(MusicProvider): audio_format=AudioFormat( content_type=ContentType.UNKNOWN, ), - stream_type=StreamType.HTTP, + stream_type=StreamType.HLS + if url.startswith("https://cf-hls-media.sndcdn.com") + else StreamType.HTTP, path=url, ) -- 2.34.1