Allow more aggressive buffering by players that require it (#405)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 10 Jul 2022 21:11:22 +0000 (23:11 +0200)
committerGitHub <noreply@github.com>
Sun, 10 Jul 2022 21:11:22 +0000 (23:11 +0200)
Allow more aggressive buffering

For example cast players (especially with Hires audio) want to buffer ahead, allow this without breaking the scenario where the audio source itself is throttled (streaming providers like YT Music)

music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/music_providers/spotify.py

index 80f1eb537b89fd5b56dcaa04c2d9e9ea2dcd7c88..15fa89b8f77c1ececa795efdc7e065432b275441 100644 (file)
@@ -182,7 +182,7 @@ class StreamsController:
             "icy-name": "Streaming from Music Assistant",
             "icy-pub": "0",
             "Cache-Control": "no-cache",
-            "icy-metaint": str(queue_stream.chunk_size),
+            "icy-metaint": str(queue_stream.output_chunk_size),
             "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",
         }
 
@@ -256,22 +256,22 @@ class StreamsController:
             pcm_sample_rate = 41000
             pcm_bit_depth = 16
             pcm_channels = 2
-            pcm_resample = True
+            allow_resample = True
         elif queue.settings.crossfade_mode == CrossFadeMode.ALWAYS:
             pcm_sample_rate = min(96000, queue.settings.max_sample_rate)
             pcm_bit_depth = 24
             pcm_channels = 2
-            pcm_resample = True
+            allow_resample = True
         elif streamdetails.sample_rate > queue.settings.max_sample_rate:
             pcm_sample_rate = queue.settings.max_sample_rate
             pcm_bit_depth = streamdetails.bit_depth
             pcm_channels = streamdetails.channels
-            pcm_resample = True
+            allow_resample = True
         else:
             pcm_sample_rate = streamdetails.sample_rate
             pcm_bit_depth = streamdetails.bit_depth
             pcm_channels = streamdetails.channels
-            pcm_resample = False
+            allow_resample = False
 
         self.queue_streams[stream_id] = stream = QueueStream(
             queue=queue,
@@ -283,7 +283,7 @@ class StreamsController:
             pcm_sample_rate=pcm_sample_rate,
             pcm_bit_depth=pcm_bit_depth,
             pcm_channels=pcm_channels,
-            pcm_resample=pcm_resample,
+            allow_resample=allow_resample,
             is_alert=is_alert,
             autostart=True,
         )
@@ -316,7 +316,7 @@ class QueueStream:
         pcm_bit_depth: int,
         pcm_channels: int = 2,
         pcm_floating_point: bool = False,
-        pcm_resample: bool = False,
+        allow_resample: bool = False,
         is_alert: bool = False,
         autostart: bool = False,
     ):
@@ -331,7 +331,7 @@ class QueueStream:
         self.pcm_bit_depth = pcm_bit_depth
         self.pcm_channels = pcm_channels
         self.pcm_floating_point = pcm_floating_point
-        self.pcm_resample = pcm_resample
+        self.allow_resample = allow_resample
         self.is_alert = is_alert
         self.url = queue.mass.streams.get_stream_url(stream_id, output_format)
 
@@ -345,14 +345,14 @@ class QueueStream:
         self.all_clients_connected = asyncio.Event()
         self.index_in_buffer = start_index
         self.signal_next: bool = False
-        self.chunk_size = get_chunksize(
-            output_format,
-            self.pcm_sample_rate,
-            self.pcm_bit_depth,
-            self.pcm_channels,
-        )
         self._runner_task: Optional[asyncio.Task] = None
         self._prev_chunk: bytes = b""
+        self.output_chunk_size = get_chunksize(
+            output_format,
+            pcm_sample_rate,
+            pcm_bit_depth,
+            pcm_channels,
+        )
         if autostart:
             self.mass.create_task(self.start())
 
@@ -370,7 +370,6 @@ class QueueStream:
 
         self._runner_task = None
         self.connected_clients = {}
-        self._prev_chunk = b""
 
         # run garbage collection manually due to the high number of
         # processed bytes blocks
