Another (potential) fix for subsonic streaming
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 20 Dec 2025 12:35:50 +0000 (13:35 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 20 Dec 2025 12:35:50 +0000 (13:35 +0100)
music_assistant/providers/opensubsonic/sonic_provider.py

index 24e1d9b1a19aa43b5adfd3557bb462c572d9a795..2fd6fe946aa272c6001cdd93869e62a3942b6b1e 100644 (file)
@@ -777,8 +777,6 @@ class OpenSonicProvider(MusicProvider):
             seek_position = 0
 
         self.logger.debug("Streaming %s", streamdetails.item_id)
-
-        # Use a threading event to signal cancellation to the streamer thread
         cancelled = threading.Event()
 
         def _streamer() -> None:
@@ -790,8 +788,8 @@ class OpenSonicProvider(MusicProvider):
                     estimate_length=True,
                 ) as stream:
                     for chunk in stream.iter_content(chunk_size=40960):
-                        # Use a timeout on result() to periodically check for cancellation
-                        # This prevents indefinite blocking if the consumer stops reading
+                        # Use put_nowait to avoid blocking and potential duplicate chunks
+                        # that can occur when using put() with timeouts
                         while True:
                             if cancelled.is_set():
                                 self.logger.debug(
@@ -799,20 +797,23 @@ class OpenSonicProvider(MusicProvider):
                                 )
                                 return
                             try:
-                                asyncio.run_coroutine_threadsafe(
-                                    audio_buffer.put(chunk), self.mass.loop
-                                ).result(timeout=1.0)
+                                audio_buffer.put_nowait(chunk)
                                 break  # Successfully put chunk, move to next
-                            except TimeoutError:
-                                # Queue is full and consumer isn't reading, check cancellation
-                                continue
+                            except asyncio.QueueFull:
+                                # Queue is full, wait a bit and check for cancellation
+                                cancelled.wait(timeout=0.1)
                 # send empty chunk when we're done
                 if not cancelled.is_set():
-                    try:
-                        asyncio.run_coroutine_threadsafe(
-                            audio_buffer.put(b"EOF"), self.mass.loop
-                        ).result(timeout=5.0)
-                    except TimeoutError:
+                    # For EOF, we can wait a bit longer since it's the final message
+                    for _ in range(50):  # Try for up to 5 seconds
+                        try:
+                            audio_buffer.put_nowait(b"EOF")
+                            break
+                        except asyncio.QueueFull:
+                            if cancelled.is_set():
+                                break
+                            cancelled.wait(timeout=0.1)
+                    else:
                         self.logger.debug(
                             "Timeout sending EOF for item '%s'", streamdetails.item_id
                         )