fix queue stream
authormarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Mon, 14 Oct 2019 20:36:00 +0000 (22:36 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Silvia.local>
Mon, 14 Oct 2019 20:36:00 +0000 (22:36 +0200)
work-around python bug with asyncio.subprocess

music_assistant/http_streamer.py
music_assistant/player_manager.py
music_assistant/web.py

index 20aebfa4a3f32d9790d8bd69cfea8cfc20090571..190e913a1a159a17ace1832b0cdc593392e8b170 100755 (executable)
@@ -12,6 +12,7 @@ import soundfile
 import pyloudnorm
 import io
 import aiohttp
+import subprocess
 from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
 from .models.media_types import TrackQuality, MediaType
 from .models.playerstate import PlayerState
@@ -35,7 +36,7 @@ class HTTPStreamer():
         if not player:
             LOGGER.error("Received stream request for non-existing player %s" %(player_id))
             return
-        queue_item_id = http_request.query.get('queue_item_id')
+        queue_item_id = http_request.match_info.get('queue_item_id')
         queue_item = await player.queue.by_item_id(queue_item_id)
         # prepare headers as audio/flac content
         resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'})
@@ -46,13 +47,12 @@ class HTTPStreamer():
             buf_queue = asyncio.Queue()
             cancelled = threading.Event()
             if queue_item:
-                # single stream requested
-                #asyncio.create_task(self.__stream_single(player, queue_item, buf_queue, cancelled))
+                # single stream requested, run stream in executor
                 run_async_background_task(
                     self.mass.bg_executor, 
                     self.__stream_single, player, queue_item, buf_queue, cancelled)
             else:
-                # no item is given, start queue stream
+                # no item is given, start queue stream, run stream in executor
                 run_async_background_task(
                     self.mass.bg_executor, 
                     self.__stream_queue, player, buf_queue, cancelled)
@@ -64,16 +64,14 @@ class HTTPStreamer():
                         break
                     await resp.write(chunk)
                     buf_queue.task_done()
-                LOGGER.info("stream fininished for player %s" % player.name)
             except asyncio.CancelledError:
                 cancelled.set()
-                LOGGER.warning("stream interrupted for player %s" % player.name)
                 raise asyncio.CancelledError()
         return resp
-        
+    
     async def __stream_single(self, player, queue_item, buffer, cancelled):
         ''' start streaming single track from provider '''
-        LOGGER.info("stream single track started for track %s on player %s" % (queue_item.name, player.name))
+        LOGGER.debug("stream single track started for track %s on player %s" % (queue_item.name, player.name))
         try:
             audio_stream = self.__get_audio_stream(player, queue_item, cancelled)
             async for is_last_chunk, audio_chunk in audio_stream:
@@ -86,10 +84,10 @@ class HTTPStreamer():
                     self.mass.event_loop)
         except (asyncio.CancelledError, asyncio.TimeoutError):
             cancelled.set()