@@ -392,11 +391,6 @@ class QueueStream:
         self.logger.debug("client connected: %s", client_id)
         if len(self.connected_clients) == self.expected_clients:
             self.all_clients_connected.set()
-
-        # workaround for reconnecting clients (such as kodi)
-        # send the previous chunk if we have one
-        if self._prev_chunk:
-            await callback(self._prev_chunk)
         try:
             await self.done.wait()
         finally:
@@ -405,7 +399,7 @@ class QueueStream:
             await self._check_stop()
 
     async def _queue_stream_runner(self) -> None:
-        """Distribute audio chunks over connected client queues."""
+        """Distribute audio chunks over connected client(s)."""
         # collect ffmpeg args
         input_format = ContentType.from_bit_depth(
             self.pcm_bit_depth, self.pcm_floating_point
@@ -414,7 +408,7 @@ class QueueStream:
             "ffmpeg",
             "-hide_banner",
             "-loglevel",
-            "error",
+            "quiet",
             "-ignore_unknown",
             # pcm input args
             "-f",
@@ -437,13 +431,12 @@ class QueueStream:
         ]
         # get the raw pcm bytes from the queue stream and on the fly encode to wanted format
         # send the compressed/encoded stream to the client(s).
-        async with AsyncProcess(ffmpeg_args, True, self.chunk_size) as ffmpeg_proc:
+        async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc:
 
             async def writer():
                 """Task that sends the raw pcm audio to the ffmpeg process."""
                 async for audio_chunk in self._get_queue_stream():
                     await ffmpeg_proc.write(audio_chunk)
-                    del audio_chunk
                 # write eof when last packet is received
                 ffmpeg_proc.write_eof()
 
@@ -463,7 +456,7 @@ class QueueStream:
 
             # Read bytes from final output and send chunk to child callback.
             chunk_num = 0
-            async for chunk in ffmpeg_proc.iterate_chunks():
+            async for chunk in ffmpeg_proc.iter_chunked(self.output_chunk_size):
                 chunk_num += 1
 
                 if len(self.connected_clients) == 0:
@@ -482,10 +475,6 @@ class QueueStream:
                     ):
                         self.connected_clients.pop(client_id, None)
 
-                # back off a bit after first chunk to handle reconnecting clients (e.g. kodi)
-                if chunk_num == 1:
-                    await asyncio.sleep(0.5)
-
             # complete queue streamed
             if self.signal_next:
                 # the queue stream was aborted (e.g. because of sample rate mismatch)
@@ -558,7 +547,7 @@ class QueueStream:
                 continue
 
             # check the PCM samplerate/bitrate
-            if not self.pcm_resample and streamdetails.bit_depth > self.pcm_bit_depth:
+            if not self.allow_resample and streamdetails.bit_depth > self.pcm_bit_depth:
                 self.signal_next = True
                 self.logger.debug(
                     "Abort queue stream %s due to bit depth mismatch",
@@ -566,7 +555,7 @@ class QueueStream:
                 )
                 break
             if (
-                not self.pcm_resample
+                not self.allow_resample
                 and streamdetails.sample_rate > self.pcm_sample_rate
                 and streamdetails.sample_rate <= self.queue.settings.max_sample_rate
             ):
@@ -627,7 +616,6 @@ class QueueStream:
                 pcm_fmt=pcm_fmt,
                 sample_rate=self.pcm_sample_rate,
                 channels=self.pcm_channels,
-                chunk_size=sample_size,
                 seek_position=seek_position,
             ):
                 chunk_count += 1
@@ -644,21 +632,18 @@ class QueueStream:
                 if queue_track.duration is None or queue_track.duration < 30:
                     bytes_written += len(chunk)
                     yield chunk
-                    del chunk
                     continue
 
                 # first part of track and we need to (cross)fade: fill buffer
                 if bytes_written < buf_size and (last_fadeout_part or fade_in):
                     bytes_written += len(chunk)
                     buffer += chunk
-                    del chunk
                     continue
 
                 # last part of track: fill buffer
                 if bytes_written >= (total_size - buf_size):
                     bytes_written += len(chunk)
                     buffer += chunk
-                    del chunk
                     continue
 
                 # buffer full for fade-in / crossfade
