raise asyncio.CancelledError()
return resp
- async def __stream_queue_org(self, player_id, startindex, buffer, cancelled):
- ''' start streaming all queue tracks '''
- # TODO: get correct queue index and implement reporting of position
- sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
- fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
- pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
- args = 'sox -t %s - -t flac -C 2 -' % pcm_args
- sox_proc = await asyncio.create_subprocess_shell(args,
- stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
-
- async def fill_buffer():
- while not sox_proc.stdout.at_eof():
- chunk = await sox_proc.stdout.read(256000)
- if not chunk:
- break
- await buffer.put(chunk)
- await buffer.put(b'') # indicate EOF
- asyncio.create_task(fill_buffer())
-
- queue_index = startindex
- last_fadeout_data = None
- while True:
- # get the (next) track in queue
- try:
- queue_tracks = await self.mass.player.player_queue(player_id, queue_index, queue_index+1)
- queue_track = queue_tracks[0]
- except IndexError:
- LOGGER.info("queue index out of range or end reached")
- break
-
- params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
- track_id = params['track_id'][0]
- provider = params['provider'][0]
- LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
- audiodata = await self.__get_raw_audio(track_id, provider, sample_rate)
- fade_bytes = int(sample_rate * 4 * 2 * fade_length)
- LOGGER.debug("total bytes in audio_data: %s - fade_bytes: %s" % (len(audiodata),fade_bytes))
-
- # report start stream of current queue index
- self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
- queue_index += 1
-
- if last_fadeout_data:
- # get fade in part
- args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- fade_in_part, stderr = await process.communicate(audiodata[:fade_bytes])
- LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
- # perform crossfade with previous fadeout samples
- fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
- fadeinfile.write(fade_in_part)
- fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
- fadeoutfile.write(last_fadeout_data)
- args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- crossfade_part, stderr = await process.communicate(fade_in_part)
- LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
- sox_proc.stdin.write(crossfade_part)
- await sox_proc.stdin.drain()
- fadeinfile.close()
- fadeoutfile.close()
- del crossfade_part
- del fade_in_part
- last_fadeout_data = None
- else:
- # simply put the fadein part in the final file
- sox_proc.stdin.write(audiodata[:fade_bytes])
- await sox_proc.stdin.drain()
-
- # feed the middle part into the main sox
- sox_proc.stdin.write(audiodata[fade_bytes:-fade_bytes])
- await sox_proc.stdin.drain()
-
- # get fade out part
- args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- last_fadeout_data, stderr = await process.communicate(audiodata[-fade_bytes:])
- LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
- # cleanup audio data
- del audiodata
-
- LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name))
-
- # wait for the queue to consume the data
- while buffer.qsize() > 1 and not cancelled.is_set():
- await asyncio.sleep(1)
- if cancelled.is_set():
- break
- LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name))
-
- # end of queue reached, pass last fadeout bits to final output
- if last_fadeout_data:
- sox_proc.stdin.write(last_fadeout_data)
- await sox_proc.stdin.drain()
- sox_proc.stdin.close()
- await sox_proc.wait()
- LOGGER.info("streaming of queue for player %s completed" % player_id)
-
- async def __get_raw_audio_org(self, track_id, provider, sample_rate=96000):
- ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
- audiodata = b''
- cachefile = self.__get_track_cache_filename(track_id, provider)
- pcm_args = 'raw -b 32 -c 2 -e signed-integer'
- if self.mass.config['base']['http_streamer']['volume_normalisation']:
- gain_correct = await self.__get_track_gain_correct(track_id, provider)
- else:
- gain_correct = -6 # always need some headroom for upsampling and crossfades
- if os.path.isfile(cachefile):
- # we have a cache file for this track which we can use
- args = 'sox -t flac "%s" -t %s - vol %s dB rate -v %s' % (cachefile, pcm_args, gain_correct, sample_rate)
- process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
- else:
- # stream from provider
- input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
- assert(input_content_type)
- args = 'sox -t %s - -t %s - vol %s dB rate -v %s' % (input_content_type, pcm_args, gain_correct, sample_rate)
- process = await asyncio.create_subprocess_shell(args,
- stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
- asyncio.get_event_loop().create_task(
- self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
- audiodata, stderr = await process.communicate()
- LOGGER.debug("__get_raw_audio for track_id %s completed" % (track_id))
- return audiodata
-
async def __stream_queue(self, player_id, startindex, buffer, cancelled):
''' start streaming all queue tracks '''
- # TODO: get correct queue index and implement reporting of position
sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
- args = 'sox -t %s - -t flac -C 2 -' % pcm_args
+ args = 'sox -t %s - -t flac -C 0 -' % pcm_args
sox_proc = await asyncio.create_subprocess_shell(args,
stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
provider = params['provider'][0]
LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
fade_bytes = int(sample_rate * 4 * 2 * fade_length)
- cachefile = self.__get_track_cache_filename(track_id, provider)
- if os.path.isfile(cachefile):
- # get track length from cachefile
- args = 'soxi -d "%s"' % cachefile
- process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
- stdout, stderr = await process.communicate()
- timestr = stdout.split()[0].decode()
- hours = int(timestr.split(":")[0])
- minutes = int(timestr.split(":")[1])
- seconds = int(float(timestr.split(":")[2]))
- total_chunks = hours*60*60 + minutes*60 + seconds
- else:
- total_chunks = int(queue_track.duration)
-
- # report start stream of current queue index
- self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
- queue_index += 1
fade_in_part = b''
cur_chunk = 0
+ prev_chunk = None
- async for chunk in self.__get_raw_audio(track_id, provider, sample_rate):
+ async for is_last_chunk, chunk in self.__get_raw_audio(track_id, provider, sample_rate, fade_bytes):
cur_chunk += 1
-
- if cur_chunk <= fade_length and not last_fadeout_data:
+ if cur_chunk == 1 and not last_fadeout_data:
# fade-in part but this is the first track so just pass it to the final file
sox_proc.stdin.write(chunk)
await sox_proc.stdin.drain()
- elif (cur_chunk < fade_length) and last_fadeout_data:
- # need to have fade_length of chunks for the fade-in data
- fade_in_part += chunk
- elif fade_in_part and last_fadeout_data:
- fade_in_part += chunk
- # perform crossfade with previous fadeout samples
+ elif cur_chunk == 1 and last_fadeout_data:
+ # create fade-out part
args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
last_fadeout_data, stderr = await process.communicate(last_fadeout_data)
- LOGGER.info("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
+ LOGGER.debug("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
+ # create fade-in part
args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- fade_in_part, stderr = await process.communicate(fade_in_part)
- LOGGER.info("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+ fade_in_part, stderr = await process.communicate(chunk)
+ LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+ # create crossfade using sox and some temp files
+ # TODO: figure out how to make this less complex and without the tempfiles
fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
fadeinfile.write(fade_in_part)
fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
crossfade_part, stderr = await process.communicate(fade_in_part)
- LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+ LOGGER.debug("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+ # write the crossfade part to the sox player
sox_proc.stdin.write(crossfade_part)
await sox_proc.stdin.drain()
fadeinfile.close()
del crossfade_part
fade_in_part = None
last_fadeout_data = b''
- elif (cur_chunk > fade_length) and (cur_chunk < (total_chunks-fade_length)):
- # middle part of the track
- sox_proc.stdin.write(chunk)
+ elif prev_chunk and is_last_chunk:
+ # last chunk received so create the fadeout with the previous chunk and this chunk
+ last_part = prev_chunk + chunk
+ last_fadeout_data = last_part[-fade_bytes:]
+ bytes_remaining = last_part[:-fade_bytes]
+ sox_proc.stdin.write(bytes_remaining)
await sox_proc.stdin.drain()
else:
- # fade out part
- last_fadeout_data += chunk
-
- LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name))
-
- #wait for the queue to consume the data
- while buffer.qsize() > 1 and not cancelled.is_set():
- await asyncio.sleep(1)
- if cancelled.is_set():
- break
+ # middle part of the track
+ # keep previous chunk in memory so we have enough samples to perform the crossfade
+ if prev_chunk:
+ sox_proc.stdin.write(prev_chunk)
+ await sox_proc.stdin.drain()
+ prev_chunk = chunk
+ else:
+ prev_chunk = chunk
+ # wait for the queue to consume the data
+ # this prevents that the entire track is sitting in memory
+ # and it helps a bit in the quest to follow where we are in the queue
+ while buffer.qsize() > 1 and not cancelled.is_set():
+ await asyncio.sleep(1)
+ # break out the loop if the http session is cancelled
+ if cancelled.is_set():
+ break
+ if cur_chunk == 1:
+ # report start stream of current queue index
+ self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
+ # end of the track reached
LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name))
+ queue_index += 1
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data:
await sox_proc.wait()
LOGGER.info("streaming of queue for player %s completed" % player_id)
- async def __get_raw_audio(self, track_id, provider, sample_rate=96000):
+ async def __get_raw_audio(self, track_id, provider, sample_rate, chunksize):
''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
cachefile = self.__get_track_cache_filename(track_id, provider)
pcm_args = 'raw -b 32 -c 2 -e signed-integer'
asyncio.get_event_loop().create_task(
self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
# put chunks from stdout into queue
- chunksize = int(sample_rate * (32/8) * 2) # 1 second
+ # we keep 1 chunk behind to detect end of stream properly
+ prev_chunk = None
while not process.stdout.at_eof():
try:
chunk = await process.stdout.readexactly(chunksize)
chunk = await process.stdout.read(chunksize)
if not chunk:
break
- yield chunk
+ if prev_chunk:
+ yield (False, prev_chunk)
+ prev_chunk = chunk
+ # yield last chunk
+ yield (True, prev_chunk)
await process.wait()
LOGGER.info("__get_raw_audio for track_id %s completed" % (track_id))