From 47d53b9fcebac13f7944b1a3890bc086a92c904b Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 12 Apr 2024 10:40:23 +0200 Subject: [PATCH] fix buffer issues --- music_assistant/server/controllers/streams.py | 65 ++++++++++--------- music_assistant/server/helpers/audio.py | 7 +- music_assistant/server/helpers/process.py | 25 ++++--- 3 files changed, 53 insertions(+), 44 deletions(-) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 66c7835b..30f48144 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -19,12 +19,7 @@ from typing import TYPE_CHECKING from aiofiles.os import wrap from aiohttp import web -from music_assistant.common.helpers.util import ( - get_ip, - select_free_port, - try_parse_bool, - try_parse_duration, -) +from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, @@ -49,7 +44,6 @@ from music_assistant.server.helpers.audio import ( FFMpeg, check_audio_support, crossfade_pcm_parts, - get_chunksize, get_ffmpeg_stream, get_hls_stream, get_icy_stream, @@ -111,8 +105,7 @@ class StreamsController(CoreController): self.manifest.name = "Streamserver" self.manifest.description = ( "Music Assistant's core controller that is responsible for " - "streaming audio to players on the local network as well as " - "some player specific local control callbacks." + "streaming audio to players on the local network." ) self.manifest.icon = "cast-audio" self.announcements: dict[str, str] = {} @@ -288,10 +281,25 @@ class StreamsController(CoreController): queue.display_name, ) queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id) - async for chunk in self.get_media_stream( - streamdetails=queue_item.streamdetails, + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(output_format.bit_depth), + sample_rate=queue_item.streamdetails.audio_format.sample_rate, + bit_depth=queue_item.streamdetails.audio_format.bit_depth, + channels=2, + ) + async for chunk in get_ffmpeg_stream( + audio_input=self.get_media_stream( + streamdetails=queue_item.streamdetails, + pcm_format=pcm_format, + ), + input_format=pcm_format, output_format=output_format, - extra_filter_params=get_player_filter_params(self.mass, queue_player.player_id), + filter_params=get_player_filter_params(self.mass, queue_player.player_id), + extra_input_args=[ + # use readrate to limit buffering ahead too much + "-readrate", + "1.2", + ], ): try: await resp.write(chunk) @@ -364,6 +372,11 @@ class StreamsController(CoreController): output_format=output_format, filter_params=get_player_filter_params(self.mass, queue_player.player_id), chunk_size=icy_meta_interval if enable_icy else None, + extra_input_args=[ + # use readrate to limit buffering ahead too much + "-readrate", + "1.2", + ], ): try: await resp.write(chunk) @@ -545,7 +558,7 @@ class StreamsController(CoreController): # handle incoming audio chunks async for chunk in self.get_media_stream( queue_track.streamdetails, - output_format=pcm_format, + pcm_format=pcm_format, ): # buffer size needs to be big enough to include the crossfade part # allow it to be a bit smaller when playback just starts @@ -688,16 +701,15 @@ class StreamsController(CoreController): async def get_media_stream( self, streamdetails: StreamDetails, - output_format: AudioFormat, - extra_filter_params: list[str] | None = None, + pcm_format: AudioFormat, ) -> AsyncGenerator[tuple[bool, bytes], None]: - """Get the audio stream for the given streamdetails.""" + """Get the audio stream for the given streamdetails as raw pcm chunks.""" logger = self.logger.getChild("media_stream") is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio: streamdetails.seek_position = 0 # collect all arguments for ffmpeg - filter_params = extra_filter_params or [] + filter_params = [] if streamdetails.target_loudness is not None: # add loudnorm filters filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11" @@ -734,20 +746,20 @@ class StreamsController(CoreController): async with FFMpeg( audio_input=audio_source, input_format=streamdetails.audio_format, - output_format=output_format, + output_format=pcm_format, filter_params=filter_params, extra_input_args=[ *extra_input_args, # we criple ffmpeg a bit on purpose with the filter_threads # option so it doesn't consume all cpu when calculating loudnorm "-filter_threads", - "1", + "2", ], collect_log_history=True, logger=logger, ) as ffmpeg_proc: try: - async for chunk in ffmpeg_proc.iter_any(get_chunksize(output_format)): + async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size): bytes_sent += len(chunk) yield chunk del chunk @@ -759,15 +771,7 @@ class StreamsController(CoreController): await ffmpeg_proc.close() # try to determine how many seconds we've streamed - seconds_streamed = 0 - if output_format.content_type.is_pcm(): - seconds_streamed = ( - bytes_sent / output_format.pcm_sample_size if bytes_sent else 0 - ) - elif line := next((x for x in ffmpeg_proc.log_history if "time=" in x), None): - duration_str = line.split("time=")[1].split(" ")[0] - seconds_streamed = try_parse_duration(duration_str) - + seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 logger.debug( "stream %s (with code %s) for %s - seconds streamed: %s", "finished" if finished else "aborted", @@ -775,7 +779,6 @@ class StreamsController(CoreController): streamdetails.uri, seconds_streamed, ) - if seconds_streamed: streamdetails.seconds_streamed = seconds_streamed # store accurate duration @@ -797,9 +800,7 @@ class StreamsController(CoreController): streamdetails.item_id, streamdetails.provider, loudness_details ) ) - # report playback - # TODO: Move this to the queue controller ? if finished or seconds_streamed > 30: self.mass.create_task( self.mass.music.mark_item_played( diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 6ea68932..8dfa838f 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -959,6 +959,7 @@ def get_ffmpeg_args( input_args += ["-f", input_format.content_type.value, "-i", input_path] # collect output args + output_args = [] if output_path.upper() == "NULL": # devnull stream output_args = ["-f", "null", "-"] @@ -966,8 +967,10 @@ def get_ffmpeg_args( # use wav so we at least have some headers for the rest of the chain output_args = ["-f", "wav", output_path] else: + if output_format.content_type.is_pcm(): + output_args += ["-acodec", output_format.content_type.name.lower()] # use explicit format identifier for all other - output_args = [ + output_args += [ "-f", output_format.content_type.value, "-ar", @@ -976,8 +979,6 @@ def get_ffmpeg_args( str(output_format.channels), output_path, ] - if output_format.content_type.is_pcm(): - output_args += ["-acodec", output_format.content_type.name.lower()] # determine if we need to do resampling if ( diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 3d1be5c3..466fe4b6 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -109,20 +109,22 @@ class AsyncProcess: """Yield chunks of n size from the process stdout.""" while True: chunk = await self.readexactly(n) - yield chunk if len(chunk) == 0: break + yield chunk async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks as they come in from process stdout.""" while True: chunk = await self.read(n) - yield chunk if len(chunk) == 0: break + yield chunk async def readexactly(self, n: int) -> bytes: """Read exactly n bytes from the process stdout (or less if eof).""" + if self._close_called: + return b"" try: return await self.proc.stdout.readexactly(n) except asyncio.IncompleteReadError as err: @@ -135,6 +137,8 @@ class AsyncProcess: and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object. """ + if self._close_called: + return b"" return await self.proc.stdout.read(n) async def write(self, data: bytes) -> None: @@ -165,7 +169,7 @@ class AsyncProcess: async def read_stderr(self) -> bytes: """Read line from stderr.""" - if self.closed: + if self._close_called: return b"" try: return await self.proc.stderr.readline() @@ -199,12 +203,15 @@ class AsyncProcess: if self.proc.stdin and not self.proc.stdin.is_closing(): self.proc.stdin.close() # abort existing readers on stderr/stdout first before we send communicate - if self.proc.stdout and self.proc.stdout._waiter is not None: - with suppress(asyncio.exceptions.InvalidStateError): - self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) - if self.proc.stderr and self.proc.stderr._waiter is not None: - with suppress(asyncio.exceptions.InvalidStateError): - self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) + waiter: asyncio.Future + if self.proc.stdout and (waiter := self.proc.stdout._waiter): + self.proc.stdout._waiter = None + if waiter and not waiter.done(): + waiter.set_exception(asyncio.CancelledError()) + if self.proc.stderr and (waiter := self.proc.stderr._waiter): + self.proc.stderr._waiter = None + if waiter and not waiter.done(): + waiter.set_exception(asyncio.CancelledError()) await asyncio.sleep(0) # yield to loop # make sure the process is really cleaned up. -- 2.34.1