@@ -682,13 +667,10 @@ class QueueStream:
                         # send crossfade_part
                         yield crossfade_part
                         bytes_written += len(crossfade_part)
-                        del crossfade_part
-                        del fadein_part
                         # also write the leftover bytes from the strip action
                         if remaining_bytes:
                             yield remaining_bytes
                             bytes_written += len(remaining_bytes)
-                            del remaining_bytes
                     else:
                         # fade-in
                         fadein_part = await fadein_pcm_part(
@@ -699,45 +681,40 @@ class QueueStream:
                         )
                         yield fadein_part
                         bytes_written += len(fadein_part)
-                        del fadein_part
 
                     # clear vars
                     last_fadeout_part = b""
-                    del first_part
-                    del chunk
                     buffer = b""
                     continue
 
                 # all other: middle of track or no fade actions, just yield the audio
                 bytes_written += len(chunk)
                 yield chunk
-                del chunk
                 continue
 
             #### HANDLE END OF TRACK
 
-            # strip silence from end of audio
-            last_part = await strip_silence(
-                buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
-            )
+            if buffer:
+                # strip silence from end of audio
+                last_part = await strip_silence(
+                    buffer, pcm_fmt, self.pcm_sample_rate, reverse=True
+                )
 
-            # handle crossfading support
-            # store fade section to be picked up for next track
-
-            if use_crossfade:
-                # crossfade is enabled, save fadeout part to pickup for next track
-                last_part = last_part[-crossfade_size:]
-                remaining_bytes = last_part[:-crossfade_size]
-                # yield remaining bytes
-                bytes_written += len(remaining_bytes)
-                yield remaining_bytes
-                last_fadeout_part = last_part
-                del remaining_bytes
-            else:
-                # no crossfade enabled, just yield the stripped audio data
-                bytes_written += len(last_part)
-                yield last_part
-                del last_part
+                # handle crossfading support
+                # store fade section to be picked up for next track
+
+                if use_crossfade:
+                    # crossfade is enabled, save fadeout part to pickup for next track
+                    remaining_bytes = last_part[:-crossfade_size]
+                    last_part = last_part[-crossfade_size:]
+                    # yield remaining bytes
+                    bytes_written += len(remaining_bytes)
+                    yield remaining_bytes
+                    last_fadeout_part = last_part
+                else:
+                    # no crossfade enabled, just yield the stripped audio data
+                    bytes_written += len(last_part)
+                    yield last_part
 
             # end of the track reached
             queue_track.streamdetails.seconds_streamed = bytes_written / sample_size
@@ -748,8 +725,8 @@ class QueueStream:
                 self.queue.player.name,
             )
         # end of queue reached, pass last fadeout bits to final output
-        yield last_fadeout_part
-        del last_fadeout_part
+        if last_fadeout_part:
+            yield last_fadeout_part
         # END OF QUEUE STREAM
         self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name)
 
index 6bc9d76f59dc9a05fb8470ffe713ec1969905ba5..19a5a8256e0ada2263cec9e25d1f00c61393f9fc 100644 (file)
@@ -8,7 +8,7 @@ import re
 import struct
 from io import BytesIO
 from time import time
-from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple
+from typing import TYPE_CHECKING, AsyncGenerator, List, Tuple
 
 import aiofiles
 from aiohttp import ClientError, ClientTimeout