-            LOGGER.info("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
+            LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
             raise asyncio.CancelledError()
         else:
-            LOGGER.info("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
+            LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
 
     async def __stream_queue(self, player, buffer, cancelled):
         ''' start streaming all queue tracks '''
@@ -101,20 +99,20 @@ class HTTPStreamer():
             fade_bytes = int(sample_rate * 4 * 2 * fade_length)
         else:
             fade_bytes = int(sample_rate * 4 * 2)
-        print("sample rate: %s" % sample_rate)
-        print("fade_bytes: %s" % fade_bytes)
         pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
         args = 'sox -t %s - -t flac -C 0 -' % pcm_args
-        print(args)
-        sox_proc = await asyncio.create_subprocess_shell(args, 
-                stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+        # start sox process
+        # we use normal subprocess instead of asyncio because of bug with executor
+        # this should be fixed with python 3.8
+        sox_proc = subprocess.Popen(args, shell=True, 
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
 
-        async def fill_buffer():
-            while not sox_proc.stdout.at_eof() and not sox_proc.returncode:
-                try:
-                    chunk = await sox_proc.stdout.readexactly(256000)
-                except asyncio.streams.IncompleteReadError as err:
-                    chunk = err.partial
+        def fill_buffer():
+            while True:
+                chunk = sox_proc.stdout.read(256000)
+                if not chunk:
+                    # no more data
+                    break
                 asyncio.run_coroutine_threadsafe(
                     buffer.put(chunk), 
                     self.mass.event_loop)
@@ -122,7 +120,8 @@ class HTTPStreamer():
             asyncio.run_coroutine_threadsafe(
                     buffer.put(b''), 
                     self.mass.event_loop)
-        asyncio.create_task(fill_buffer())
+        threading.Thread(target=fill_buffer).start()
+        
 
         LOGGER.info("Start Queue Stream for player %s " %(player.name))
         is_start = True
@@ -154,7 +153,6 @@ class HTTPStreamer():
                 if cur_chunk <= 2 and not last_fadeout_data:
                     # no fadeout_part available so just pass it to the output directly
                     sox_proc.stdin.write(chunk)
-                    await sox_proc.stdin.drain()
                     bytes_written += len(chunk)
                 elif cur_chunk == 1 and last_fadeout_data:
                     prev_chunk = chunk
@@ -162,23 +160,23 @@ class HTTPStreamer():
                 elif cur_chunk == 2 and last_fadeout_data:
                     # combine the first 2 chunks and strip off silence
                     args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args)
-                    process = await asyncio.create_subprocess_shell(args,
-                            stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                    first_part, stderr = await process.communicate(prev_chunk + chunk)
+                    first_part, std_err = subprocess.Popen(args, shell=True,
+                            stdout=subprocess.PIPE, 
+                            stdin=subprocess.PIPE).communicate(prev_chunk + chunk)
                     fade_in_part = first_part[:fade_bytes]
                     remaining_bytes = first_part[fade_bytes:]
                     del first_part
                     # do crossfade
-                    crossfade_part = await self.__crossfade_pcm_parts(fade_in_part, last_fadeout_data, pcm_args, fade_length) 
+                    crossfade_part = asyncio.run_coroutine_threadsafe(
+                        self.__crossfade_pcm_parts(fade_in_part, 
+                            last_fadeout_data, pcm_args, fade_length), self.mass.event_loop).result() 
                     sox_proc.stdin.write(crossfade_part)
-                    await sox_proc.stdin.drain()
                     bytes_written += len(crossfade_part)
                     del crossfade_part
                     del fade_in_part
                     last_fadeout_data = b''
                     # also write the leftover bytes from the strip action
                     sox_proc.stdin.write(remaining_bytes)
-                    await sox_proc.stdin.drain()
                     bytes_written += len(remaining_bytes)
                     del remaining_bytes
                     prev_chunk = None # needed to prevent this chunk being sent again
@@ -187,14 +185,13 @@ class HTTPStreamer():
                     # last chunk received so create the fadeout_part with the previous chunk and this chunk
                     # and strip off silence
                     args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args)
-                    process = await asyncio.create_subprocess_shell(args,
-                            stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
-                    last_part, stderr = await process.communicate(prev_chunk + chunk)
+                    last_part, stderr = subprocess.Popen(args, shell=True,
+                            stdout=subprocess.PIPE, 
+                            stdin=subprocess.PIPE).communicate(prev_chunk + chunk)
                     if not player.queue.crossfade_enabled:
                         # crossfading is not enabled so just pass the (stripped) audio data
                         sox_proc.stdin.write(last_part)
                         bytes_written += len(last_part)
-                        await sox_proc.stdin.drain()
                         del last_part
                     else:
                         # handle crossfading support
@@ -206,7 +203,6 @@ class HTTPStreamer():
                             LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part))
                             sox_proc.stdin.write(last_part)
                             bytes_written += len(last_part)
-                            await sox_proc.stdin.drain()
                             del last_part
                         else:
                             # store fade section to be picked up for next track
@@ -215,7 +211,6 @@ class HTTPStreamer():
                             # write remaining bytes
                             sox_proc.stdin.write(remaining_bytes)
                             bytes_written += len(remaining_bytes)
-                            await sox_proc.stdin.drain()
                             del last_part
                             del remaining_bytes
                 ### MIDDLE PARTS OF TRACK
@@ -224,20 +219,13 @@ class HTTPStreamer():
                     # keep previous chunk in memory so we have enough samples to perform the crossfade
                     if prev_chunk:
                         sox_proc.stdin.write(prev_chunk)
-                        await sox_proc.stdin.drain()
                         bytes_written += len(prev_chunk)
                         prev_chunk = chunk
                     else:
                         prev_chunk = chunk
-                # wait for the queue to consume the data
-                # this prevents that the entire track is sitting in memory
-                # and it helps a bit in the quest to follow where we are in the queue
-                while buffer.qsize() > 2 and not cancelled.is_set():
-                    await asyncio.sleep(1)
             # end of the track reached
             if cancelled.is_set():
                 # break out the loop if the http session is cancelled
-                LOGGER.debug("session cancelled")
                 break
             else:
                 # WIP: update actual duration to the queue for more accurate now playing info
@@ -245,13 +233,14 @@ class HTTPStreamer():
                 queue_track.duration = accurate_duration
                 LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
                 LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
+            # wait for the queue to consume the data
+            while buffer.qsize() > 2 and not cancelled.is_set():
+                await asyncio.sleep(1)
         # end of queue reached, pass last fadeout bits to final output
         if last_fadeout_data and not cancelled.is_set():
             sox_proc.stdin.write(last_fadeout_data)
-            await sox_proc.stdin.drain()
         sox_proc.stdin.close()
         sox_proc.terminate()
-        await sox_proc.wait()
         LOGGER.info("streaming of queue for player %s completed" % player.name)
 
     async def __get_audio_stream(self, player, queue_item, cancelled,
@@ -291,29 +280,28 @@ class HTTPStreamer():
             args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], 
                     streamdetails["content_type"], outputfmt, sox_effects)
         # start sox process
-        process = await asyncio.create_subprocess_shell(args,
-                stdout=asyncio.subprocess.PIPE)
+        # we use normal subprocess instead of asyncio because of bug with executor
+        # this should be fixed with python 3.8
+        process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
         
         # fire event that streaming has started for this track (needed by some streaming providers)
         streamdetails["provider"] = queue_item.provider
         streamdetails["track_id"] = queue_item.item_id
         streamdetails["player_id"] = player.player_id
         asyncio.run_coroutine_threadsafe(
-                self.mass.signal_event('streaming_started', streamdetails), self.mass.event_loop)
+                self.mass.signal_event('streaming_started', 
+                streamdetails), self.mass.event_loop)
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
         prev_chunk = b''
         bytes_sent = 0
