From 299df7ce848cb023371fc828030898df19b559d3 Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Sun, 20 Oct 2019 14:56:48 +0200 Subject: [PATCH] Update http_streamer.py will no longer load complete audio stream in memory but only small chunks at once --- music_assistant/http_streamer.py | 67 ++++++++++++-------------------- 1 file changed, 24 insertions(+), 43 deletions(-) diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index bd496abd..e4d7623e 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -52,13 +52,12 @@ class HTTPStreamer(): if http_request.method.upper() != 'GET': return resp # stream audio - buf_queue = asyncio.Queue() cancelled = threading.Event() if player.queue.use_queue_stream: # use queue stream bg_task = run_async_background_task( None, - self.__stream_queue, player, buf_queue, cancelled) + self.__stream_queue, player, resp, cancelled) else: # single track stream queue_item_id = http_request.match_info.get('queue_item_id') @@ -66,24 +65,16 @@ class HTTPStreamer(): assert(queue_item) bg_task = run_async_background_task( None, - self.__stream_single, player, queue_item, buf_queue, cancelled) + self.__stream_single, player, queue_item, resp, cancelled) # let the streaming begin! try: - while True: - chunk = await buf_queue.get() - if chunk: - await resp.write(chunk) - buf_queue.task_done() - del chunk - else: - buf_queue.task_done() - break + while not cancelled.is_set(): + await asyncio.sleep(0.1) except (asyncio.CancelledError, asyncio.TimeoutError): LOGGER.debug("stream interrupted") cancelled.set() # wait for bg_task to finish await asyncio.gather(bg_task) - del buf_queue raise asyncio.CancelledError() return resp @@ -98,12 +89,12 @@ class HTTPStreamer(): continue # put chunk in buffer asyncio.run_coroutine_threadsafe( - buffer.put(audio_chunk), + buffer.write(audio_chunk), self.mass.event_loop) # this should be garbage collected but just in case... del audio_chunk # wait for the queue to consume the data - while not cancelled.is_set() and buffer.qsize() > 5: + if not cancelled.is_set(): await asyncio.sleep(0.5) # all chunks received: streaming finished if cancelled.is_set(): @@ -111,7 +102,7 @@ class HTTPStreamer(): else: # indicate EOF if no more data asyncio.run_coroutine_threadsafe( - buffer.put(b''), + buffer.write_eof(), self.mass.event_loop) LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) @@ -124,6 +115,7 @@ class HTTPStreamer(): if fade_length: fade_bytes = int(sample_rate * 4 * 2 * fade_length) else: + fade_length = 1 fade_bytes = int(sample_rate * 4 * 2) pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate args = 'sox -t %s - -t flac -C 0 -' % pcm_args @@ -141,12 +133,12 @@ class HTTPStreamer(): break if chunk and not cancelled.is_set(): asyncio.run_coroutine_threadsafe( - buffer.put(chunk), self.mass.event_loop) + buffer.write(chunk), self.mass.event_loop) del chunk # indicate EOF if no more data if not cancelled.is_set(): asyncio.run_coroutine_threadsafe( - buffer.put(b''), self.mass.event_loop) + buffer.write_eof(), self.mass.event_loop) # start fill buffer task in background fill_buffer_thread = threading.Thread(target=fill_buffer) fill_buffer_thread.start() @@ -234,7 +226,7 @@ class HTTPStreamer(): last_part = prev_chunk + chunk if len(last_part) < fade_bytes: # still not enough data so we'll skip the crossfading - LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part)) + LOGGER.info("not enough data for fadeout so skip crossfade... %s" % len(last_part)) sox_proc.stdin.write(last_part) bytes_written += len(last_part) del last_part @@ -259,6 +251,9 @@ class HTTPStreamer(): else: prev_chunk = chunk del chunk + ### throttle to prevent entire track sitting in memory + if not cancelled.is_set(): + await asyncio.sleep(fade_length) # end of the track reached if cancelled.is_set(): # break out the loop if the http session is cancelled @@ -269,9 +264,6 @@ class HTTPStreamer(): queue_track.duration = accurate_duration LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) - # wait for the queue to consume the data - while not cancelled.is_set() and buffer.qsize() > 4: - await asyncio.sleep(0.5) # end of queue reached, pass last fadeout bits to final output if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) @@ -333,33 +325,22 @@ class HTTPStreamer(): bytes_sent = 0 buf = b'' while True: - # read exactly buffersize of data if cancelled.is_set(): # http session ended # send terminate and pick up left over bytes process.terminate() - # try to read as much data as possible from stdin - data = process.stdout.read(chunksize) - if not data: - # no more data - # yield leftover data in buffer as last chunk - yield (True, buf) - bytes_sent += len(buf) - del buf - break - elif len(buf) + len(data) >= chunksize: - # enough data to send a chunk - new_data = buf + data - chunk = new_data[:chunksize] - yield (False, chunk) + # read exactly chunksize of data + chunk = process.stdout.read(chunksize) + if len(chunk) < chunksize: + # last chunk + LOGGER.info("last chunk") bytes_sent += len(chunk) - buf = new_data[chunksize:] - del chunk - del data - del new_data + yield (True, chunk) + break else: - buf += data - del data + bytes_sent += len(chunk) + yield (False, chunk) + # fire event that streaming has ended asyncio.run_coroutine_threadsafe( self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop) -- 2.34.1