From 771dc11ffd3b1b8acd158c0360d66853684ecd27 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 11 Apr 2024 15:09:34 +0200 Subject: [PATCH] fix some race conditions --- music_assistant/server/controllers/streams.py | 23 ++++++++++++------- music_assistant/server/helpers/process.py | 21 +++++++++++------ .../server/providers/spotify/__init__.py | 2 -- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index b8a1d6e9..becba616 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -49,6 +49,7 @@ from music_assistant.server.helpers.audio import ( FFMpeg, check_audio_support, crossfade_pcm_parts, + get_chunksize, get_ffmpeg_stream, get_hls_stream, get_icy_stream, @@ -743,19 +744,17 @@ class StreamsController(CoreController): logger=logger, ) as ffmpeg_proc: try: - async for chunk in ffmpeg_proc.iter_any(): + async for chunk in ffmpeg_proc.iter_any(get_chunksize(output_format)): bytes_sent += len(chunk) yield chunk del chunk finished = True finally: - await ffmpeg_proc.close() - logger.debug( - "stream %s (with code %s) for %s", - "finished" if finished else "aborted", - ffmpeg_proc.returncode, - streamdetails.uri, - ) + if finished: + await ffmpeg_proc.wait() + else: + await ffmpeg_proc.close() + # try to determine how many seconds we've streamed seconds_streamed = 0 if output_format.content_type.is_pcm(): @@ -766,6 +765,14 @@ class StreamsController(CoreController): duration_str = line.split("time=")[1].split(" ")[0] seconds_streamed = try_parse_duration(duration_str) + logger.debug( + "stream %s (with code %s) for %s - seconds streamed: %s", + "finished" if finished else "aborted", + ffmpeg_proc.returncode, + streamdetails.uri, + seconds_streamed, + ) + if seconds_streamed: streamdetails.seconds_streamed = seconds_streamed # store accurate duration diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 83cb3226..3d1be5c3 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -109,17 +109,17 @@ class AsyncProcess: """Yield chunks of n size from the process stdout.""" while True: chunk = await self.readexactly(n) - if chunk == b"": - break yield chunk + if len(chunk) == 0: + break async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks as they come in from process stdout.""" while True: chunk = await self.read(n) - if chunk == b"": - break yield chunk + if len(chunk) == 0: + break async def readexactly(self, n: int) -> bytes: """Read exactly n bytes from the process stdout (or less if eof).""" @@ -139,7 +139,7 @@ class AsyncProcess: async def write(self, data: bytes) -> None: """Write data to process stdin.""" - if self._close_called: + if self.closed: self.logger.warning("write called while process already done") return self.proc.stdin.write(data) @@ -148,7 +148,7 @@ class AsyncProcess: async def write_eof(self) -> None: """Write end of file to to process stdin.""" - if self._close_called: + if self.closed: return try: if self.proc.stdin.can_write_eof(): @@ -165,6 +165,8 @@ class AsyncProcess: async def read_stderr(self) -> bytes: """Read line from stderr.""" + if self.closed: + return b"" try: return await self.proc.stderr.readline() except ValueError as err: @@ -196,7 +198,6 @@ class AsyncProcess: self.proc.send_signal(SIGINT) if self.proc.stdin and not self.proc.stdin.is_closing(): self.proc.stdin.close() - await asyncio.sleep(0) # yield to loop # abort existing readers on stderr/stdout first before we send communicate if self.proc.stdout and self.proc.stdout._waiter is not None: with suppress(asyncio.exceptions.InvalidStateError): @@ -204,6 +205,7 @@ class AsyncProcess: if self.proc.stderr and self.proc.stderr._waiter is not None: with suppress(asyncio.exceptions.InvalidStateError): self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) + await asyncio.sleep(0) # yield to loop # make sure the process is really cleaned up. # especially with pipes this can cause deadlocks if not properly guarded @@ -212,6 +214,11 @@ class AsyncProcess: try: # use communicate to flush all pipe buffers await asyncio.wait_for(self.proc.communicate(), 5) + except RuntimeError as err: + if "read() called while another coroutine" in str(err): + # race condition + continue + raise except TimeoutError: self.logger.debug( "Process %s with PID %s did not stop in time. Sending terminate...", diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 85fff758..1ba4173e 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -429,11 +429,9 @@ class SpotifyProvider(MusicProvider): args += ["--start-position", str(int(seek_position))] if self._ap_workaround: args += ["--ap-port", "12345"] - bytes_sent = 0 async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc: async for chunk in librespot_proc.iter_any(): yield chunk - bytes_sent += len(chunk) async def _parse_artist(self, artist_obj): """Parse spotify artist object to generic layout.""" -- 2.34.1