From: marcelveldt Date: Mon, 14 Oct 2019 20:36:00 +0000 (+0200) Subject: fix queue stream X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=d64aa3e0d60a4558bbc6fbf8352ef516975bfc29;p=music-assistant-server.git fix queue stream work-around python bug with asyncio.subprocess --- diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 20aebfa4..190e913a 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -12,6 +12,7 @@ import soundfile import pyloudnorm import io import aiohttp +import subprocess from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size from .models.media_types import TrackQuality, MediaType from .models.playerstate import PlayerState @@ -35,7 +36,7 @@ class HTTPStreamer(): if not player: LOGGER.error("Received stream request for non-existing player %s" %(player_id)) return - queue_item_id = http_request.query.get('queue_item_id') + queue_item_id = http_request.match_info.get('queue_item_id') queue_item = await player.queue.by_item_id(queue_item_id) # prepare headers as audio/flac content resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'}) @@ -46,13 +47,12 @@ class HTTPStreamer(): buf_queue = asyncio.Queue() cancelled = threading.Event() if queue_item: - # single stream requested - #asyncio.create_task(self.__stream_single(player, queue_item, buf_queue, cancelled)) + # single stream requested, run stream in executor run_async_background_task( self.mass.bg_executor, self.__stream_single, player, queue_item, buf_queue, cancelled) else: - # no item is given, start queue stream + # no item is given, start queue stream, run stream in executor run_async_background_task( self.mass.bg_executor, self.__stream_queue, player, buf_queue, cancelled) @@ -64,16 +64,14 @@ class HTTPStreamer(): break await resp.write(chunk) buf_queue.task_done() - LOGGER.info("stream fininished for player %s" % player.name) except asyncio.CancelledError: cancelled.set() - LOGGER.warning("stream interrupted for player %s" % player.name) raise asyncio.CancelledError() return resp - + async def __stream_single(self, player, queue_item, buffer, cancelled): ''' start streaming single track from provider ''' - LOGGER.info("stream single track started for track %s on player %s" % (queue_item.name, player.name)) + LOGGER.debug("stream single track started for track %s on player %s" % (queue_item.name, player.name)) try: audio_stream = self.__get_audio_stream(player, queue_item, cancelled) async for is_last_chunk, audio_chunk in audio_stream: @@ -86,10 +84,10 @@ class HTTPStreamer(): self.mass.event_loop) except (asyncio.CancelledError, asyncio.TimeoutError): cancelled.set() - LOGGER.info("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) + LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name)) raise asyncio.CancelledError() else: - LOGGER.info("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) + LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name)) async def __stream_queue(self, player, buffer, cancelled): ''' start streaming all queue tracks ''' @@ -101,20 +99,20 @@ class HTTPStreamer(): fade_bytes = int(sample_rate * 4 * 2 * fade_length) else: fade_bytes = int(sample_rate * 4 * 2) - print("sample rate: %s" % sample_rate) - print("fade_bytes: %s" % fade_bytes) pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate args = 'sox -t %s - -t flac -C 0 -' % pcm_args - print(args) - sox_proc = await asyncio.create_subprocess_shell(args, - stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + # start sox process + # we use normal subprocess instead of asyncio because of bug with executor + # this should be fixed with python 3.8 + sox_proc = subprocess.Popen(args, shell=True, + stdout=subprocess.PIPE, stdin=subprocess.PIPE) - async def fill_buffer(): - while not sox_proc.stdout.at_eof() and not sox_proc.returncode: - try: - chunk = await sox_proc.stdout.readexactly(256000) - except asyncio.streams.IncompleteReadError as err: - chunk = err.partial + def fill_buffer(): + while True: + chunk = sox_proc.stdout.read(256000) + if not chunk: + # no more data + break asyncio.run_coroutine_threadsafe( buffer.put(chunk), self.mass.event_loop) @@ -122,7 +120,8 @@ class HTTPStreamer(): asyncio.run_coroutine_threadsafe( buffer.put(b''), self.mass.event_loop) - asyncio.create_task(fill_buffer()) + threading.Thread(target=fill_buffer).start() + LOGGER.info("Start Queue Stream for player %s " %(player.name)) is_start = True @@ -154,7 +153,6 @@ class HTTPStreamer(): if cur_chunk <= 2 and not last_fadeout_data: # no fadeout_part available so just pass it to the output directly sox_proc.stdin.write(chunk) - await sox_proc.stdin.drain() bytes_written += len(chunk) elif cur_chunk == 1 and last_fadeout_data: prev_chunk = chunk @@ -162,23 +160,23 @@ class HTTPStreamer(): elif cur_chunk == 2 and last_fadeout_data: # combine the first 2 chunks and strip off silence args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - first_part, stderr = await process.communicate(prev_chunk + chunk) + first_part, std_err = subprocess.Popen(args, shell=True, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE).communicate(prev_chunk + chunk) fade_in_part = first_part[:fade_bytes] remaining_bytes = first_part[fade_bytes:] del first_part # do crossfade - crossfade_part = await self.__crossfade_pcm_parts(fade_in_part, last_fadeout_data, pcm_args, fade_length) + crossfade_part = asyncio.run_coroutine_threadsafe( + self.__crossfade_pcm_parts(fade_in_part, + last_fadeout_data, pcm_args, fade_length), self.mass.event_loop).result() sox_proc.stdin.write(crossfade_part) - await sox_proc.stdin.drain() bytes_written += len(crossfade_part) del crossfade_part del fade_in_part last_fadeout_data = b'' # also write the leftover bytes from the strip action sox_proc.stdin.write(remaining_bytes) - await sox_proc.stdin.drain() bytes_written += len(remaining_bytes) del remaining_bytes prev_chunk = None # needed to prevent this chunk being sent again @@ -187,14 +185,13 @@ class HTTPStreamer(): # last chunk received so create the fadeout_part with the previous chunk and this chunk # and strip off silence args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args) - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) - last_part, stderr = await process.communicate(prev_chunk + chunk) + last_part, stderr = subprocess.Popen(args, shell=True, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE).communicate(prev_chunk + chunk) if not player.queue.crossfade_enabled: # crossfading is not enabled so just pass the (stripped) audio data sox_proc.stdin.write(last_part) bytes_written += len(last_part) - await sox_proc.stdin.drain() del last_part else: # handle crossfading support @@ -206,7 +203,6 @@ class HTTPStreamer(): LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part)) sox_proc.stdin.write(last_part) bytes_written += len(last_part) - await sox_proc.stdin.drain() del last_part else: # store fade section to be picked up for next track @@ -215,7 +211,6 @@ class HTTPStreamer(): # write remaining bytes sox_proc.stdin.write(remaining_bytes) bytes_written += len(remaining_bytes) - await sox_proc.stdin.drain() del last_part del remaining_bytes ### MIDDLE PARTS OF TRACK @@ -224,20 +219,13 @@ class HTTPStreamer(): # keep previous chunk in memory so we have enough samples to perform the crossfade if prev_chunk: sox_proc.stdin.write(prev_chunk) - await sox_proc.stdin.drain() bytes_written += len(prev_chunk) prev_chunk = chunk else: prev_chunk = chunk - # wait for the queue to consume the data - # this prevents that the entire track is sitting in memory - # and it helps a bit in the quest to follow where we are in the queue - while buffer.qsize() > 2 and not cancelled.is_set(): - await asyncio.sleep(1) # end of the track reached if cancelled.is_set(): # break out the loop if the http session is cancelled - LOGGER.debug("session cancelled") break else: # WIP: update actual duration to the queue for more accurate now playing info @@ -245,13 +233,14 @@ class HTTPStreamer(): queue_track.duration = accurate_duration LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name)) LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration)) + # wait for the queue to consume the data + while buffer.qsize() > 2 and not cancelled.is_set(): + await asyncio.sleep(1) # end of queue reached, pass last fadeout bits to final output if last_fadeout_data and not cancelled.is_set(): sox_proc.stdin.write(last_fadeout_data) - await sox_proc.stdin.drain() sox_proc.stdin.close() sox_proc.terminate() - await sox_proc.wait() LOGGER.info("streaming of queue for player %s completed" % player.name) async def __get_audio_stream(self, player, queue_item, cancelled, @@ -291,29 +280,28 @@ class HTTPStreamer(): args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], streamdetails["content_type"], outputfmt, sox_effects) # start sox process - process = await asyncio.create_subprocess_shell(args, - stdout=asyncio.subprocess.PIPE) + # we use normal subprocess instead of asyncio because of bug with executor + # this should be fixed with python 3.8 + process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE) # fire event that streaming has started for this track (needed by some streaming providers) streamdetails["provider"] = queue_item.provider streamdetails["track_id"] = queue_item.item_id streamdetails["player_id"] = player.player_id asyncio.run_coroutine_threadsafe( - self.mass.signal_event('streaming_started', streamdetails), self.mass.event_loop) + self.mass.signal_event('streaming_started', + streamdetails), self.mass.event_loop) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly prev_chunk = b'' bytes_sent = 0 - while not process.stdout.at_eof() and not process.returncode: + while True: if cancelled.is_set(): - try: - process.terminate() - except ProcessLookupError: - pass - try: - chunk = await process.stdout.readexactly(chunksize) - except asyncio.streams.IncompleteReadError as err: - chunk = err.partial + process.terminate() + chunk = process.stdout.read(chunksize) + if not chunk: + # no more data + break if prev_chunk: yield (False, prev_chunk) bytes_sent += len(prev_chunk) @@ -321,11 +309,6 @@ class HTTPStreamer(): # yield last chunk yield (True, prev_chunk) bytes_sent += len(prev_chunk) - await process.wait() - if cancelled.is_set(): - LOGGER.info("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent)) - else: - LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent)) # fire event that streaming has ended for this track (needed by some streaming providers) if resample: bytes_per_second = resample * (32/8) * 2 @@ -335,12 +318,10 @@ class HTTPStreamer(): seconds_streamed = queue_item.duration streamdetails["seconds"] = seconds_streamed asyncio.run_coroutine_threadsafe( - self.mass.signal_event('streaming_ended', streamdetails), - self.mass.event_loop) + self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop) # send task to background to analyse the audio asyncio.run_coroutine_threadsafe( - self.__analyze_audio(queue_item), - self.mass.event_loop) + self.__analyze_audio(queue_item), self.mass.event_loop) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' diff --git a/music_assistant/player_manager.py b/music_assistant/player_manager.py index 357d7bec..c6ff40c5 100755 --- a/music_assistant/player_manager.py +++ b/music_assistant/player_manager.py @@ -93,7 +93,7 @@ class PlayerManager(): for track in tracks: queue_item = QueueItem(track) # generate uri for this queue item - queue_item.uri = 'http://%s:%s/stream/%s?queue_item_id=%s'% ( + queue_item.uri = 'http://%s:%s/stream/%s/%s'% ( self.mass.web.local_ip, self.mass.web.http_port, player_id, queue_item.queue_item_id) queue_items.append(queue_item) diff --git a/music_assistant/web.py b/music_assistant/web.py index 6e5512d3..b20d8024 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -99,9 +99,8 @@ class Web(): app.add_routes([web.get('/jsonrpc.js', self.json_rpc)]) app.add_routes([web.post('/jsonrpc.js', self.json_rpc)]) app.add_routes([web.get('/ws', self.websocket_handler)]) - # app.add_routes([web.get('/stream_track', self.mass.http_streamer.stream_track)]) - # app.add_routes([web.get('/stream_radio', self.mass.http_streamer.stream_radio)]) app.add_routes([web.get('/stream/{player_id}', self.mass.http_streamer.stream)]) + app.add_routes([web.get('/stream/{player_id}/{queue_item_id}', self.mass.http_streamer.stream)]) app.add_routes([web.get('/api/search', self.search)]) app.add_routes([web.get('/api/config', self.get_config)]) app.add_routes([web.post('/api/config', self.save_config)])