self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Provide a generator for the stream data."""
- audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(1)
+ audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
# ignore seek position if the server does not support it
# in that case we let the core handle seeking
if not self._seek_support:
self.logger.debug("Streaming %s", streamdetails.item_id)
+ # Use a threading event to signal cancellation to the streamer thread
+ cancelled = asyncio.Event()
+
def _streamer() -> None:
self.logger.debug("starting stream of item '%s'", streamdetails.item_id)
try:
estimate_length=True,
) as stream:
for chunk in stream.iter_content(chunk_size=40960):
- asyncio.run_coroutine_threadsafe(
- audio_buffer.put(chunk), self.mass.loop
- ).result()
+ # Use a timeout on result() to periodically check for cancellation
+ # This prevents indefinite blocking if the consumer stops reading
+ while True:
+ if cancelled.is_set():
+ self.logger.debug(
+ "Stream cancelled for item '%s'", streamdetails.item_id
+ )
+ return
+ try:
+ asyncio.run_coroutine_threadsafe(
+ audio_buffer.put(chunk), self.mass.loop
+ ).result(timeout=1.0)
+ break # Successfully put chunk, move to next
+ except TimeoutError:
+ # Queue is full and consumer isn't reading, check cancellation
+ continue
# send empty chunk when we're done
- asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result()
+ if not cancelled.is_set():
+ try:
+ asyncio.run_coroutine_threadsafe(
+ audio_buffer.put(b"EOF"), self.mass.loop
+ ).result(timeout=5.0)
+ except TimeoutError:
+ self.logger.debug(
+ "Timeout sending EOF for item '%s'", streamdetails.item_id
+ )
except DataNotFoundError as err:
msg = f"Item '{streamdetails.item_id}' not found"
raise MediaNotFoundError(msg) from err
break
yield chunk
finally:
+ # Signal the streamer thread to stop
+ cancelled.set()
if not streamer_task.done():
streamer_task.cancel()