Fix seeking in HLS streams (e.g. soundcloud) (#1221)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 12 Apr 2024 19:56:20 +0000 (21:56 +0200)
committerGitHub <noreply@github.com>
Fri, 12 Apr 2024 19:56:20 +0000 (21:56 +0200)
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/providers/soundcloud/__init__.py

index b07dd5e0743327dfaddd7f40a5b58947bb013641..e28a2c31176d4a64b17bd14cd82c202deefb8329 100644 (file)
@@ -710,6 +710,7 @@ class StreamsController(CoreController):
             streamdetails.seek_position = 0
         # collect all arguments for ffmpeg
         filter_params = []
+        extra_input_args = []
         if streamdetails.target_loudness is not None:
             # add loudnorm filters
             filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11"
@@ -730,89 +731,87 @@ class StreamsController(CoreController):
                 streamdetails,
                 seek_position=streamdetails.seek_position,
             )
-        elif streamdetails.stream_type == StreamType.HLS:
-            audio_source = get_hls_stream(self.mass, streamdetails.path, streamdetails)
         elif streamdetails.stream_type == StreamType.ICY:
             audio_source = get_icy_stream(self.mass, streamdetails.path, streamdetails)
+        elif streamdetails.stream_type == StreamType.HLS:
+            audio_source = get_hls_stream(
+                self.mass, streamdetails.path, streamdetails, streamdetails.seek_position
+            )
         else:
             audio_source = streamdetails.path
-        extra_input_args = []
-        if streamdetails.seek_position and streamdetails.stream_type != StreamType.CUSTOM:
-            extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
+            if streamdetails.seek_position:
+                extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
 
         logger.debug("start media stream for: %s", streamdetails.uri)
         bytes_sent = 0
         finished = False
-        async with FFMpeg(
-            audio_input=audio_source,
-            input_format=streamdetails.audio_format,
-            output_format=pcm_format,
-            filter_params=filter_params,
-            extra_input_args=[
-                *extra_input_args,
-                # we criple ffmpeg a bit on purpose with the filter_threads
-                # option so it doesn't consume all cpu when calculating loudnorm
-                "-filter_threads",
-                "2",
-            ],
-            collect_log_history=True,
-            logger=logger,
-        ) as ffmpeg_proc:
-            try:
+        try:
+            async with FFMpeg(
+                audio_input=audio_source,
+                input_format=streamdetails.audio_format,
+                output_format=pcm_format,
+                filter_params=filter_params,
+                extra_input_args=[
+                    *extra_input_args,
+                    # we criple ffmpeg a bit on purpose with the filter_threads
+                    # option so it doesn't consume all cpu when calculating loudnorm
+                    "-filter_threads",
+                    "2",
+                ],
+                collect_log_history=True,
+                logger=logger,
+            ) as ffmpeg_proc:
                 async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size):
                     bytes_sent += len(chunk)
                     yield chunk
                     del chunk
                 finished = True
