import pyloudnorm
import io
import aiohttp
+import subprocess
from .utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
from .models.media_types import TrackQuality, MediaType
from .models.playerstate import PlayerState
if not player:
LOGGER.error("Received stream request for non-existing player %s" %(player_id))
return
- queue_item_id = http_request.query.get('queue_item_id')
+ queue_item_id = http_request.match_info.get('queue_item_id')
queue_item = await player.queue.by_item_id(queue_item_id)
# prepare headers as audio/flac content
resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'})
buf_queue = asyncio.Queue()
cancelled = threading.Event()
if queue_item:
- # single stream requested
- #asyncio.create_task(self.__stream_single(player, queue_item, buf_queue, cancelled))
+ # single stream requested, run stream in executor
run_async_background_task(
self.mass.bg_executor,
self.__stream_single, player, queue_item, buf_queue, cancelled)
else:
- # no item is given, start queue stream
+ # no item is given, start queue stream, run stream in executor
run_async_background_task(
self.mass.bg_executor,
self.__stream_queue, player, buf_queue, cancelled)
break
await resp.write(chunk)
buf_queue.task_done()
- LOGGER.info("stream fininished for player %s" % player.name)
except asyncio.CancelledError:
cancelled.set()
- LOGGER.warning("stream interrupted for player %s" % player.name)
raise asyncio.CancelledError()
return resp
-
+
async def __stream_single(self, player, queue_item, buffer, cancelled):
''' start streaming single track from provider '''
- LOGGER.info("stream single track started for track %s on player %s" % (queue_item.name, player.name))
+ LOGGER.debug("stream single track started for track %s on player %s" % (queue_item.name, player.name))
try:
audio_stream = self.__get_audio_stream(player, queue_item, cancelled)
async for is_last_chunk, audio_chunk in audio_stream:
self.mass.event_loop)
except (asyncio.CancelledError, asyncio.TimeoutError):
cancelled.set()
- LOGGER.info("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
+ LOGGER.debug("stream single track interrupted for track %s on player %s" % (queue_item.name, player.name))
raise asyncio.CancelledError()
else:
- LOGGER.info("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
+ LOGGER.debug("stream single track finished for track %s on player %s" % (queue_item.name, player.name))
async def __stream_queue(self, player, buffer, cancelled):
''' start streaming all queue tracks '''
fade_bytes = int(sample_rate * 4 * 2 * fade_length)
else:
fade_bytes = int(sample_rate * 4 * 2)
- print("sample rate: %s" % sample_rate)
- print("fade_bytes: %s" % fade_bytes)
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
args = 'sox -t %s - -t flac -C 0 -' % pcm_args
- print(args)
- sox_proc = await asyncio.create_subprocess_shell(args,
- stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+ # start sox process
+ # we use normal subprocess instead of asyncio because of bug with executor
+ # this should be fixed with python 3.8
+ sox_proc = subprocess.Popen(args, shell=True,
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
- async def fill_buffer():
- while not sox_proc.stdout.at_eof() and not sox_proc.returncode:
- try:
- chunk = await sox_proc.stdout.readexactly(256000)
- except asyncio.streams.IncompleteReadError as err:
- chunk = err.partial
+ def fill_buffer():
+ while True:
+ chunk = sox_proc.stdout.read(256000)
+ if not chunk:
+ # no more data
+ break
asyncio.run_coroutine_threadsafe(
buffer.put(chunk),
self.mass.event_loop)
asyncio.run_coroutine_threadsafe(
buffer.put(b''),
self.mass.event_loop)
- asyncio.create_task(fill_buffer())
+ threading.Thread(target=fill_buffer).start()
+
LOGGER.info("Start Queue Stream for player %s " %(player.name))
is_start = True
if cur_chunk <= 2 and not last_fadeout_data:
# no fadeout_part available so just pass it to the output directly
sox_proc.stdin.write(chunk)
- await sox_proc.stdin.drain()
bytes_written += len(chunk)
elif cur_chunk == 1 and last_fadeout_data:
prev_chunk = chunk
elif cur_chunk == 2 and last_fadeout_data:
# combine the first 2 chunks and strip off silence
args = 'sox --ignore-length -t %s - -t %s - silence 1 0.1 1%%' % (pcm_args, pcm_args)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- first_part, stderr = await process.communicate(prev_chunk + chunk)
+ first_part, std_err = subprocess.Popen(args, shell=True,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE).communicate(prev_chunk + chunk)
fade_in_part = first_part[:fade_bytes]
remaining_bytes = first_part[fade_bytes:]
del first_part
# do crossfade
- crossfade_part = await self.__crossfade_pcm_parts(fade_in_part, last_fadeout_data, pcm_args, fade_length)
+ crossfade_part = asyncio.run_coroutine_threadsafe(
+ self.__crossfade_pcm_parts(fade_in_part,
+ last_fadeout_data, pcm_args, fade_length), self.mass.event_loop).result()
sox_proc.stdin.write(crossfade_part)
- await sox_proc.stdin.drain()
bytes_written += len(crossfade_part)
del crossfade_part
del fade_in_part
last_fadeout_data = b''
# also write the leftover bytes from the strip action
sox_proc.stdin.write(remaining_bytes)
- await sox_proc.stdin.drain()
bytes_written += len(remaining_bytes)
del remaining_bytes
prev_chunk = None # needed to prevent this chunk being sent again
# last chunk received so create the fadeout_part with the previous chunk and this chunk
# and strip off silence
args = 'sox --ignore-length -t %s - -t %s - reverse silence 1 0.1 1%% reverse' % (pcm_args, pcm_args)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- last_part, stderr = await process.communicate(prev_chunk + chunk)
+ last_part, stderr = subprocess.Popen(args, shell=True,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE).communicate(prev_chunk + chunk)
if not player.queue.crossfade_enabled:
# crossfading is not enabled so just pass the (stripped) audio data
sox_proc.stdin.write(last_part)
bytes_written += len(last_part)
- await sox_proc.stdin.drain()
del last_part
else:
# handle crossfading support
LOGGER.debug("not enough data for fadeout so skip crossfade... %s" % len(last_part))
sox_proc.stdin.write(last_part)
bytes_written += len(last_part)
- await sox_proc.stdin.drain()
del last_part
else:
# store fade section to be picked up for next track
# write remaining bytes
sox_proc.stdin.write(remaining_bytes)
bytes_written += len(remaining_bytes)
- await sox_proc.stdin.drain()
del last_part
del remaining_bytes
### MIDDLE PARTS OF TRACK
# keep previous chunk in memory so we have enough samples to perform the crossfade
if prev_chunk:
sox_proc.stdin.write(prev_chunk)
- await sox_proc.stdin.drain()
bytes_written += len(prev_chunk)
prev_chunk = chunk
else:
prev_chunk = chunk
- # wait for the queue to consume the data
- # this prevents that the entire track is sitting in memory
- # and it helps a bit in the quest to follow where we are in the queue
- while buffer.qsize() > 2 and not cancelled.is_set():
- await asyncio.sleep(1)
# end of the track reached
if cancelled.is_set():
# break out the loop if the http session is cancelled
- LOGGER.debug("session cancelled")
break
else:
# WIP: update actual duration to the queue for more accurate now playing info
queue_track.duration = accurate_duration
LOGGER.debug("Finished Streaming queue track: %s (%s) on player %s" % (queue_track.item_id, queue_track.name, player.name))
LOGGER.debug("bytes written: %s - duration: %s" % (bytes_written, accurate_duration))
+ # wait for the queue to consume the data
+ while buffer.qsize() > 2 and not cancelled.is_set():
+ await asyncio.sleep(1)
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data and not cancelled.is_set():
sox_proc.stdin.write(last_fadeout_data)
- await sox_proc.stdin.drain()
sox_proc.stdin.close()
sox_proc.terminate()
- await sox_proc.wait()
LOGGER.info("streaming of queue for player %s completed" % player.name)
async def __get_audio_stream(self, player, queue_item, cancelled,
args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"],
streamdetails["content_type"], outputfmt, sox_effects)
# start sox process
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE)
+ # we use normal subprocess instead of asyncio because of bug with executor
+ # this should be fixed with python 3.8
+ process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE)
# fire event that streaming has started for this track (needed by some streaming providers)
streamdetails["provider"] = queue_item.provider
streamdetails["track_id"] = queue_item.item_id
streamdetails["player_id"] = player.player_id
asyncio.run_coroutine_threadsafe(
- self.mass.signal_event('streaming_started', streamdetails), self.mass.event_loop)
+ self.mass.signal_event('streaming_started',
+ streamdetails), self.mass.event_loop)
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
prev_chunk = b''
bytes_sent = 0
- while not process.stdout.at_eof() and not process.returncode:
+ while True:
if cancelled.is_set():
- try:
- process.terminate()
- except ProcessLookupError:
- pass
- try:
- chunk = await process.stdout.readexactly(chunksize)
- except asyncio.streams.IncompleteReadError as err:
- chunk = err.partial
+ process.terminate()
+ chunk = process.stdout.read(chunksize)
+ if not chunk:
+ # no more data
+ break
if prev_chunk:
yield (False, prev_chunk)
bytes_sent += len(prev_chunk)
# yield last chunk
yield (True, prev_chunk)
bytes_sent += len(prev_chunk)
- await process.wait()
- if cancelled.is_set():
- LOGGER.info("__get_audio_stream for track_id %s interrupted - bytes_sent: %s" % (queue_item.item_id, bytes_sent))
- else:
- LOGGER.info("__get_audio_stream for track_id %s completed- bytes_sent: %s" % (queue_item.item_id, bytes_sent))
# fire event that streaming has ended for this track (needed by some streaming providers)
if resample:
bytes_per_second = resample * (32/8) * 2
seconds_streamed = queue_item.duration
streamdetails["seconds"] = seconds_streamed
asyncio.run_coroutine_threadsafe(
- self.mass.signal_event('streaming_ended', streamdetails),
- self.mass.event_loop)
+ self.mass.signal_event('streaming_ended', streamdetails), self.mass.event_loop)
# send task to background to analyse the audio
asyncio.run_coroutine_threadsafe(
- self.__analyze_audio(queue_item),
- self.mass.event_loop)
+ self.__analyze_audio(queue_item), self.mass.event_loop)
async def __get_player_sox_options(self, player, queue_item):
''' get player specific sox effect options '''