Update http_streamer.py
authormarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Sun, 20 Oct 2019 12:56:48 +0000 (14:56 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Sun, 20 Oct 2019 12:56:48 +0000 (14:56 +0200)
will no longer load complete audio stream in memory but only small chunks at once

music_assistant/http_streamer.py

index bd496abd438462be7f129a096768695e9237d93e..e4d7623e990b04b3f3d7e3790fdf61361aba9581 100755 (executable)
@@ -52,13 +52,12 @@ class HTTPStreamer():
         if http_request.method.upper() != 'GET':
             return resp
         # stream audio
-        buf_queue = asyncio.Queue()
         cancelled = threading.Event()
         if player.queue.use_queue_stream:
             # use queue stream
             bg_task = run_async_background_task(
                 None, 
-                self.__stream_queue, player, buf_queue, cancelled)
+                self.__stream_queue, player, resp, cancelled)
         else:
             # single track stream
             queue_item_id = http_request.match_info.get('queue_item_id')
@@ -66,24 +65,16 @@ class HTTPStreamer():
             assert(queue_item)
             bg_task = run_async_background_task(
                 None, 
-                self.__stream_single, player, queue_item, buf_queue, cancelled)
+                self.__stream_single, player, queue_item, resp, cancelled)
         # let the streaming begin!
         try:
-            while True:
-                chunk = await buf_queue.get()
-                if chunk:
-                    await resp.write(chunk)
-                    buf_queue.task_done()
-                    del chunk
-                else:
-                    buf_queue.task_done()
-                    break
+            while not cancelled.is_set():
+                await asyncio.sleep(0.1)
         except (asyncio.CancelledError, asyncio.TimeoutError):
             LOGGER.debug("stream interrupted")
             cancelled.set()
             # wait for bg_task to finish
             await asyncio.gather(bg_task)
-            del buf_queue
             raise asyncio.CancelledError()
         return resp
     
@@ -98,12 +89,12 @@ class HTTPStreamer():
                 continue
             # put chunk in buffer
             asyncio.run_coroutine_threadsafe(
-                    buffer.put(audio_chunk), 
+                    buffer.write(audio_chunk), 
                     self.mass.event_loop)
             # this should be garbage collected but just in case...
             del audio_chunk
             # wait for the queue to consume the data
-            while not cancelled.is_set() and buffer.qsize() > 5:
+            if not cancelled.is_set():
                 await asyncio.sleep(0.5)
         # all chunks received: streaming finished
         if cancelled.is_set():
@@ -111,7 +102,7 @@ class HTTPStreamer():
         else:
             # indicate EOF if no more data
             asyncio.run_coroutine_threadsafe(
-                    buffer.put(b''), 
+                    buffer.write_eof(), 
                     self.mass.event_loop)
             LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
 
@@ -124,6 +115,7 @@ class HTTPStreamer():
         if fade_length:
             fade_bytes = int(sample_rate * 4 * 2 * fade_length)
         else:
+            fade_length = 1
             fade_bytes = int(sample_rate * 4 * 2)
         pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
         args = 'sox -t %s - -t flac -C 0 -' % pcm_args
@@ -141,12 +133,12 @@ class HTTPStreamer():
                     break
                 if chunk and not cancelled.is_set():
                     asyncio.run_coroutine_threadsafe(
-                        buffer.put(chunk), self.mass.event_loop)
+                        buffer.write(chunk), self.mass.event_loop)
                 del chunk
             # indicate EOF if no more data
             if not cancelled.is_set():
                 asyncio.run_coroutine_threadsafe(
-                        buffer.put(b''),  self.mass.event_loop)
+                        buffer.write_eof(),  self.mass.event_loop)
         # start fill buffer task in background
         fill_buffer_thread = threading.Thread(target=fill_buffer)
         fill_buffer_thread.start()
@@ -234,7 +226,7 @@ class HTTPStreamer():
                             last_part = prev_chunk + chunk
                         if len(last_part) < fade_bytes:
                             # still not enough data so we'll skip the crossfading
-                            LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part))
+                            LOGGER.info("not enough data for fadeout so skip crossfade... %s" % len(last_part))
                             sox_proc.stdin.write(last_part)
                             bytes_written += len(last_part)
                             del last_part
@@ -259,6 +251,9 @@ class HTTPStreamer():
                     else:
                         prev_chunk = chunk
                     del chunk
+                    ### throttle to prevent entire track sitting in memory
+                    if not cancelled.is_set():
+                        await asyncio.sleep(fade_length)
             # end of the track reached
             if cancelled.is_set():
                 # break out the loop if the http session is cancelled
@@ -269,9 +264,6 @@ class HTTPStreamer():
                 queue_track.duration = accurate_duration
                 LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
                 LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
-            # wait for the queue to consume the data
-            while not cancelled.is_set() and buffer.qsize() > 4:
-                await asyncio.sleep(0.5)
         # end of queue reached, pass last fadeout bits to final output
         if last_fadeout_data and not cancelled.is_set():
             sox_proc.stdin.write(last_fadeout_data)
@@ -333,33 +325,22 @@ class HTTPStreamer():
         bytes_sent = 0
         buf = b''
         while True:
-            # read exactly buffersize of data
             if cancelled.is_set():
                 # http session ended
                 # send terminate and pick up left over bytes
                 process.terminate()
-            # try to read as much data as possible from stdin
-            data = process.stdout.read(chunksize)
-            if not data:
-                # no more data
-                # yield leftover data in buffer as last chunk
-                yield (True, buf)
-                bytes_sent += len(buf)
-                del buf
-                break
-            elif len(buf) + len(data) >= chunksize:
-                # enough data to send a chunk
-                new_data = buf + data
-                chunk = new_data[:chunksize]
-                yield (False, chunk)
+            # read exactly chunksize of data
+            chunk = process.stdout.read(chunksize)
+            if len(chunk) < chunksize:
+                # last chunk
+                LOGGER.info("last chunk")
                 bytes_sent += len(chunk)
-                buf = new_data[chunksize:]
-                del chunk
-                del data
-                del new_data
+                yield (True, chunk)
+                break
             else:
-                buf += data
-                del data
+                bytes_sent += len(chunk)
+                yield (False, chunk)
+
         # fire event that streaming has ended
         asyncio.run_coroutine_threadsafe(
                 self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop)