From 6d30449c61b92798cd73fa724e804156adfb33ba Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 10 Jul 2022 23:11:22 +0200 Subject: [PATCH] Allow more aggressive buffering by players that require it (#405) Allow more agressive buffering For example cast players (especially with Hires audio) want to buffer ahead, allow this without breaking the scenario where the audio source itself is throttled (streaming providers like YT Music) --- music_assistant/controllers/streams.py | 107 ++++++++------------- music_assistant/helpers/audio.py | 52 ++++------ music_assistant/helpers/process.py | 50 +++++++--- music_assistant/music_providers/spotify.py | 4 +- 4 files changed, 101 insertions(+), 112 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 80f1eb53..15fa89b8 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -182,7 +182,7 @@ class StreamsController: "icy-name": "Streaming from Music Assistant", "icy-pub": "0", "Cache-Control": "no-cache", - "icy-metaint": str(queue_stream.chunk_size), + "icy-metaint": str(queue_stream.output_chunk_size), "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", } @@ -256,22 +256,22 @@ class StreamsController: pcm_sample_rate = 41000 pcm_bit_depth = 16 pcm_channels = 2 - pcm_resample = True + allow_resample = True elif queue.settings.crossfade_mode == CrossFadeMode.ALWAYS: pcm_sample_rate = min(96000, queue.settings.max_sample_rate) pcm_bit_depth = 24 pcm_channels = 2 - pcm_resample = True + allow_resample = True elif streamdetails.sample_rate > queue.settings.max_sample_rate: pcm_sample_rate = queue.settings.max_sample_rate pcm_bit_depth = streamdetails.bit_depth pcm_channels = streamdetails.channels - pcm_resample = True + allow_resample = True else: pcm_sample_rate = streamdetails.sample_rate pcm_bit_depth = streamdetails.bit_depth pcm_channels = streamdetails.channels - pcm_resample = False + allow_resample = False self.queue_streams[stream_id] = stream = QueueStream( queue=queue, @@ -283,7 +283,7 @@ class StreamsController: pcm_sample_rate=pcm_sample_rate, pcm_bit_depth=pcm_bit_depth, pcm_channels=pcm_channels, - pcm_resample=pcm_resample, + allow_resample=allow_resample, is_alert=is_alert, autostart=True, ) @@ -316,7 +316,7 @@ class QueueStream: pcm_bit_depth: int, pcm_channels: int = 2, pcm_floating_point: bool = False, - pcm_resample: bool = False, + allow_resample: bool = False, is_alert: bool = False, autostart: bool = False, ): @@ -331,7 +331,7 @@ class QueueStream: self.pcm_bit_depth = pcm_bit_depth self.pcm_channels = pcm_channels self.pcm_floating_point = pcm_floating_point - self.pcm_resample = pcm_resample + self.allow_resample = allow_resample self.is_alert = is_alert self.url = queue.mass.streams.get_stream_url(stream_id, output_format) @@ -345,14 +345,14 @@ class QueueStream: self.all_clients_connected = asyncio.Event() self.index_in_buffer = start_index self.signal_next: bool = False - self.chunk_size = get_chunksize( - output_format, - self.pcm_sample_rate, - self.pcm_bit_depth, - self.pcm_channels, - ) self._runner_task: Optional[asyncio.Task] = None self._prev_chunk: bytes = b"" + self.output_chunk_size = get_chunksize( + output_format, + pcm_sample_rate, + pcm_bit_depth, + pcm_channels, + ) if autostart: self.mass.create_task(self.start()) @@ -370,7 +370,6 @@ class QueueStream: self._runner_task = None self.connected_clients = {} - self._prev_chunk = b"" # run garbage collection manually due to the high number of # processed bytes blocks @@ -392,11 +391,6 @@ class QueueStream: self.logger.debug("client connected: %s", client_id) if len(self.connected_clients) == self.expected_clients: self.all_clients_connected.set() - - # workaround for reconnecting clients (such as kodi) - # send the previous chunk if we have one - if self._prev_chunk: - await callback(self._prev_chunk) try: await self.done.wait() finally: @@ -405,7 +399,7 @@ class QueueStream: await self._check_stop() async def _queue_stream_runner(self) -> None: - """Distribute audio chunks over connected client queues.""" + """Distribute audio chunks over connected client(s).""" # collect ffmpeg args input_format = ContentType.from_bit_depth( self.pcm_bit_depth, self.pcm_floating_point @@ -414,7 +408,7 @@ class QueueStream: "ffmpeg", "-hide_banner", "-loglevel", - "error", + "quiet", "-ignore_unknown", # pcm input args "-f", @@ -437,13 +431,12 @@ class QueueStream: ] # get the raw pcm bytes from the queue stream and on the fly encode to wanted format # send the compressed/encoded stream to the client(s). - async with AsyncProcess(ffmpeg_args, True, self.chunk_size) as ffmpeg_proc: + async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc: async def writer(): """Task that sends the raw pcm audio to the ffmpeg process.""" async for audio_chunk in self._get_queue_stream(): await ffmpeg_proc.write(audio_chunk) - del audio_chunk # write eof when last packet is received ffmpeg_proc.write_eof() @@ -463,7 +456,7 @@ class QueueStream: # Read bytes from final output and send chunk to child callback. chunk_num = 0 - async for chunk in ffmpeg_proc.iterate_chunks(): + async for chunk in ffmpeg_proc.iter_chunked(self.output_chunk_size): chunk_num += 1 if len(self.connected_clients) == 0: @@ -482,10 +475,6 @@ class QueueStream: ): self.connected_clients.pop(client_id, None) - # back off a bit after first chunk to handle reconnecting clients (e.g. kodi) - if chunk_num == 1: - await asyncio.sleep(0.5) - # complete queue streamed if self.signal_next: # the queue stream was aborted (e.g. because of sample rate mismatch) @@ -558,7 +547,7 @@ class QueueStream: continue # check the PCM samplerate/bitrate - if not self.pcm_resample and streamdetails.bit_depth > self.pcm_bit_depth: + if not self.allow_resample and streamdetails.bit_depth > self.pcm_bit_depth: self.signal_next = True self.logger.debug( "Abort queue stream %s due to bit depth mismatch", @@ -566,7 +555,7 @@ class QueueStream: ) break if ( - not self.pcm_resample + not self.allow_resample and streamdetails.sample_rate > self.pcm_sample_rate and streamdetails.sample_rate <= self.queue.settings.max_sample_rate ): @@ -627,7 +616,6 @@ class QueueStream: pcm_fmt=pcm_fmt, sample_rate=self.pcm_sample_rate, channels=self.pcm_channels, - chunk_size=sample_size, seek_position=seek_position, ): chunk_count += 1 @@ -644,21 +632,18 @@ class QueueStream: if queue_track.duration is None or queue_track.duration < 30: bytes_written += len(chunk) yield chunk - del chunk continue # first part of track and we need to (cross)fade: fill buffer if bytes_written < buf_size and (last_fadeout_part or fade_in): bytes_written += len(chunk) buffer += chunk - del chunk continue # last part of track: fill buffer if bytes_written >= (total_size - buf_size): bytes_written += len(chunk) buffer += chunk - del chunk continue # buffer full for fade-in / crossfade @@ -682,13 +667,10 @@ class QueueStream: # send crossfade_part yield crossfade_part bytes_written += len(crossfade_part) - del crossfade_part - del fadein_part # also write the leftover bytes from the strip action if remaining_bytes: yield remaining_bytes bytes_written += len(remaining_bytes) - del remaining_bytes else: # fade-in fadein_part = await fadein_pcm_part( @@ -699,45 +681,40 @@ class QueueStream: ) yield fadein_part bytes_written += len(fadein_part) - del fadein_part # clear vars last_fadeout_part = b"" - del first_part - del chunk buffer = b"" continue # all other: middle of track or no fade actions, just yield the audio bytes_written += len(chunk) yield chunk - del chunk continue #### HANDLE END OF TRACK - # strip silence from end of audio - last_part = await strip_silence( - buffer, pcm_fmt, self.pcm_sample_rate, reverse=True - ) + if buffer: + # strip silence from end of audio + last_part = await strip_silence( + buffer, pcm_fmt, self.pcm_sample_rate, reverse=True + ) - # handle crossfading support - # store fade section to be picked up for next track - - if use_crossfade: - # crossfade is enabled, save fadeout part to pickup for next track - last_part = last_part[-crossfade_size:] - remaining_bytes = last_part[:-crossfade_size] - # yield remaining bytes - bytes_written += len(remaining_bytes) - yield remaining_bytes - last_fadeout_part = last_part - del remaining_bytes - else: - # no crossfade enabled, just yield the stripped audio data - bytes_written += len(last_part) - yield last_part - del last_part + # handle crossfading support + # store fade section to be picked up for next track + + if use_crossfade: + # crossfade is enabled, save fadeout part to pickup for next track + last_part = last_part[-crossfade_size:] + remaining_bytes = last_part[:-crossfade_size] + # yield remaining bytes + bytes_written += len(remaining_bytes) + yield remaining_bytes + last_fadeout_part = last_part + else: + # no crossfade enabled, just yield the stripped audio data + bytes_written += len(last_part) + yield last_part # end of the track reached queue_track.streamdetails.seconds_streamed = bytes_written / sample_size @@ -748,8 +725,8 @@ class QueueStream: self.queue.player.name, ) # end of queue reached, pass last fadeout bits to final output - yield last_fadeout_part - del last_fadeout_part + if last_fadeout_part: + yield last_fadeout_part # END OF QUEUE STREAM self.logger.debug("Queue stream for Queue %s finished.", self.queue.player.name) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 6bc9d76f..19a5a825 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -8,7 +8,7 @@ import re import struct from io import BytesIO from time import time -from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Tuple +from typing import TYPE_CHECKING, AsyncGenerator, List, Tuple import aiofiles from aiohttp import ClientError, ClientTimeout @@ -49,7 +49,7 @@ async def crossfade_pcm_parts( "ffmpeg", "-hide_banner", "-loglevel", - "error", + "quiet", # fadeout part (as file) "-acodec", fmt.name.lower(), @@ -104,7 +104,7 @@ async def fadein_pcm_part( "ffmpeg", "-hide_banner", "-loglevel", - "error", + "quiet", # fade_in part (stdin) "-acodec", fmt.name.lower(), @@ -138,7 +138,7 @@ async def strip_silence( ) -> bytes: """Strip silence from (a chunk of) pcm audio.""" # input args - args = ["ffmpeg", "-hide_banner", "-loglevel", "error"] + args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"] args += [ "-acodec", fmt.name.lower(), @@ -385,7 +385,6 @@ async def get_media_stream( pcm_fmt: ContentType, sample_rate: int, channels: int = 2, - chunk_size: Optional[int] = None, seek_position: int = 0, ) -> AsyncGenerator[bytes, None]: """Get the PCM audio stream for the given streamdetails.""" @@ -393,9 +392,7 @@ async def get_media_stream( args = await _get_ffmpeg_args( streamdetails, pcm_fmt, pcm_sample_rate=sample_rate, pcm_channels=channels ) - async with AsyncProcess( - args, enable_stdin=True, chunk_size=chunk_size - ) as ffmpeg_proc: + async with AsyncProcess(args, enable_stdin=True) as ffmpeg_proc: LOGGER.debug( "start media stream for: %s, using args: %s", streamdetails.uri, str(args) @@ -416,8 +413,9 @@ async def get_media_stream( ffmpeg_proc.attach_task(writer()) # yield chunks from stdout + sample_size = get_chunksize(pcm_fmt, sample_rate, 24, channels, 10) try: - async for chunk in ffmpeg_proc.iterate_chunks(): + async for chunk in ffmpeg_proc.iter_any(sample_size): yield chunk except (asyncio.CancelledError, GeneratorExit) as err: @@ -531,7 +529,6 @@ async def get_http_stream( if buffer_all: skip_bytes = streamdetails.size / streamdetails.duration * seek_position yield buffer[:skip_bytes] - del buffer async def get_file_stream( @@ -600,7 +597,7 @@ async def get_preview_stream( "ffmpeg", "-hide_banner", "-loglevel", - "error", + "quiet", "-f", streamdetails.content_type.value, "-i", @@ -621,7 +618,7 @@ async def get_preview_stream( ffmpeg_proc.attach_task(writer()) # yield chunks from stdout - async for chunk in ffmpeg_proc.iterate_chunks(): + async for chunk in ffmpeg_proc.iter_any(): yield chunk @@ -651,7 +648,7 @@ async def get_silence( "ffmpeg", "-hide_banner", "-loglevel", - "error", + "quiet", "-f", "lavfi", "-i", @@ -662,9 +659,8 @@ async def get_silence( output_fmt.value, "-", ] - chunk_size = get_chunksize(output_fmt) - async with AsyncProcess(args, chunk_size=chunk_size) as ffmpeg_proc: - async for chunk in ffmpeg_proc.iterate_chunks(): + async with AsyncProcess(args) as ffmpeg_proc: + async for chunk in ffmpeg_proc.iter_any(): yield chunk @@ -679,21 +675,13 @@ def get_chunksize( pcm_size = int(sample_rate * (bit_depth / 8) * channels * seconds) if content_type.is_pcm() or content_type == ContentType.WAV: return pcm_size - if content_type == ContentType.FLAC: - return int(pcm_size * 0.61) - if content_type == ContentType.WAVPACK: - return int(pcm_size * 0.60) - if content_type in ( - ContentType.AAC, - ContentType.M4A, - ): - return int(256000 * seconds) - if content_type in ( - ContentType.MP3, - ContentType.OGG, - ): - return int(320000 * seconds) - return 256000 + if content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF): + return pcm_size + if content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC): + return int(pcm_size * 0.6) + if content_type in (ContentType.MP3, ContentType.OGG, ContentType.M4A): + return int(640000 * seconds) + return 32000 * seconds async def _get_ffmpeg_args( @@ -718,7 +706,7 @@ async def _get_ffmpeg_args( "ffmpeg", "-hide_banner", "-loglevel", - "error", + "quiet", "-ignore_unknown", ] if streamdetails.content_type != ContentType.UNKNOWN: diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index ed33b76d..d88c4ca4 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -17,6 +17,8 @@ LOGGER = logging.getLogger(__name__) DEFAULT_CHUNKSIZE = 128000 DEFAULT_TIMEOUT = 120 +# pylint: disable=invalid-name + class AsyncProcess: """Implementation of a (truly) non blocking subprocess.""" @@ -25,7 +27,6 @@ class AsyncProcess: self, args: Union[List, str], enable_stdin: bool = False, - chunk_size: int = DEFAULT_CHUNKSIZE, enable_stdout: bool = True, enable_stderr: bool = False, ): @@ -33,7 +34,6 @@ class AsyncProcess: self._proc = None self._args = args self._enable_stdin = enable_stdin - self.chunk_size = chunk_size or DEFAULT_CHUNKSIZE self._enable_stdout = enable_stdout self._enable_stderr = enable_stderr self._attached_task: asyncio.Task = None @@ -52,7 +52,7 @@ class AsyncProcess: stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - limit=64000000, + limit=32 * 1024 * 1024, close_fds=True, ) else: @@ -61,7 +61,7 @@ class AsyncProcess: stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - limit=64000000, + limit=32 * 1024 * 102, close_fds=True, ) return self @@ -83,28 +83,52 @@ class AsyncProcess: # just in case? self._proc.kill() - async def iterate_chunks(self) -> AsyncGenerator[bytes, None]: - """Yield chunks from the process stdout. Generator.""" + async def iter_chunked( + self, n: int = DEFAULT_CHUNKSIZE + ) -> AsyncGenerator[bytes, None]: + """Yield chunks of n size from the process stdout.""" while True: - chunk = await self._read_chunk() + chunk = await self.readexactly(n) yield chunk - if len(chunk) < self.chunk_size: - del chunk + if len(chunk) < n: break - del chunk - async def _read_chunk(self, timeout: int = DEFAULT_TIMEOUT) -> bytes: - """Read chunk_size bytes from the process stdout.""" + async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: + """Yield chunks as they come in from process stdout.""" + while True: + chunk = await self._proc.stdout.read(n) + if chunk == b"": + break + yield chunk + + async def readexactly(self, n: int, timeout: int = DEFAULT_TIMEOUT) -> bytes: + """Read exactly n bytes from the process stdout (or less if eof).""" if self.closed: return b"" try: async with _timeout(timeout): - return await self._proc.stdout.readexactly(self.chunk_size) + return await self._proc.stdout.readexactly(n) except asyncio.IncompleteReadError as err: return err.partial except asyncio.TimeoutError: return b"" + async def read(self, n: int, timeout: int = DEFAULT_TIMEOUT) -> bytes: + """ + Read up to n bytes from the stdout stream. + + If n is positive, this function try to read n bytes, + and may return less or equal bytes than requested, but at least one byte. + If EOF was received before any byte is read, this function returns empty byte object. + """ + if self.closed: + return b"" + try: + async with _timeout(timeout): + return await self._proc.stdout.read(n) + except asyncio.TimeoutError: + return b"" + async def write(self, data: bytes) -> None: """Write data to process stdin.""" if self.closed: diff --git a/music_assistant/music_providers/spotify.py b/music_assistant/music_providers/spotify.py index fcb06d1f..b3d9011f 100644 --- a/music_assistant/music_providers/spotify.py +++ b/music_assistant/music_providers/spotify.py @@ -318,7 +318,7 @@ class SpotifyProvider(MusicProvider): args += ["--ap-port", "12345"] bytes_sent = 0 async with AsyncProcess(args) as librespot_proc: - async for chunk in librespot_proc.iterate_chunks(): + async for chunk in librespot_proc.iter_any(): yield chunk bytes_sent += len(chunk) @@ -328,7 +328,7 @@ class SpotifyProvider(MusicProvider): # retry with ap-port set to invalid value, which will force fallback args += ["--ap-port", "12345"] async with AsyncProcess(args) as librespot_proc: - async for chunk in librespot_proc.iterate_chunks(): + async for chunk in librespot_proc.iter_any(64000): yield chunk self._ap_workaround = True -- 2.34.1