Fix subsonic streaming logic deadlock
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 19 Dec 2025 02:31:33 +0000 (03:31 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 19 Dec 2025 02:31:33 +0000 (03:31 +0100)
music_assistant/providers/opensubsonic/sonic_provider.py

index a32874650eaf81ac64efa0c25e3ea79f5acc1f11..fad69dd6f9862f1d97fa2df17ead4e8bcdf3e52c 100644 (file)
@@ -769,7 +769,7 @@ class OpenSonicProvider(MusicProvider):
         self, streamdetails: StreamDetails, seek_position: int = 0
     ) -> AsyncGenerator[bytes, None]:
         """Provide a generator for the stream data."""
-        audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(1)
+        audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
         # ignore seek position if the server does not support it
         # in that case we let the core handle seeking
         if not self._seek_support:
@@ -777,6 +777,9 @@ class OpenSonicProvider(MusicProvider):
 
         self.logger.debug("Streaming %s", streamdetails.item_id)
 
+        # Use a threading event to signal cancellation to the streamer thread
+        cancelled = asyncio.Event()
+
         def _streamer() -> None:
             self.logger.debug("starting stream of item '%s'", streamdetails.item_id)
             try:
@@ -786,11 +789,32 @@ class OpenSonicProvider(MusicProvider):
                     estimate_length=True,
                 ) as stream:
                     for chunk in stream.iter_content(chunk_size=40960):
-                        asyncio.run_coroutine_threadsafe(
-                            audio_buffer.put(chunk), self.mass.loop
-                        ).result()
+                        # Use a timeout on result() to periodically check for cancellation
+                        # This prevents indefinite blocking if the consumer stops reading
+                        while True:
+                            if cancelled.is_set():
+                                self.logger.debug(
+                                    "Stream cancelled for item '%s'", streamdetails.item_id
+                                )
+                                return
+                            try:
+                                asyncio.run_coroutine_threadsafe(
+                                    audio_buffer.put(chunk), self.mass.loop
+                                ).result(timeout=1.0)
+                                break  # Successfully put chunk, move to next
+                            except TimeoutError:
+                                # Queue is full and consumer isn't reading, check cancellation
+                                continue
                 # send empty chunk when we're done
-                asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result()
+                if not cancelled.is_set():
+                    try:
+                        asyncio.run_coroutine_threadsafe(
+                            audio_buffer.put(b"EOF"), self.mass.loop
+                        ).result(timeout=5.0)
+                    except TimeoutError:
+                        self.logger.debug(
+                            "Timeout sending EOF for item '%s'", streamdetails.item_id
+                        )
             except DataNotFoundError as err:
                 msg = f"Item '{streamdetails.item_id}' not found"
                 raise MediaNotFoundError(msg) from err
@@ -805,6 +829,8 @@ class OpenSonicProvider(MusicProvider):
                     break
                 yield chunk
         finally:
+            # Signal the streamer thread to stop
+            cancelled.set()
             if not streamer_task.done():
                 streamer_task.cancel()