-            finally:
-                if finished:
-                    await ffmpeg_proc.wait()
-                else:
-                    await ffmpeg_proc.close()
-
-                # try to determine how many seconds we've streamed
-                seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-                logger.debug(
-                    "stream %s (with code %s) for %s - seconds streamed: %s",
-                    "finished" if finished else "aborted",
-                    ffmpeg_proc.returncode,
-                    streamdetails.uri,
-                    seconds_streamed,
-                )
-                if seconds_streamed:
-                    streamdetails.seconds_streamed = seconds_streamed
-                # store accurate duration
-                if finished and not streamdetails.seek_position and seconds_streamed:
-                    streamdetails.duration = seconds_streamed
-
-                # parse loudnorm data if we have that collected
-                if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
-                    required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
-                    if finished or (seconds_streamed >= required_seconds):
-                        logger.debug(
-                            "Loudness measurement for %s: %s",
-                            streamdetails.uri,
-                            loudness_details,
-                        )
-                        streamdetails.loudness = loudness_details
-                        self.mass.create_task(
-                            self.mass.music.set_track_loudness(
-                                streamdetails.item_id, streamdetails.provider, loudness_details
-                            )
-                        )
-                # report playback
-                if finished or seconds_streamed > 30:
+        finally:
+            if finished and not ffmpeg_proc.closed:
+                await asyncio.wait_for(ffmpeg_proc.wait(), 60)
+            elif not ffmpeg_proc.closed:
+                await ffmpeg_proc.close()
+
+            # try to determine how many seconds we've streamed
+            seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
+            logger.debug(
+                "stream %s (with code %s) for %s - seconds streamed: %s",
+                "finished" if finished else "aborted",
+                ffmpeg_proc.returncode,
+                streamdetails.uri,
+                seconds_streamed,
+            )
+            streamdetails.seconds_streamed = seconds_streamed
+            # store accurate duration
+            if finished and not streamdetails.seek_position and seconds_streamed:
+                streamdetails.duration = seconds_streamed
+
+            # parse loudnorm data if we have that collected
+            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
+                required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
+                if finished or (seconds_streamed >= required_seconds):
+                    logger.debug(
+                        "Loudness measurement for %s: %s",
+                        streamdetails.uri,
+                        loudness_details,
+                    )
+                    streamdetails.loudness = loudness_details
                     self.mass.create_task(
-                        self.mass.music.mark_item_played(
-                            streamdetails.media_type,
-                            streamdetails.item_id,
-                            streamdetails.provider,
+                        self.mass.music.set_track_loudness(
+                            streamdetails.item_id, streamdetails.provider, loudness_details
                         )
                     )
-                    if music_prov := self.mass.get_provider(streamdetails.provider):
-                        self.mass.create_task(
-                            music_prov.on_streamed(streamdetails, seconds_streamed)
-                        )
+            # report playback
+            if finished or seconds_streamed > 30:
+                self.mass.create_task(
+                    self.mass.music.mark_item_played(
+                        streamdetails.media_type,
+                        streamdetails.item_id,
+                        streamdetails.provider,
+                    )
+                )
+                if music_prov := self.mass.get_provider(streamdetails.provider):
+                    self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
 
     def _log_request(self, request: web.Request) -> None:
         """Log request."""
index 648cb3d176d7438b2776aa6a8b9848789c37a2d4..2d4c0db8f0c5516ea1bbce84067102638e4f27ba 100644 (file)
@@ -7,6 +7,7 @@ import logging
 import os
 import re
 import struct
+import time
 from collections import deque
 from collections.abc import AsyncGenerator
 from contextlib import suppress
@@ -125,8 +126,6 @@ class FFMpeg(AsyncProcess):
         """Close/terminate the process and wait for exit."""
         if self._stdin_task and not self._stdin_task.done():
             self._stdin_task.cancel()
-            with suppress(asyncio.CancelledError):
-                await self._stdin_task
             # make sure the stdin generator is also properly closed
             # by propagating a cancellederror within
             with suppress(RuntimeError):
@@ -157,20 +156,27 @@ class FFMpeg(AsyncProcess):
 
     async def _log_reader_task(self) -> None:
         """Read ffmpeg log from stderr."""
+        decode_errors = 0
         async for line in self.iter_stderr():
             if self.collect_log_history:
                 self.log_history.append(line)
             if "error" in line or "warning" in line:
-                self.logger.warning(line)
+                self.logger.debug(line)
             elif "critical" in line:
-                self.logger.critical(line)
+                self.logger.warning(line)
             else:
                 self.logger.log(VERBOSE_LOG_LEVEL, line)
 
+            if "Invalid data found when processing input" in line:
+                decode_errors += 1
+            if decode_errors >= 50:
+                self.logger.error(line)
+                await super().close(True)
+
             # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
-            if line.startswith("Stream #0:0: Audio: "):
+            if line.startswith("Stream #") and ": Audio: " in line:
                 if self.input_format.content_type == ContentType.UNKNOWN:
-                    content_type_raw = line.split("Stream #0:0: Audio: ")[1].split(" ")[0]
+                    content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
                     content_type = ContentType.try_parse(content_type_raw)
                     self.logger.info(
                         "Detected (input) content type: %s (%s)", content_type, content_type_raw
@@ -182,10 +188,17 @@ class FFMpeg(AsyncProcess):
         """Feed stdin with audio chunks from an AsyncGenerator."""
         if TYPE_CHECKING:
             self.audio_input: AsyncGenerator[bytes, None]
-        async for chunk in self.audio_input:
-            await self.write(chunk)
-        # write EOF once we've reached the end of the input stream
-        await self.write_eof()
+        try:
+            async for chunk in self.audio_input:
+                await self.write(chunk)
+            # write EOF once we've reached the end of the input stream
+            await self.write_eof()
+        except Exception as err:
+            # make sure we dont swallow any exceptions and we bail out
+            # once our audio source fails.
+            if isinstance(err, asyncio.CancelledError):
+                self.logger.exception(err)
+                await self.close(True)
 
 
 async def crossfade_pcm_parts(
@@ -323,6 +336,9 @@ async def get_stream_details(
     Do not try to request streamdetails in advance as this is expiring data.
         param media_item: The QueueItem for which to request the streamdetails for.
     """
+    if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
+        LOGGER.warning("seeking is not possible on duration-less streams!")
+        seek_position = 0
     if queue_item.streamdetails and seek_position:
         LOGGER.debug(f"Using (pre)cached streamdetails from queue_item for {queue_item.uri}")
         # we already have (fresh?) streamdetails stored on the queueitem, use these.
@@ -367,7 +383,6 @@ async def get_stream_details(
                 streamdetails.stream_type = StreamType.HLS
             elif is_icy:
                 streamdetails.stream_type = StreamType.ICY
-
     # set queue_id on the streamdetails so we know what is being streamed
     streamdetails.queue_id = queue_item.queue_id
     # handle skip/fade_in details
@@ -533,53 +548,47 @@ async def get_icy_stream(
 
 
 async def get_hls_stream(
-    mass: MusicAssistant, url: str, streamdetails: StreamDetails
+    mass: MusicAssistant,
+    url: str,
+    streamdetails: StreamDetails,
+    seek_position: int = 0,
 ) -> AsyncGenerator[bytes, None]:
     """Get audio stream from HTTP HLS stream."""
     logger = LOGGER.getChild("hls_stream")
+    logger.debug("Start streaming HLS stream for url %s", url)
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
-    # fetch master playlist and select (best) child playlist
-    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
-    async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp:
-        charset = resp.charset or "utf-8"
-        master_m3u_data = await resp.text(charset)
-    substreams = parse_m3u(master_m3u_data)
-    if any(x for x in substreams if x.path.endswith(".ts")) or all(
-        x for x in substreams if (x.stream_info or x.length)
-    ):
-        # the url we got is already a substream
-        substream_url = url
-    else:
-        # sort substreams on best quality (highest bandwidth)
-        substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
-        substream = substreams[0]
-        substream_url = substream.path
-        if not substream_url.startswith("http"):
-            # path is relative, stitch it together
-            base_path = url.rsplit("/", 1)[0]
-            substream_url = base_path + "/" + substream.path
-
-    logger.debug(
-        "Start streaming HLS stream for url %s (selected substream %s)", url, substream_url
-    )
-
-    if streamdetails.audio_format.content_type == ContentType.UNKNOWN:
-        streamdetails.audio_format = AudioFormat(content_type=ContentType.AAC)
-
-    prev_chunks: deque[str] = deque(maxlen=30)
+    prev_chunks: deque[str] = deque(maxlen=50)
     has_playlist_metadata: bool | None = None
     has_id3_metadata: bool | None = None
+    is_live_stream = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
+    # we simply select the best quality substream here
+    # if we ever want to support adaptive stream selection based on bandwidth
+    # we need to move the substream selection into the loop below and make it
+    # bandwidth aware. For now we just assume domestic high bandwidth where
+    # the user wants the best quality possible at all times.
+    substream_url = await get_hls_substream(mass, url)
+    seconds_skipped = 0
+    empty_loops = 0
     while True:
+        logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
         async with mass.http_session.get(
             substream_url, headers=HTTP_HEADERS, timeout=timeout
         ) as resp:
+            resp.raise_for_status()
             charset = resp.charset or "utf-8"
             substream_m3u_data = await resp.text(charset)
         # get chunk-parts from the substream
         hls_chunks = parse_m3u(substream_m3u_data)
+        chunk_seconds = 0
+        time_start = time.time()
         for chunk_item in hls_chunks:
             if chunk_item.path in prev_chunks:
                 continue
+            chunk_length = int(chunk_item.length) if chunk_item.length else 6
+            # try to support seeking here
+            if seek_position and (seconds_skipped + chunk_length) < seek_position:
+                seconds_skipped += chunk_length
+                continue
             chunk_item_url = chunk_item.path
             if not chunk_item_url.startswith("http"):
                 # path is relative, stitch it together
@@ -598,8 +607,8 @@ async def get_hls_stream(
             async with mass.http_session.get(
                 chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
             ) as resp:
-                async for chunk in resp.content.iter_any():
-                    yield chunk
+                yield await resp.content.read()
+            chunk_seconds += chunk_length
             # handle (optional) in-band (m3u) metadata
             if has_id3_metadata is not None and has_playlist_metadata:
                 continue
@@ -607,6 +616,52 @@ async def get_hls_stream(
                 tags = await parse_tags(chunk_item_url)
                 has_id3_metadata = tags.title and tags.title not in chunk_item.path
                 logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)
+        # end of stream reached - for non livestreams, we are ready and should return
+        # for livestreams we loop around to get the next playlist with chunks
+        if not is_live_stream:
+            return
+        # safeguard for an endless loop
+        # this may happen if we're simply going too fast for the live stream
+        # we already throttle it a bit but we may end up in a situation where something is wrong
+        # and we want to break out of this loop, hence this check
+        if chunk_seconds == 0:
+            empty_loops += 1
+            await asyncio.sleep(1)
+        else:
+            empty_loops = 0
+        if empty_loops == 50:
+            logger.warning("breaking out of endless loop")
+            break
+        # ensure that we're not going to fast - otherwise we get the same substream playlist
+        while (time.time() - time_start) < (chunk_seconds - 1):
+            await asyncio.sleep(0.5)
+
+
+async def get_hls_substream(
+    mass: MusicAssistant,
+    url: str,
+) -> str:
+    """Select the (highest quality) HLS substream for given HLS playlist/URL."""
+    timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
+    # fetch master playlist and select (best) child playlist
+    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
+    async with mass.http_session.get(url, headers=HTTP_HEADERS, timeout=timeout) as resp:
+        resp.raise_for_status()
+        charset = resp.charset or "utf-8"
+        master_m3u_data = await resp.text(charset)
+    substreams = parse_m3u(master_m3u_data)
+    if any(x for x in substreams if x.length):
+        # the url we got is already a substream
+        return url
+    # sort substreams on best quality (highest bandwidth)
+    substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
+    substream = substreams[0]
+    substream_url = substream.path
+    if not substream_url.startswith("http"):
+        # path is relative, stitch it together
+        base_path = url.rsplit("/", 1)[0]
+        substream_url = base_path + "/" + substream.path
+    return substream_url
 
 
 async def get_http_stream(
@@ -908,7 +963,7 @@ def get_ffmpeg_args(
         "-nostats",
         "-ignore_unknown",
         "-protocol_whitelist",
-        "file,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
+        "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
     ]
     # collect input args
     input_args = []
@@ -947,17 +1002,9 @@ def get_ffmpeg_args(
             "-i",
             input_path,
         ]
-    elif input_format.content_type in (
-        ContentType.UNKNOWN,
-        ContentType.M4A,
-        ContentType.M4B,
-        ContentType.MP4,
-    ):
-        # let ffmpeg guess/auto detect the content type
-        input_args += ["-i", input_path]
     else:
-        # use explicit format identifier for all other
-        input_args += ["-f", input_format.content_type.value, "-i", input_path]
+        # let ffmpeg auto detect the content type from the metadata/headers
+        input_args += ["-i", input_path]
 
     # collect output args
     output_args = []
@@ -965,8 +1012,7 @@ def get_ffmpeg_args(
         # devnull stream
         output_args = ["-f", "null", "-"]
     elif output_format.content_type == ContentType.UNKNOWN:
-        # use wav so we at least have some headers for the rest of the chain
-        output_args = ["-f", "wav", output_path]
+        raise RuntimeError("Invalid output format specified")
     else:
         if output_format.content_type.is_pcm():
             output_args += ["-acodec", output_format.content_type.name.lower()]
@@ -981,18 +1027,27 @@ def get_ffmpeg_args(
             output_path,
         ]
 
+    # edge case: source file is not stereo - downmix to stereo
+    if input_format.channels > 2 and output_format.channels == 2:
+        filter_params = [
+            "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
+            *filter_params,
+        ]
+
     # determine if we need to do resampling
     if (
         input_format.sample_rate != output_format.sample_rate
         or input_format.bit_depth != output_format.bit_depth
     ):
         # prefer resampling with libsoxr due to its high quality
-        resample_filter = f'aresample=resampler={"soxr" if libsoxr_support else "swr"}'
+        if libsoxr_support:
+            resample_filter = "aresample=resampler=soxr:precision=28"
+        else:
+            resample_filter = "aresample=resampler=swr"
         if output_format.bit_depth < input_format.bit_depth:
             # apply dithering when going down to 16 bits
             resample_filter += ":osf=s16:dither_method=triangular_hp"
-        if not output_format.content_type.is_pcm():
-            # specify sample rate if output format is not pcm
+        if input_format.sample_rate != output_format.sample_rate:
             resample_filter += f":osr={output_format.sample_rate}"
         filter_params.append(resample_filter)
 
index 663d27460bf0cc23e57c6fa5a54aa59a0290e4d8..cc1b3a08e537253cc55a0cd0f769bf016618632d 100644 (file)
@@ -301,7 +301,7 @@ class SoundcloudMusicProvider(MusicProvider):
 
     async def get_stream_details(self, item_id: str) -> StreamDetails:
         """Return the content details for the given track when it will be streamed."""
-        url = await self._soundcloud.get_stream_url(track_id=item_id)
+        url: str = await self._soundcloud.get_stream_url(track_id=item_id)
         return StreamDetails(
             provider=self.instance_id,
             item_id=item_id,
@@ -310,7 +310,9 @@ class SoundcloudMusicProvider(MusicProvider):
             audio_format=AudioFormat(
                 content_type=ContentType.UNKNOWN,
             ),
-            stream_type=StreamType.HTTP,
+            stream_type=StreamType.HLS
+            if url.startswith("https://cf-hls-media.sndcdn.com")
+            else StreamType.HTTP,
             path=url,
         )