-        while not process.stdout.at_eof() and not process.returncode:
+        while True:
             if cancelled.is_set():
-                try:
-                    process.terminate()
-                except ProcessLookupError:
-                    pass
-            try:
-                chunk = await process.stdout.readexactly(chunksize)
-            except asyncio.streams.IncompleteReadError as err:
-                chunk = err.partial
+                process.terminate()
+            chunk = process.stdout.read(chunksize)
+            if not chunk:
+                # no more data
+                break
             if prev_chunk:
                 yield (False, prev_chunk)
                 bytes_sent += len(prev_chunk)
@@ -321,11 +309,6 @@ class HTTPStreamer():
         # yield last chunk
         yield (True, prev_chunk)
         bytes_sent += len(prev_chunk)
-        await process.wait()
-        if cancelled.is_set():
-            LOGGER.info("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
-        else:
-            LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
         # fire event that streaming has ended for this track (needed by some streaming providers)
         if resample:
             bytes_per_second = resample * (32/8) * 2
@@ -335,12 +318,10 @@ class HTTPStreamer():
             seconds_streamed = queue_item.duration
         streamdetails["seconds"] = seconds_streamed
         asyncio.run_coroutine_threadsafe(
-                self.mass.signal_event('streaming_ended', streamdetails), 
-                self.mass.event_loop)
+                self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop)
         # send task to background to analyse the audio
         asyncio.run_coroutine_threadsafe(
-            self.__analyze_audio(queue_item), 
-            self.mass.event_loop)
+            self.__analyze_audio(queue_item), self.mass.event_loop)
 
     async def __get_player_sox_options(self, player, queue_item):
         ''' get player specific sox effect options '''
index 357d7bec39ef57b613513ddf678b4fef894667ea..c6ff40c554a65445b4928ed1d155cdb5d2dbcacf 100755 (executable)
@@ -93,7 +93,7 @@ class PlayerManager():
             for track in tracks:
                 queue_item = QueueItem(track)
                 # generate uri for this queue item
-                queue_item.uri = 'http://%s:%s/stream/%s?queue_item_id=%s'% (
+                queue_item.uri = 'http://%s:%s/stream/%s/%s'% (
                         self.mass.web.local_ip, self.mass.web.http_port, player_id, queue_item.queue_item_id)
                 queue_items.append(queue_item)
                     
index 6e5512d3e4d8e2471afe309d15a883c317e03c98..b20d8024bd05c6a65cb0f59df1f3bad0145caf9b 100755 (executable)
@@ -99,9 +99,8 @@ class Web():
         app.add_routes([web.get('/jsonrpc.js', self.json_rpc)])
         app.add_routes([web.post('/jsonrpc.js', self.json_rpc)])
         app.add_routes([web.get('/ws', self.websocket_handler)])
-        # app.add_routes([web.get('/stream_track', self.mass.http_streamer.stream_track)])
-        # app.add_routes([web.get('/stream_radio', self.mass.http_streamer.stream_radio)])
         app.add_routes([web.get('/stream/{player_id}', self.mass.http_streamer.stream)])
+        app.add_routes([web.get('/stream/{player_id}/{queue_item_id}', self.mass.http_streamer.stream)])
         app.add_routes([web.get('/api/search', self.search)])
         app.add_routes([web.get('/api/config', self.get_config)])
         app.add_routes([web.post('/api/config', self.save_config)])