seek_position = 0
self.logger.debug("Streaming %s", streamdetails.item_id)
-
- # Use a threading event to signal cancellation to the streamer thread
cancelled = threading.Event()
def _streamer() -> None:
estimate_length=True,
) as stream:
for chunk in stream.iter_content(chunk_size=40960):
- # Use a timeout on result() to periodically check for cancellation
- # This prevents indefinite blocking if the consumer stops reading
+ # Use put_nowait to avoid blocking and potential duplicate chunks
+ # that can occur when using put() with timeouts
while True:
if cancelled.is_set():
self.logger.debug(
)
return
try:
- asyncio.run_coroutine_threadsafe(
- audio_buffer.put(chunk), self.mass.loop
- ).result(timeout=1.0)
+ audio_buffer.put_nowait(chunk)
break # Successfully put chunk, move to next
- except TimeoutError:
- # Queue is full and consumer isn't reading, check cancellation
- continue
+ except asyncio.QueueFull:
+ # Queue is full, wait a bit and check for cancellation
+ cancelled.wait(timeout=0.1)
# send empty chunk when we're done
if not cancelled.is_set():
- try:
- asyncio.run_coroutine_threadsafe(
- audio_buffer.put(b"EOF"), self.mass.loop
- ).result(timeout=5.0)
- except TimeoutError:
+ # For EOF, we can wait a bit longer since it's the final message
+ for _ in range(50): # Try for up to 5 seconds
+ try:
+ audio_buffer.put_nowait(b"EOF")
+ break
+ except asyncio.QueueFull:
+ if cancelled.is_set():
+ break
+ cancelled.wait(timeout=0.1)
+ else:
self.logger.debug(
"Timeout sending EOF for item '%s'", streamdetails.item_id
)