fix some race conditions
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 11 Apr 2024 13:09:34 +0000 (15:09 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 11 Apr 2024 13:09:34 +0000 (15:09 +0200)
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/spotify/__init__.py

index b8a1d6e9c4a3a59a0f0352f91a667f6c5edfb5dd..becba616780bc67951f609c28eae78c892d21c19 100644 (file)
@@ -49,6 +49,7 @@ from music_assistant.server.helpers.audio import (
     FFMpeg,
     check_audio_support,
     crossfade_pcm_parts,
+    get_chunksize,
     get_ffmpeg_stream,
     get_hls_stream,
     get_icy_stream,
@@ -743,19 +744,17 @@ class StreamsController(CoreController):
             logger=logger,
         ) as ffmpeg_proc:
             try:
-                async for chunk in ffmpeg_proc.iter_any():
+                async for chunk in ffmpeg_proc.iter_any(get_chunksize(output_format)):
                     bytes_sent += len(chunk)
                     yield chunk
                     del chunk
                 finished = True
             finally:
-                await ffmpeg_proc.close()
-                logger.debug(
-                    "stream %s (with code %s) for %s",
-                    "finished" if finished else "aborted",
-                    ffmpeg_proc.returncode,
-                    streamdetails.uri,
-                )
+                if finished:
+                    await ffmpeg_proc.wait()
+                else:
+                    await ffmpeg_proc.close()
+
                 # try to determine how many seconds we've streamed
                 seconds_streamed = 0
                 if output_format.content_type.is_pcm():
@@ -766,6 +765,14 @@ class StreamsController(CoreController):
                     duration_str = line.split("time=")[1].split(" ")[0]
                     seconds_streamed = try_parse_duration(duration_str)
 
+                logger.debug(
+                    "stream %s (with code %s) for %s - seconds streamed: %s",
+                    "finished" if finished else "aborted",
+                    ffmpeg_proc.returncode,
+                    streamdetails.uri,
+                    seconds_streamed,
+                )
+
                 if seconds_streamed:
                     streamdetails.seconds_streamed = seconds_streamed
                 # store accurate duration
index 83cb322681db5a487d283348f98a9c3456d00bcc..3d1be5c3303cb80d81cbde59c9172457796e47c4 100644 (file)
@@ -109,17 +109,17 @@ class AsyncProcess:
         """Yield chunks of n size from the process stdout."""
         while True:
             chunk = await self.readexactly(n)
-            if chunk == b"":
-                break
             yield chunk
+            if len(chunk) == 0:
+                break
 
     async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks as they come in from process stdout."""
         while True:
             chunk = await self.read(n)
-            if chunk == b"":
-                break
             yield chunk
+            if len(chunk) == 0:
+                break
 
     async def readexactly(self, n: int) -> bytes:
         """Read exactly n bytes from the process stdout (or less if eof)."""
@@ -139,7 +139,7 @@ class AsyncProcess:
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
-        if self._close_called:
+        if self.closed:
             self.logger.warning("write called while process already done")
             return
         self.proc.stdin.write(data)
@@ -148,7 +148,7 @@ class AsyncProcess:
 
     async def write_eof(self) -> None:
         """Write end of file to to process stdin."""
-        if self._close_called:
+        if self.closed:
             return
         try:
             if self.proc.stdin.can_write_eof():
@@ -165,6 +165,8 @@ class AsyncProcess:
 
     async def read_stderr(self) -> bytes:
         """Read line from stderr."""
+        if self.closed:
+            return b""
         try:
             return await self.proc.stderr.readline()
         except ValueError as err:
@@ -196,7 +198,6 @@ class AsyncProcess:
             self.proc.send_signal(SIGINT)
         if self.proc.stdin and not self.proc.stdin.is_closing():
             self.proc.stdin.close()
-            await asyncio.sleep(0)  # yield to loop
         # abort existing readers on stderr/stdout first before we send communicate
         if self.proc.stdout and self.proc.stdout._waiter is not None:
             with suppress(asyncio.exceptions.InvalidStateError):
@@ -204,6 +205,7 @@ class AsyncProcess:
         if self.proc.stderr and self.proc.stderr._waiter is not None:
             with suppress(asyncio.exceptions.InvalidStateError):
                 self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+        await asyncio.sleep(0)  # yield to loop
 
         # make sure the process is really cleaned up.
         # especially with pipes this can cause deadlocks if not properly guarded
@@ -212,6 +214,11 @@ class AsyncProcess:
             try:
                 # use communicate to flush all pipe buffers
                 await asyncio.wait_for(self.proc.communicate(), 5)
+            except RuntimeError as err:
+                if "read() called while another coroutine" in str(err):
+                    # race condition
+                    continue
+                raise
             except TimeoutError:
                 self.logger.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
index 85fff75898eb297f09c298a9d3f668754709b15d..1ba4173e22ac68709d4ac5e2c99646d6b3be67e1 100644 (file)
@@ -429,11 +429,9 @@ class SpotifyProvider(MusicProvider):
             args += ["--start-position", str(int(seek_position))]
         if self._ap_workaround:
             args += ["--ap-port", "12345"]
-        bytes_sent = 0
         async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc:
             async for chunk in librespot_proc.iter_any():
                 yield chunk
-                bytes_sent += len(chunk)
 
     async def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""