if http_request.method.upper() != 'GET':
return resp
# stream audio
- buf_queue = asyncio.Queue()
cancelled = threading.Event()
if player.queue.use_queue_stream:
# use queue stream
bg_task = run_async_background_task(
None,
- self.__stream_queue, player, buf_queue, cancelled)
+ self.__stream_queue, player, resp, cancelled)
else:
# single track stream
queue_item_id = http_request.match_info.get('queue_item_id')
assert(queue_item)
bg_task = run_async_background_task(
None,
- self.__stream_single, player, queue_item, buf_queue, cancelled)
+ self.__stream_single, player, queue_item, resp, cancelled)
# let the streaming begin!
try:
- while True:
- chunk = await buf_queue.get()
- if chunk:
- await resp.write(chunk)
- buf_queue.task_done()
- del chunk
- else:
- buf_queue.task_done()
- break
+ while not cancelled.is_set():
+ await asyncio.sleep(0.1)
except (asyncio.CancelledError, asyncio.TimeoutError):
LOGGER.debug("stream interrupted")
cancelled.set()
# wait for bg_task to finish
await asyncio.gather(bg_task)
- del buf_queue
raise asyncio.CancelledError()
return resp
continue
# put chunk in buffer
asyncio.run_coroutine_threadsafe(
- buffer.put(audio_chunk),
+ buffer.write(audio_chunk),
self.mass.event_loop)
# this should be garbage collected but just in case...
del audio_chunk
# wait for the queue to consume the data
- while not cancelled.is_set() and buffer.qsize() > 5:
+ if not cancelled.is_set():
await asyncio.sleep(0.5)
# all chunks received: streaming finished
if cancelled.is_set():
else:
# indicate EOF if no more data
asyncio.run_coroutine_threadsafe(
- buffer.put(b''),
+ buffer.write_eof(),
self.mass.event_loop)
LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
if fade_length:
fade_bytes = int(sample_rate * 4 * 2 * fade_length)
else:
+ fade_length = 1
fade_bytes = int(sample_rate * 4 * 2)
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
args = 'sox -t %s - -t flac -C 0 -' % pcm_args
break
if chunk and not cancelled.is_set():
asyncio.run_coroutine_threadsafe(
- buffer.put(chunk), self.mass.event_loop)
+ buffer.write(chunk), self.mass.event_loop)
del chunk
# indicate EOF if no more data
if not cancelled.is_set():
asyncio.run_coroutine_threadsafe(
- buffer.put(b''), self.mass.event_loop)
+ buffer.write_eof(), self.mass.event_loop)
# start fill buffer task in background
fill_buffer_thread = threading.Thread(target=fill_buffer)
fill_buffer_thread.start()
last_part = prev_chunk + chunk
if len(last_part) < fade_bytes:
# still not enough data so we'll skip the crossfading
- LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part))
+ LOGGER.info("not enough data for fadeout so skip crossfade... %s" % len(last_part))
sox_proc.stdin.write(last_part)
bytes_written += len(last_part)
del last_part
else:
prev_chunk = chunk
del chunk
+ ### throttle to prevent entire track sitting in memory
+ if not cancelled.is_set():
+ await asyncio.sleep(fade_length)
# end of the track reached
if cancelled.is_set():
# break out the loop if the http session is cancelled
queue_track.duration = accurate_duration
LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
- # wait for the queue to consume the data
- while not cancelled.is_set() and buffer.qsize() > 4:
- await asyncio.sleep(0.5)
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
bytes_sent = 0
buf = b''
while True:
- # read exactly buffersize of data
if cancelled.is_set():
# http session ended
# send terminate and pick up left over bytes
process.terminate()
- # try to read as much data as possible from stdin
- data = process.stdout.read(chunksize)
- if not data:
- # no more data
- # yield leftover data in buffer as last chunk
- yield (True, buf)
- bytes_sent += len(buf)
- del buf
- break
- elif len(buf) + len(data) >= chunksize:
- # enough data to send a chunk
- new_data = buf + data
- chunk = new_data[:chunksize]
- yield (False, chunk)
+ # read exactly chunksize of data
+ chunk = process.stdout.read(chunksize)
+ if len(chunk) < chunksize:
+ # last chunk
+ LOGGER.info("last chunk")
bytes_sent += len(chunk)
- buf = new_data[chunksize:]
- del chunk
- del data
- del new_data
+ yield (True, chunk)
+ break
else:
- buf += data
- del data
+ bytes_sent += len(chunk)
+ yield (False, chunk)
+
# fire event that streaming has ended
asyncio.run_coroutine_threadsafe(
self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop)