From: Marcel van der Veldt Date: Fri, 19 Dec 2025 02:31:33 +0000 (+0100) Subject: Fix subsonic streaming logic deadlock X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9dc9acf9442a96f8c9195f9383e10f91ce1e2514;p=music-assistant-server.git Fix subsonic streaming logic deadlock --- diff --git a/music_assistant/providers/opensubsonic/sonic_provider.py b/music_assistant/providers/opensubsonic/sonic_provider.py index a3287465..fad69dd6 100644 --- a/music_assistant/providers/opensubsonic/sonic_provider.py +++ b/music_assistant/providers/opensubsonic/sonic_provider.py @@ -769,7 +769,7 @@ class OpenSonicProvider(MusicProvider): self, streamdetails: StreamDetails, seek_position: int = 0 ) -> AsyncGenerator[bytes, None]: """Provide a generator for the stream data.""" - audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(1) + audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10) # ignore seek position if the server does not support it # in that case we let the core handle seeking if not self._seek_support: @@ -777,6 +777,9 @@ class OpenSonicProvider(MusicProvider): self.logger.debug("Streaming %s", streamdetails.item_id) + # Use a threading event to signal cancellation to the streamer thread + cancelled = asyncio.Event() + def _streamer() -> None: self.logger.debug("starting stream of item '%s'", streamdetails.item_id) try: @@ -786,11 +789,32 @@ class OpenSonicProvider(MusicProvider): estimate_length=True, ) as stream: for chunk in stream.iter_content(chunk_size=40960): - asyncio.run_coroutine_threadsafe( - audio_buffer.put(chunk), self.mass.loop - ).result() + # Use a timeout on result() to periodically check for cancellation + # This prevents indefinite blocking if the consumer stops reading + while True: + if cancelled.is_set(): + self.logger.debug( + "Stream cancelled for item '%s'", streamdetails.item_id + ) + return + try: + asyncio.run_coroutine_threadsafe( + audio_buffer.put(chunk), self.mass.loop + ).result(timeout=1.0) + break # Successfully put chunk, move to next + except TimeoutError: + # Queue is full and consumer isn't reading, check cancellation + continue # send empty chunk when we're done - asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result() + if not cancelled.is_set(): + try: + asyncio.run_coroutine_threadsafe( + audio_buffer.put(b"EOF"), self.mass.loop + ).result(timeout=5.0) + except TimeoutError: + self.logger.debug( + "Timeout sending EOF for item '%s'", streamdetails.item_id + ) except DataNotFoundError as err: msg = f"Item '{streamdetails.item_id}' not found" raise MediaNotFoundError(msg) from err @@ -805,6 +829,8 @@ class OpenSonicProvider(MusicProvider): break yield chunk finally: + # Signal the streamer thread to stop + cancelled.set() if not streamer_task.done(): streamer_task.cancel()