Fix stream issues (#364)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 14 Jun 2022 12:35:24 +0000 (14:35 +0200)
committerGitHub <noreply@github.com>
Tue, 14 Jun 2022 12:35:24 +0000 (14:35 +0200)
* fix stream issues

* try to survive radio reconnects

* no need for redundant logging

* only allow 30 seconds buffer ahead

music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/models/player_queue.py

index dbefc0c5b8ea8a09c254f93f155ae8fd5860577b..817246f89e242dce64d3475709dafd14db5fd0c0 100644 (file)
@@ -4,6 +4,7 @@ from __future__ import annotations
 import asyncio
 import gc
 import urllib.parse
+from time import time
 from types import CoroutineType
 from typing import TYPE_CHECKING, AsyncGenerator, Dict, Optional
 from uuid import uuid4
@@ -16,7 +17,6 @@ from music_assistant.helpers.audio import (
     crossfade_pcm_parts,
     fadein_pcm_part,
     get_chunksize,
-    get_ffmpeg_args_for_pcm_stream,
     get_media_stream,
     get_preview_stream,
     get_stream_details,
@@ -265,11 +265,13 @@ class QueueStream:
         self.logger = self.queue.logger.getChild("stream")
         self.expected_clients = expected_clients
         self.connected_clients: Dict[str, CoroutineType[bytes]] = {}
-        self._runner_task: Optional[asyncio.Task] = None
+        self.seconds_streamed = 0
+        self.streaming_started = 0
         self.done = asyncio.Event()
         self.all_clients_connected = asyncio.Event()
         self.index_in_buffer = start_index
         self.signal_next: bool = False
+        self._runner_task: Optional[asyncio.Task] = None
         if autostart:
             self.mass.create_task(self.start())
 
@@ -313,22 +315,55 @@ class QueueStream:
 
     async def _queue_stream_runner(self) -> None:
         """Distribute audio chunks over connected client queues."""
-        ffmpeg_args = await get_ffmpeg_args_for_pcm_stream(
-            self.pcm_sample_rate,
-            self.pcm_bit_depth,
-            self.pcm_channels,
-            output_format=self.output_format,
+        # collect ffmpeg args
+        input_format = ContentType.from_bit_depth(
+            self.pcm_bit_depth, self.pcm_floating_point
         )
+        ffmpeg_args = [
+            "ffmpeg",
+            "-hide_banner",
+            "-loglevel",
+            "error",
+            "-ignore_unknown",
+            # pcm input args
+            "-f",
+            input_format.value,
+            "-ac",
+            str(self.pcm_channels),
+            "-ar",
+            str(self.pcm_sample_rate),
+            "-i",
+            "-",
+            # output args
+            "-f",
+            self.output_format.value,
+            "-compression_level",
+            "0",
+            "-",
+        ]
         # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
         # send the compressed/encoded stream to the client(s).
         chunk_size = get_chunksize(self.output_format)
+        sample_size = int(
+            self.pcm_sample_rate * (self.pcm_bit_depth / 8) * self.pcm_channels
+        )
         async with AsyncProcess(ffmpeg_args, True, chunk_size) as ffmpeg_proc:
 
             async def writer():
                 """Task that sends the raw pcm audio to the ffmpeg process."""
                 async for audio_chunk in self._get_queue_stream():
                     await ffmpeg_proc.write(audio_chunk)
+                    self.seconds_streamed += len(audio_chunk) / sample_size
                     del audio_chunk
+                    # allow clients to only buffer max ~30 seconds ahead
+                    seconds_allowed = int(time() - self.streaming_started) + 30
+                    diff = self.seconds_streamed - seconds_allowed
+                    if diff > 1:
+                        self.logger.debug(
+                            "Player is buffering %s seconds ahead, slowing it down",
+                            diff,
+                        )
+                        await asyncio.sleep(10)
                 # write eof when last packet is received
                 ffmpeg_proc.write_eof()
 
@@ -336,14 +371,15 @@ class QueueStream:
 
             # wait max 5 seconds for all client(s) to connect
             try:
-                await asyncio.wait_for(self.all_clients_connected.wait(), 5)
+                await asyncio.wait_for(self.all_clients_connected.wait(), 10)
             except asyncio.exceptions.TimeoutError:
                 self.logger.warning(
-                    "Abort: client(s) did not connect within 5 seconds."
+                    "Abort: client(s) did not connect within 10 seconds."
                 )
                 self.done.set()
                 return
             self.logger.debug("%s clients connected", len(self.connected_clients))
+            self.streaming_started = time()
 
             # Read bytes from final output and send chunk to child callback.
             async for chunk in ffmpeg_proc.iterate_chunks():
@@ -492,21 +528,26 @@ class QueueStream:
             prev_chunk = None
             bytes_written = 0
             # handle incoming audio chunks
-            async for chunk in get_media_stream(
+            async for is_last_chunk, chunk in get_media_stream(
                 self.mass,
                 streamdetails,
-                pcm_fmt,
-                pcm_sample_rate=self.pcm_sample_rate,
+                pcm_fmt=pcm_fmt,
+                sample_rate=self.pcm_sample_rate,
+                channels=self.pcm_channels,
                 chunk_size=buffer_size,
                 seek_position=seek_position,
             ):
                 cur_chunk += 1
-                is_last_chunk = len(chunk) < buffer_size
 
                 # HANDLE FIRST PART OF TRACK
                 if len(chunk) == 0 and bytes_written == 0 and is_last_chunk:
                     # stream error: got empy first chunk ?!
                     self.logger.warning("Stream error on %s", queue_track.uri)
+                elif cur_chunk == 1 and is_last_chunk:
+                    # audio only has one single chunk (alert?)
+                    bytes_written += len(chunk)
+                    yield chunk
+                    del chunk
                 elif cur_chunk == 1 and last_fadeout_data:
                     prev_chunk = chunk
                     del chunk
@@ -563,7 +604,7 @@ class QueueStream:
                     # with the previous chunk and this chunk
                     # and strip off silence
                     last_part = await strip_silence(
-                        prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, True
+                        prev_chunk + chunk, pcm_fmt, self.pcm_sample_rate, reverse=True
                     )
                     if len(last_part) < buffer_size:
                         # part is too short after the strip action
index cb59054da6176f0be5a53ed499b0a6caeac16bdc..c449dda576e37b8de774f6c6f59c33c7650189cf 100644 (file)
@@ -37,28 +37,48 @@ async def crossfade_pcm_parts(
     fade_length: int,
     fmt: ContentType,
     sample_rate: int,
+    channels: int = 2,
 ) -> bytes:
     """Crossfade two chunks of pcm/raw audio using ffmpeg."""
     fadeoutfile = create_tempfile()
     async with aiofiles.open(fadeoutfile.name, "wb") as outfile:
         await outfile.write(fade_out_part)
-    # input args
-    args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
-    args += [
+    args = [
+        # generic args
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "error",
+        # fadeout part (as file)
+        "-acodec",
+        fmt.name.lower(),
         "-f",
         fmt.value,
         "-ac",
-        "2",
+        str(channels),
         "-ar",
         str(sample_rate),
         "-i",
         fadeoutfile.name,
+        # fade_in part (stdin)
+        "-acodec",
+        fmt.name.lower(),
+        "-f",
+        fmt.value,
+        "-ac",
+        str(channels),
+        "-ar",
+        str(sample_rate),
+        "-i",
+        "-",
+        # filter args
+        "-filter_complex",
+        f"[0][1]acrossfade=d={fade_length}",
+        # output args
+        "-f",
+        fmt.value,
+        "-",
     ]
-    args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
-    # filter args
-    args += ["-filter_complex", f"[0][1]acrossfade=d={fade_length}"]
-    # output args
-    args += ["-f", fmt.value, "-"]
     async with AsyncProcess(args, True) as proc:
         crossfade_data, _ = await proc.communicate(fade_in_part)
         LOGGER.debug(
@@ -75,36 +95,61 @@ async def fadein_pcm_part(
     fade_length: int,
     fmt: ContentType,
     sample_rate: int,
+    channels: int = 2,
 ) -> bytes:
     """Fadein chunk of pcm/raw audio using ffmpeg."""
-    # input args
-    args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
-    args += [
+    args = [
+        # generic args
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "error",
+        # fade_in part (stdin)
+        "-acodec",
+        fmt.name.lower(),
         "-f",
         fmt.value,
         "-ac",
-        "2",
+        str(channels),
         "-ar",
         str(sample_rate),
         "-i",
         "-",
+        # filter args
+        "-af",
+        f"afade=type=in:start_time=0:duration={fade_length}",
+        # output args
+        "-f",
+        fmt.value,
+        "-",
     ]
-    # filter args
-    args += ["-af", f"afade=type=in:start_time=0:duration={fade_length}"]
-    # output args
-    args += ["-f", fmt.value, "-"]
     async with AsyncProcess(args, True) as proc:
         result_audio, _ = await proc.communicate(pcm_audio)
         return result_audio
 
 
 async def strip_silence(
-    audio_data: bytes, fmt: ContentType, sample_rate: int, reverse=False
+    audio_data: bytes,
+    fmt: ContentType,
+    sample_rate: int,
+    channels: int = 2,
+    reverse=False,
 ) -> bytes:
     """Strip silence from (a chunk of) pcm audio."""
     # input args
     args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
-    args += ["-f", fmt.value, "-ac", "2", "-ar", str(sample_rate), "-i", "-"]
+    args += [
+        "-acodec",
+        fmt.name.lower(),
+        "-f",
+        fmt.value,
+        "-ac",
+        str(channels),
+        "-ar",
+        str(sample_rate),
+        "-i",
+        "-",
+    ]
     # filter args
     if reverse:
         args += ["-af", "areverse,silenceremove=1:0:-50dB:detection=peak,areverse"]
@@ -134,10 +179,10 @@ async def analyze_audio(mass: MusicAssistant, streamdetails: StreamDetails) -> N
     started = time()
     proc_args = [
         "ffmpeg",
-        "-f",
-        streamdetails.content_type.value,
         "-i",
         "-",
+        "-f",
+        streamdetails.content_type.value,
         "-af",
         "ebur128=framelog=verbose",
         "-f",
@@ -331,83 +376,20 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
     return file.getvalue()
 
 
-async def get_ffmpeg_args(
-    streamdetails: StreamDetails,
-    output_format: Optional[ContentType] = None,
-    pcm_sample_rate: Optional[int] = None,
-    pcm_channels: int = 2,
-) -> List[str]:
-    """Collect all args to send to the ffmpeg process."""
-    input_format = streamdetails.content_type
-    if output_format is None:
-        output_format = input_format
-
-    ffmpeg_present, libsoxr_support = await check_audio_support()
-
-    if not ffmpeg_present:
-        raise AudioError(
-            "FFmpeg binary is missing from system."
-            "Please install ffmpeg on your OS to enable playback.",
-        )
-    # collect input args
-    input_args = [
-        "ffmpeg",
-        "-hide_banner",
-        "-loglevel",
-        "error",
-        "-ignore_unknown",
-    ]
-    if streamdetails.content_type != ContentType.UNKNOWN:
-        input_args += ["-f", input_format.value]
-    input_args += ["-i", "-"]
-    # collect output args
-    if output_format.is_pcm():
-        output_args = [
-            "-acodec",
-            output_format.name.lower(),
-            "-f",
-            output_format.value,
-            "-ac",
-            str(pcm_channels),
-            "-ar",
-            str(pcm_sample_rate),
-            "-",
-        ]
-    else:
-        output_args = ["-f", output_format.value, "-"]
-    # collect extra and filter args
-    extra_args = []
-    filter_params = []
-    if streamdetails.gain_correct:
-        filter_params.append(f"volume={streamdetails.gain_correct}dB")
-    if (
-        pcm_sample_rate is not None
-        and streamdetails.sample_rate != pcm_sample_rate
-        and libsoxr_support
-        and streamdetails.media_type == MediaType.TRACK
-    ):
-        # prefer libsoxr high quality resampler (if present) for sample rate conversions
-        filter_params.append("aresample=resampler=soxr")
-    if filter_params:
-        extra_args += ["-af", ",".join(filter_params)]
-
-    if pcm_sample_rate is not None and not output_format.is_pcm():
-        extra_args += ["-ar", str(pcm_sample_rate)]
-
-    return input_args + extra_args + output_args
-
-
 async def get_media_stream(
     mass: MusicAssistant,
     streamdetails: StreamDetails,
-    output_format: Optional[ContentType] = None,
-    pcm_sample_rate: Optional[int] = None,
+    pcm_fmt: ContentType,
+    sample_rate: int,
+    channels: int = 2,
     chunk_size: Optional[int] = None,
     seek_position: int = 0,
-) -> AsyncGenerator[bytes, None]:
-    """Get the audio stream for the given streamdetails."""
-
-    args = await get_ffmpeg_args(streamdetails, output_format, pcm_sample_rate)
+) -> AsyncGenerator[Tuple[bool, bytes], None]:
+    """Get the PCM audio stream for the given streamdetails."""
+    assert pcm_fmt.is_pcm(), "Output format must be a PCM type"
+    args = await _get_ffmpeg_args(
+        streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels
+    )
     async with AsyncProcess(
         args, enable_write=True, chunk_size=chunk_size
     ) as ffmpeg_proc:
@@ -431,9 +413,15 @@ async def get_media_stream(
         ffmpeg_proc.attach_task(writer())
 
         # yield chunks from stdout
+        # we keep 1 chunk behind to detect end of stream properly
         try:
+            prev_chunk = b""
             async for chunk in ffmpeg_proc.iterate_chunks():
-                yield chunk
+                if prev_chunk:
+                    yield (False, prev_chunk)
+                prev_chunk = chunk
+            # send last chunk
+            yield (True, prev_chunk)
         except (asyncio.CancelledError, GeneratorExit) as err:
             LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
             raise err
@@ -458,35 +446,38 @@ async def get_radio_stream(
     headers = {"Icy-MetaData": "1"}
     while True:
         # in loop to reconnect on connection failure
-        LOGGER.debug("radio stream (re)connecting to: %s", url)
-        async with mass.http_session.get(url, headers=headers) as resp:
-            headers = resp.headers
-            meta_int = int(headers.get("icy-metaint", "0"))
-            # stream with ICY Metadata
-            if meta_int:
-                while True:
-                    audio_chunk = await resp.content.readexactly(meta_int)
-                    yield audio_chunk
-                    meta_byte = await resp.content.readexactly(1)
-                    meta_length = ord(meta_byte) * 16
-                    meta_data = await resp.content.readexactly(meta_length)
-                    if not meta_data:
-                        continue
-                    meta_data = meta_data.rstrip(b"\0")
-                    stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
-                    if not stream_title:
-                        continue
-                    stream_title = stream_title.group(1).decode()
-                    if stream_title != streamdetails.stream_title:
-                        streamdetails.stream_title = stream_title
-                        if queue := mass.players.get_player_queue(
-                            streamdetails.queue_id
-                        ):
-                            queue.signal_update()
-            # Regular HTTP stream
-            else:
-                async for chunk in resp.content.iter_any():
-                    yield chunk
+        try:
+            LOGGER.debug("radio stream (re)connecting to: %s", url)
+            async with mass.http_session.get(url, headers=headers, timeout=60) as resp:
+                headers = resp.headers
+                meta_int = int(headers.get("icy-metaint", "0"))
+                # stream with ICY Metadata
+                if meta_int:
+                    while True:
+                        audio_chunk = await resp.content.readexactly(meta_int)
+                        yield audio_chunk
+                        meta_byte = await resp.content.readexactly(1)
+                        meta_length = ord(meta_byte) * 16
+                        meta_data = await resp.content.readexactly(meta_length)
+                        if not meta_data:
+                            continue
+                        meta_data = meta_data.rstrip(b"\0")
+                        stream_title = re.search(rb"StreamTitle='([^']*)';", meta_data)
+                        if not stream_title:
+                            continue
+                        stream_title = stream_title.group(1).decode()
+                        if stream_title != streamdetails.stream_title:
+                            streamdetails.stream_title = stream_title
+                            if queue := mass.players.get_player_queue(
+                                streamdetails.queue_id
+                            ):
+                                queue.signal_update()
+                # Regular HTTP stream
+                else:
+                    async for chunk in resp.content.iter_any():
+                        yield chunk
+        except asyncio.exceptions.TimeoutError:
+            pass
 
 
 async def get_http_stream(
@@ -498,7 +489,6 @@ async def get_http_stream(
     """Get audio stream from HTTP."""
     if seek_position:
         assert streamdetails.duration, "Duration required for seek requests"
-    chunk_size = get_chunksize(streamdetails.content_type)
     # try to get filesize with a head request
     if seek_position and not streamdetails.size:
         async with mass.http_session.head(url) as resp:
@@ -518,7 +508,7 @@ async def get_http_stream(
     async with mass.http_session.get(url, headers=headers) as resp:
         is_partial = resp.status == 206
         buffer_all = seek_position and not is_partial
-        async for chunk in resp.content.iter_chunked(chunk_size):
+        async for chunk in resp.content.iter_any():
             bytes_received += len(chunk)
             if buffer_all and not skip_bytes:
                 buffer += chunk
@@ -588,32 +578,6 @@ async def check_audio_support(try_install: bool = False) -> Tuple[bool, bool]:
     return result
 
 
-async def get_ffmpeg_args_for_pcm_stream(
-    sample_rate: int,
-    bit_depth: int,
-    channels: int,
-    floating_point: bool = False,
-    output_format: ContentType = ContentType.FLAC,
-) -> List[str]:
-    """Collect args for ffmpeg when converting from raw pcm to another contenttype."""
-    input_format = ContentType.from_bit_depth(bit_depth, floating_point)
-    # collect input args
-    input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ignore_unknown"]
-    input_args += [
-        "-f",
-        input_format.value,
-        "-ac",
-        str(channels),
-        "-ar",
-        str(sample_rate),
-        "-i",
-        "-",
-    ]
-    # collect output args
-    output_args = ["-f", output_format.value, "-"]
-    return input_args + output_args
-
-
 async def get_preview_stream(
     mass: MusicAssistant,
     provider_id: str,
@@ -668,3 +632,61 @@ def get_chunksize(content_type: ContentType) -> int:
     ):
         return 64000
     return 256000
+
+
+async def _get_ffmpeg_args(
+    streamdetails: StreamDetails,
+    pcm_output_format: ContentType,
+    pcm_sample_rate: int,
+    pcm_channels: int = 2,
+) -> List[str]:
+    """Collect all args to send to the ffmpeg process."""
+    input_format = streamdetails.content_type
+    assert pcm_output_format.is_pcm(), "Output format needs to be PCM"
+
+    ffmpeg_present, libsoxr_support = await check_audio_support()
+
+    if not ffmpeg_present:
+        raise AudioError(
+            "FFmpeg binary is missing from system."
+            "Please install ffmpeg on your OS to enable playback.",
+        )
+    # collect input args
+    input_args = [
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        "error",
+        "-ignore_unknown",
+    ]
+    if streamdetails.content_type != ContentType.UNKNOWN:
+        input_args += ["-f", input_format.value]
+    input_args += ["-i", "-"]
+    # collect output args
+    output_args = [
+        "-acodec",
+        pcm_output_format.name.lower(),
+        "-f",
+        pcm_output_format.value,
+        "-ac",
+        str(pcm_channels),
+        "-ar",
+        str(pcm_sample_rate),
+        "-",
+    ]
+    # collect extra and filter args
+    extra_args = []
+    filter_params = []
+    if streamdetails.gain_correct:
+        filter_params.append(f"volume={streamdetails.gain_correct}dB")
+    if (
+        streamdetails.sample_rate != pcm_sample_rate
+        and libsoxr_support
+        and streamdetails.media_type == MediaType.TRACK
+    ):
+        # prefer libsoxr high quality resampler (if present) for sample rate conversions
+        filter_params.append("aresample=resampler=soxr")
+    if filter_params:
+        extra_args += ["-af", ",".join(filter_params)]
+
+    return input_args + extra_args + output_args
index 1eba0a1bd5c0f5d92b4a5d2057550d67b44b3e6e..65c9575c3c4c6e06bd14a0112a047b448ab864a9 100644 (file)
@@ -262,14 +262,14 @@ class PlayerQueue:
             await self.append(queue_items)
 
     async def play_alert(
-        self, uri: str, announce: bool = False, volume_adjust: int = 10
+        self, uri: str, announce: bool = False, gain_correct: int = 6
     ) -> str:
         """
         Play given uri as Alert on the queue.
 
         uri: Uri that should be played as announcement, can be Music Assistant URI or plain url.
         announce: Prepend the (TTS) alert with a small announce sound.
-        volume_adjust: Adjust the volume of the player by this percentage (relative).
+        gain_correct: Adjust the gain of the alert sound (in dB).
         """
         if self._snapshot:
             self.logger.debug("Ignore play_alert: already in progress")
@@ -295,37 +295,28 @@ class PlayerQueue:
             if uri.startswith("http") or os.path.isfile(uri):
                 # a plain url was provided
                 queue_item = QueueItem.from_url(uri, "alert")
-                queue_item.streamdetails.gain_correct = 6
+                queue_item.streamdetails.gain_correct = gain_correct
                 queue_items.append(queue_item)
             else:
                 raise MediaNotFoundError(f"Invalid uri: {uri}") from err
 
-        # append silence track, we use this to reliably detect when the alert is ready
-        silence_url = self.mass.streams.get_silence_url(600)
-        queue_item = QueueItem.from_url(silence_url, "alert")
-        queue_items.append(queue_item)
-
         # load queue with alert sound(s)
         await self.load(queue_items)
 
         # wait for the alert to finish playing
+        await self.stream.done.wait()
         alert_done = asyncio.Event()
 
         def handle_event(evt: MassEvent):
-            if (
-                self.current_item
-                and self.current_item.uri == silence_url
-                and self.elapsed_time
-            ):
+            if self.player.state != PlayerState.PLAYING:
                 alert_done.set()
 
         unsub = self.mass.subscribe(
-            handle_event, EventType.QUEUE_TIME_UPDATED, self.queue_id
+            handle_event, EventType.QUEUE_UPDATED, self.queue_id
         )
         try:
-            await asyncio.wait_for(alert_done.wait(), 120)
+            await asyncio.wait_for(alert_done.wait(), 30)
         finally:
-
             unsub()
             # restore queue
             await self.snapshot_restore()
@@ -635,7 +626,7 @@ class PlayerQueue:
 
     async def queue_stream_start(
         self, start_index: int, seek_position: int, fade_in: bool, passive: bool = False
-    ) -> None:
+    ) -> QueueStream:
         """Start the queue stream runner."""
         output_format = self._settings.stream_type
         if self.player.use_multi_stream:
@@ -660,6 +651,7 @@ class PlayerQueue:
         # execute the play command on the player(s)
         if not passive:
             await self.player.play_url(stream.url)
+        return stream
 
     def get_next_index(self, cur_index: Optional[int]) -> int:
         """Return the next index for the queue, accounting for repeat settings."""