@@ -49,7 +49,7 @@ async def crossfade_pcm_parts(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "error",
+        "quiet",
         # fadeout part (as file)
         "-acodec",
         fmt.name.lower(),
@@ -104,7 +104,7 @@ async def fadein_pcm_part(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "error",
+        "quiet",
         # fade_in part (stdin)
         "-acodec",
         fmt.name.lower(),
@@ -138,7 +138,7 @@ async def strip_silence(
 ) -> bytes:
     """Strip silence from (a chunk of) pcm audio."""
     # input args
-    args = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+    args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"]
     args += [
         "-acodec",
         fmt.name.lower(),
@@ -385,7 +385,6 @@ async def get_media_stream(
     pcm_fmt: ContentType,
     sample_rate: int,
     channels: int = 2,
-    chunk_size: Optional[int] = None,
     seek_position: int = 0,
 ) -> AsyncGenerator[bytes, None]:
     """Get the PCM audio stream for the given streamdetails."""
@@ -393,9 +392,7 @@ async def get_media_stream(
     args = await _get_ffmpeg_args(
         streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels
     )
-    async with AsyncProcess(
-        args, enable_stdin=True, chunk_size=chunk_size
-    ) as ffmpeg_proc:
+    async with AsyncProcess(args, enable_stdin=True) as ffmpeg_proc:
 
         LOGGER.debug(
             "start media stream for: %s, using args: %s", streamdetails.uri, str(args)
@@ -416,8 +413,9 @@ async def get_media_stream(
         ffmpeg_proc.attach_task(writer())
 
         # yield chunks from stdout
+        sample_size = get_chunksize(pcm_fmt, sample_rate, 24, channels, 10)
         try:
-            async for chunk in ffmpeg_proc.iterate_chunks():
+            async for chunk in ffmpeg_proc.iter_any(sample_size):
                 yield chunk
 
         except (asyncio.CancelledError, GeneratorExit) as err:
@@ -531,7 +529,6 @@ async def get_http_stream(
     if buffer_all:
         skip_bytes = streamdetails.size / streamdetails.duration * seek_position
         yield buffer[:skip_bytes]
-        del buffer
 
 
 async def get_file_stream(
@@ -600,7 +597,7 @@ async def get_preview_stream(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "error",
+        "quiet",
         "-f",
         streamdetails.content_type.value,
         "-i",
@@ -621,7 +618,7 @@ async def get_preview_stream(
         ffmpeg_proc.attach_task(writer())
 
         # yield chunks from stdout
-        async for chunk in ffmpeg_proc.iterate_chunks():
+        async for chunk in ffmpeg_proc.iter_any():
             yield chunk
 
 
@@ -651,7 +648,7 @@ async def get_silence(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "error",
+        "quiet",
         "-f",
         "lavfi",
         "-i",
@@ -662,9 +659,8 @@ async def get_silence(
         output_fmt.value,
         "-",
     ]
-    chunk_size = get_chunksize(output_fmt)
-    async with AsyncProcess(args, chunk_size=chunk_size) as ffmpeg_proc:
-        async for chunk in ffmpeg_proc.iterate_chunks():
+    async with AsyncProcess(args) as ffmpeg_proc:
+        async for chunk in ffmpeg_proc.iter_any():
             yield chunk
 
 
@@ -679,21 +675,13 @@ def get_chunksize(
     pcm_size = int(sample_rate * (bit_depth / 8) * channels * seconds)
     if content_type.is_pcm() or content_type == ContentType.WAV:
         return pcm_size
-    if content_type == ContentType.FLAC:
-        return int(pcm_size * 0.61)
-    if content_type == ContentType.WAVPACK:
-        return int(pcm_size * 0.60)
-    if content_type in (
-        ContentType.AAC,
-        ContentType.M4A,
-    ):
-        return int(256000 * seconds)
-    if content_type in (
-        ContentType.MP3,
-        ContentType.OGG,
-    ):
-        return int(320000 * seconds)
-    return 256000
+    if content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
+        return pcm_size
+    if content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
+        return int(pcm_size * 0.6)
+    if content_type in (ContentType.MP3, ContentType.OGG, ContentType.M4A):
+        return int(640000 * seconds)
+    return 32000 * seconds
 
 
 async def _get_ffmpeg_args(
@@ -718,7 +706,7 @@ async def _get_ffmpeg_args(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "error",
+        "quiet",
         "-ignore_unknown",
     ]
     if streamdetails.content_type != ContentType.UNKNOWN:
index ed33b76d600d280be4683dcab4bbcd96eb4ce803..d88c4ca4f59224e57b973bf47e525ee6bb18f28f 100644 (file)
@@ -17,6 +17,8 @@ LOGGER = logging.getLogger(__name__)
 DEFAULT_CHUNKSIZE = 128000
 DEFAULT_TIMEOUT = 120
 
+# pylint: disable=invalid-name
+
 
 class AsyncProcess:
     """Implementation of a (truly) non blocking subprocess."""
@@ -25,7 +27,6 @@ class AsyncProcess:
         self,
         args: Union[List, str],
         enable_stdin: bool = False,
-        chunk_size: int = DEFAULT_CHUNKSIZE,
         enable_stdout: bool = True,
         enable_stderr: bool = False,
     ):
@@ -33,7 +34,6 @@ class AsyncProcess:
         self._proc = None
         self._args = args
         self._enable_stdin = enable_stdin
-        self.chunk_size = chunk_size or DEFAULT_CHUNKSIZE
         self._enable_stdout = enable_stdout
         self._enable_stderr = enable_stderr
         self._attached_task: asyncio.Task = None
@@ -52,7 +52,7 @@ class AsyncProcess:
                 stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
                 stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
                 stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-                limit=64000000,
+                limit=32 * 1024 * 1024,
                 close_fds=True,
             )
         else:
@@ -61,7 +61,7 @@ class AsyncProcess:
                 stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
                 stdout=asyncio.subprocess.PIPE if self._enable_stdout else None,
                 stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-                limit=64000000,
+                limit=32 * 1024 * 1024,
                 close_fds=True,
             )
         return self
@@ -83,28 +83,52 @@ class AsyncProcess:
                 # just in case?
                 self._proc.kill()
 
-    async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
-        """Yield chunks from the process stdout. Generator."""
+    async def iter_chunked(
+        self, n: int = DEFAULT_CHUNKSIZE
+    ) -> AsyncGenerator[bytes, None]:
+        """Yield chunks of n size from the process stdout."""
         while True:
-            chunk = await self._read_chunk()
+            chunk = await self.readexactly(n)
             yield chunk
-            if len(chunk) < self.chunk_size:
-                del chunk
+            if len(chunk) < n:
                 break
-            del chunk
 
-    async def _read_chunk(self, timeout: int = DEFAULT_TIMEOUT) -> bytes:
-        """Read chunk_size bytes from the process stdout."""
+    async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
+        """Yield chunks as they come in from process stdout."""
+        while True:
+            chunk = await self._proc.stdout.read(n)
+            if chunk == b"":
+                break
+            yield chunk
+
+    async def readexactly(self, n: int, timeout: int = DEFAULT_TIMEOUT) -> bytes:
+        """Read exactly n bytes from the process stdout (or less if eof)."""
         if self.closed:
             return b""
         try:
             async with _timeout(timeout):
-                return await self._proc.stdout.readexactly(self.chunk_size)
+                return await self._proc.stdout.readexactly(n)
         except asyncio.IncompleteReadError as err:
             return err.partial
         except asyncio.TimeoutError:
             return b""
 
+    async def read(self, n: int, timeout: int = DEFAULT_TIMEOUT) -> bytes:
+        """
+        Read up to n bytes from the stdout stream.
+
+        If n is positive, this function try to read n bytes,
+        and may return less or equal bytes than requested, but at least one byte.
+        If EOF was received before any byte is read, this function returns empty byte object.
+        """
+        if self.closed:
+            return b""
+        try:
+            async with _timeout(timeout):
+                return await self._proc.stdout.read(n)
+        except asyncio.TimeoutError:
+            return b""
+
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
         if self.closed:
index fcb06d1f9526a9c68241a87cee1ae182a04c2d66..b3d9011fe32eb9659a9dde35e0ab94cbcb0d3756 100644 (file)
@@ -318,7 +318,7 @@ class SpotifyProvider(MusicProvider):
             args += ["--ap-port", "12345"]
         bytes_sent = 0
         async with AsyncProcess(args) as librespot_proc:
-            async for chunk in librespot_proc.iterate_chunks():
+            async for chunk in librespot_proc.iter_any():
                 yield chunk
                 bytes_sent += len(chunk)
 
@@ -328,7 +328,7 @@ class SpotifyProvider(MusicProvider):
             # retry with ap-port set to invalid value, which will force fallback
             args += ["--ap-port", "12345"]
             async with AsyncProcess(args) as librespot_proc:
-                async for chunk in librespot_proc.iterate_chunks():
+                async for chunk in librespot_proc.iter_any(64000):
                     yield chunk
             self._ap_workaround = True