import asyncio
import os
from utils import LOGGER, try_parse_int, get_ip, run_async_background_task, run_periodic, get_folder_size
-from models import TrackQuality, MediaType
+from models import TrackQuality, MediaType, PlayerState
import shutil
import xml.etree.ElementTree as ET
import random
use case is enable crossfade support for chromecast devices
'''
player_id = http_request.query.get('player_id')
+ startindex = int(http_request.query.get('startindex',0))
cancelled = threading.Event()
resp = web.StreamResponse(status=200,
reason='OK',
cancelled = threading.Event()
run_async_background_task(
self.mass.bg_executor,
- self.__stream_queue, player_id, queue, cancelled)
+ self.__stream_queue, player_id, startindex, queue, cancelled)
try:
while True:
chunk = await queue.get()
raise asyncio.CancelledError()
return resp
- async def __stream_queue(self, player_id, buffer, cancelled):
+ async def __stream_queue_org(self, player_id, startindex, buffer, cancelled):
''' start streaming all queue tracks '''
# TODO: get correct queue index and implement reporting of position
sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
await buffer.put(b'') # indicate EOF
asyncio.create_task(fill_buffer())
+ queue_index = startindex
last_fadeout_data = None
while True:
- # get current track in queue
- queue_tracks = await self.mass.player.player_queue(player_id, 0, 10000)
- player = self.mass.player._players[player_id]
- queue_index = player.cur_queue_index
+ # get the (next) track in queue
try:
- queue_track = queue_tracks[queue_index]
+ queue_tracks = await self.mass.player.player_queue(player_id, queue_index, queue_index+1)
+ queue_track = queue_tracks[0]
except IndexError:
LOGGER.info("queue index out of range or end reached")
break
params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
track_id = params['track_id'][0]
provider = params['provider'][0]
- LOGGER.info("Stream queue track: %s - %s" % (track_id, queue_track.name))
+ LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
audiodata = await self.__get_raw_audio(track_id, provider, sample_rate)
fade_bytes = int(sample_rate * 4 * 2 * fade_length)
LOGGER.debug("total bytes in audio_data: %s - fade_bytes: %s" % (len(audiodata),fade_bytes))
-
- # get fade in part
- args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
- process = await asyncio.create_subprocess_shell(args,
- stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
- fade_in_part, stderr = await process.communicate(audiodata[:fade_bytes])
- LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+
+ # report start stream of current queue index
+ self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
+ queue_index += 1
+
if last_fadeout_data:
+ # get fade in part
+ args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ fade_in_part, stderr = await process.communicate(audiodata[:fade_bytes])
+ LOGGER.debug("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
# perform crossfade with previous fadeout samples
fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
fadeinfile.write(fade_in_part)
last_fadeout_data = None
else:
# simply put the fadein part in the final file
- sox_proc.stdin.write(fade_in_part)
+ sox_proc.stdin.write(audiodata[:fade_bytes])
await sox_proc.stdin.drain()
- del fade_in_part
# feed the middle part into the main sox
sox_proc.stdin.write(audiodata[fade_bytes:-fade_bytes])
# cleanup audio data
del audiodata
+ LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name))
+
# wait for the queue to consume the data
- while buffer.qsize() > 5 and not cancelled.is_set():
+ while buffer.qsize() > 1 and not cancelled.is_set():
await asyncio.sleep(1)
if cancelled.is_set():
break
- # assume end of track and increase queue_index
- player.cur_queue_index += 1
- await self.mass.player.trigger_update(player_id)
+ LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name))
# end of queue reached, pass last fadeout bits to final output
if last_fadeout_data:
await sox_proc.wait()
LOGGER.info("streaming of queue for player %s completed" % player_id)
- async def __get_raw_audio(self, track_id, provider, sample_rate=96000):
+ async def __get_raw_audio_org(self, track_id, provider, sample_rate=96000):
''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
audiodata = b''
cachefile = self.__get_track_cache_filename(track_id, provider)
stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
asyncio.get_event_loop().create_task(
self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
- #await process.wait()
audiodata, stderr = await process.communicate()
LOGGER.debug("__get_raw_audio for track_id %s completed" % (track_id))
return audiodata
+ async def __stream_queue(self, player_id, startindex, buffer, cancelled):
+ ''' start streaming all queue tracks '''
+ # TODO: get correct queue index and implement reporting of position
+ sample_rate = self.mass.config['player_settings'][player_id]['max_sample_rate']
+ fade_length = self.mass.config['player_settings'][player_id]["crossfade_duration"]
+ pcm_args = 'raw -b 32 -c 2 -e signed-integer -r %s' % sample_rate
+ args = 'sox -t %s - -t flac -C 2 -' % pcm_args
+ sox_proc = await asyncio.create_subprocess_shell(args,
+ stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+
+ async def fill_buffer():
+ while not sox_proc.stdout.at_eof():
+ chunk = await sox_proc.stdout.read(256000)
+ if not chunk:
+ break
+ await buffer.put(chunk)
+ await buffer.put(b'') # indicate EOF
+ asyncio.create_task(fill_buffer())
+
+ queue_index = startindex
+ last_fadeout_data = b''
+ while True:
+ # get the (next) track in queue
+ try:
+ queue_tracks = await self.mass.player.player_queue(player_id, queue_index, queue_index+1)
+ queue_track = queue_tracks[0]
+ except IndexError:
+ LOGGER.info("queue index out of range or end reached")
+ break
+
+ params = urllib.parse.parse_qs(queue_track.uri.split('?')[1])
+ track_id = params['track_id'][0]
+ provider = params['provider'][0]
+ LOGGER.info("Start Streaming queue track: %s - %s" % (track_id, queue_track.name))
+ fade_bytes = int(sample_rate * 4 * 2 * fade_length)
+ cachefile = self.__get_track_cache_filename(track_id, provider)
+ if os.path.isfile(cachefile):
+ # get track length from cachefile
+ args = 'soxi -d "%s"' % cachefile
+ process = await asyncio.create_subprocess_shell(args,
+ stderr=asyncio.subprocess.PIPE)
+ stdout, stderr = await process.communicate()
+ hours = stderr.split(":")[0]
+ minutes = stderr.split(":")[1]
+ seconds = stderr.split(":")[2]
+ total_chunks = hours*60*60 + minutes*60 + seconds
+ else:
+ total_chunks = int(queue_track.duration)
+ cur_chunk = 0
+
+ # report start stream of current queue index
+ self.mass.event_loop.create_task(self.mass.player.player_queue_stream_move(player_id, queue_index))
+ queue_index += 1
+ fade_in_part = b''
+
+ async for chunk in self.__get_raw_audio(track_id, provider, sample_rate):
+ cur_chunk += 1
+
+ if cur_chunk <= fade_length and not last_fadeout_data:
+ # fade-in part but this is the first track so just pass it to the final file
+ sox_proc.stdin.write(chunk)
+ await sox_proc.stdin.drain()
+ elif (cur_chunk < fade_length) and last_fadeout_data:
+ # need to have fade_length of chunks for the fade-in data
+ fade_in_part += chunk
+ elif fade_in_part and last_fadeout_data:
+ fade_in_part += chunk
+ # perform crossfade with previous fadeout samples
+ args = 'sox --ignore-length -t %s - -t %s - reverse fade t %s reverse' % (pcm_args, pcm_args, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ last_fadeout_data, stderr = await process.communicate(last_fadeout_data)
+ LOGGER.info("Got %s bytes in memory for fade_out_part after sox" % len(last_fadeout_data))
+ args = 'sox --ignore-length -t %s - -t %s - fade t %s' % (pcm_args, pcm_args, fade_length)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ fade_in_part, stderr = await process.communicate(fade_in_part)
+ LOGGER.info("Got %s bytes in memory for fadein_part after sox" % len(fade_in_part))
+ fadeinfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ fadeinfile.write(fade_in_part)
+ fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
+ fadeoutfile.write(last_fadeout_data)
+ args = 'sox -m -v 1.0 -t %s %s -v 1.0 -t %s %s -t %s -' % (pcm_args, fadeoutfile.name, pcm_args, fadeinfile.name, pcm_args)
+ process = await asyncio.create_subprocess_shell(args,
+ stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
+ crossfade_part, stderr = await process.communicate(fade_in_part)
+ LOGGER.info("Got %s bytes in memory for crossfade_part after sox" % len(crossfade_part))
+ sox_proc.stdin.write(crossfade_part)
+ await sox_proc.stdin.drain()
+ fadeinfile.close()
+ fadeoutfile.close()
+ del crossfade_part
+ fade_in_part = None
+ last_fadeout_data = b''
+ elif (cur_chunk > fade_length) and (cur_chunk < (total_chunks-fade_length)):
+ # middle part of the track
+ sox_proc.stdin.write(chunk)
+ await sox_proc.stdin.drain()
+ else:
+ # fade out part
+ last_fadeout_data += chunk
+
+ LOGGER.info("Queued Streaming queue track: %s - %s" % (track_id, queue_track.name))
+
+ #wait for the queue to consume the data
+ while buffer.qsize() > 1 and not cancelled.is_set():
+ await asyncio.sleep(1)
+ if cancelled.is_set():
+ break
+ LOGGER.info("Finished Streaming queue track: %s - %s" % (track_id, queue_track.name))
+
+ # end of queue reached, pass last fadeout bits to final output
+ if last_fadeout_data:
+ sox_proc.stdin.write(last_fadeout_data)
+ await sox_proc.stdin.drain()
+ sox_proc.stdin.close()
+ await sox_proc.wait()
+ LOGGER.info("streaming of queue for player %s completed" % player_id)
+
+ async def __get_raw_audio(self, track_id, provider, sample_rate=96000):
+ ''' get raw pcm data for a track upsampled to given sample_rate packed as wav '''
+ cachefile = self.__get_track_cache_filename(track_id, provider)
+ pcm_args = 'raw -b 32 -c 2 -e signed-integer'
+ if self.mass.config['base']['http_streamer']['volume_normalisation']:
+ gain_correct = await self.__get_track_gain_correct(track_id, provider)
+ else:
+ gain_correct = -6 # always need some headroom for upsampling and crossfades
+ if os.path.isfile(cachefile):
+ # we have a cache file for this track which we can use
+ args = 'sox -t flac "%s" -t %s - vol %s dB rate -v %s' % (cachefile, pcm_args, gain_correct, sample_rate)
+ process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE)
+ else:
+ # stream from provider
+ input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
+ assert(input_content_type)
+ args = 'sox -t %s - -t %s - vol %s dB rate -v %s' % (input_content_type, pcm_args, gain_correct, sample_rate)
+ process = await asyncio.create_subprocess_shell(args,
+ stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
+ asyncio.get_event_loop().create_task(
+ self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
+ # put chunks from stdout into queue
+ chunksize = int(sample_rate * (32/8) * 2) # 1 second
+ while not process.stdout.at_eof():
+ try:
+ chunk = await process.stdout.readexactly(chunksize)
+ except asyncio.streams.IncompleteReadError:
+ chunk = await process.stdout.read(chunksize)
+ if not chunk:
+ break
+ yield chunk
+ await process.wait()
+ LOGGER.info("__get_raw_audio for track_id %s completed" % (track_id))
+
async def __get_audio_stream(self, audioqueue, track_id, provider, player_id=None, cancelled=None):
''' get audio stream from provider and apply additional effects/processing where/if needed'''
cachefile = self.__get_track_cache_filename(track_id, provider)
self.supported_musicproviders = ['http']
asyncio.ensure_future(self.__discover_chromecasts())
-
### Provider specific implementation #####
async def player_config_entries(self):
elif cmd == 'stop':
self._chromecasts[player_id].media_controller.stop()
elif cmd == 'next':
- self.mass.player._players[player_id].cur_queue_index +=1
- self._chromecasts[player_id].media_controller.queue_next()
+ enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
+ if enable_crossfade:
+ await self.__play_stream_queue(player_id, self._players[player_id].cur_queue_index+1)
+ else:
+ self._chromecasts[player_id].media_controller.queue_next()
elif cmd == 'previous':
- self.mass.player._players[player_id].cur_queue_index -=1
- self._chromecasts[player_id].media_controller.queue_prev()
+ if enable_crossfade:
+ await self.__play_stream_queue(player_id, self._players[player_id].cur_queue_index-1)
+ else:
+ self._chromecasts[player_id].media_controller.queue_prev()
elif cmd == 'power' and cmd_args == 'off':
self._players[player_id].powered = False
self._chromecasts[player_id].media_controller.stop() # power is not supported so send stop instead
'''
castplayer = self._chromecasts[player_id]
cur_queue_index = await self.__get_cur_queue_index(player_id)
+ enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
if queue_opt == 'replace' or not self._player_queue[player_id]:
# overwrite queue with new items
self._player_queue[player_id] = media_items
- await self.__queue_load(player_id, self._player_queue[player_id], 0)
+ if enable_crossfade:
+ await self.__play_stream_queue(player_id, cur_queue_index)
+ else:
+ await self.__queue_load(player_id, self._player_queue[player_id], 0)
elif queue_opt == 'play':
# replace current item with new item(s)
self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index] + media_items + self._player_queue[player_id][cur_queue_index+1:]
- await self.__queue_load(player_id, self._player_queue[player_id], cur_queue_index)
+ if enable_crossfade:
+ await self.__play_stream_queue(player_id, cur_queue_index)
+ else:
+ await self.__queue_load(player_id, self._player_queue[player_id], cur_queue_index)
elif queue_opt == 'next':
# insert new items at current index +1
if len(self._player_queue[player_id]) > cur_queue_index+1:
else:
old_next_uri = None
self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index+1] + media_items + self._player_queue[player_id][cur_queue_index+1:]
- # find out the itemID of the next item in CC queue
- insert_at_item_id = None
- if old_next_uri:
- for item in castplayer.media_controller.queue_items:
- if item['media']['contentId'] == old_next_uri:
- insert_at_item_id = item['itemId']
- await self.__queue_insert(player_id, media_items, insert_at_item_id)
+ if not enable_crossfade:
+ # find out the itemID of the next item in CC queue
+ insert_at_item_id = None
+ if old_next_uri:
+ for item in castplayer.media_controller.queue_items:
+ if item['media']['contentId'] == old_next_uri:
+ insert_at_item_id = item['itemId']
+ await self.__queue_insert(player_id, media_items, insert_at_item_id)
elif queue_opt == 'add':
# add new items at end of queue
self._player_queue[player_id] = self._player_queue[player_id] + media_items
- await self.__queue_insert(player_id, media_items)
+ if not enable_crossfade:
+ await self.__queue_insert(player_id, media_items)
+
+ async def player_queue_stream_move(self, player_id, new_index):
+ ''' called by the queue streamer when it's loading a new track '''
+ self._players[player_id].cur_queue_index = new_index
+ # trigger update
+ chromecast = self._chromecasts[player_id]
+ mediastatus = chromecast.media_controller.status
+ await self.__handle_player_state(chromecast, mediastatus=mediastatus)
+ LOGGER.info("player_queue_stream_move")
### Provider specific (helper) methods #####
async def __get_cur_queue_index(self, player_id):
''' retrieve index of current item in the player queue '''
enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
- if enable_crossfade and player_id in self.mass.player._players:
- return self.mass.player._players[player_id].cur_queue_index
+ if enable_crossfade:
+ return self._players[player_id].cur_queue_index
cur_index = 0
for index, track in enumerate(self._player_queue[player_id]):
if track.uri == self._chromecasts[player_id].media_controller.status.content_id:
''' load queue on player with given queue items '''
castplayer = self._chromecasts[player_id]
player = self._players[player_id]
- queue_items = await self.__create_queue_items(new_tracks[:50], player_id=player_id)
+ queue_items = await self.__create_queue_items(new_tracks[:50])
self.mass.player._players[player_id].cur_queue_index = 0
queuedata = {
"type": 'QUEUE_LOAD',
await self.__queue_insert(player_id, new_tracks[51:])
await asyncio.sleep(0.2)
+ async def __play_stream_queue(self, player_id, startindex=0):
+ ''' tell the cast player to stream our special queue (crossfaded) stream '''
+ castplayer = self._chromecasts[player_id]
+ uri = 'http://%s:%s/stream_queue?player_id=%s&startindex=%s'% (
+ self.mass.player.local_ip, self.mass.config['base']['web']['http_port'], player_id, startindex)
+ castplayer.play_media(uri, 'audio/flac')
+
async def __queue_insert(self, player_id, new_tracks, insert_before=None):
''' insert item into the player queue '''
castplayer = self._chromecasts[player_id]
- queue_items = await self.__create_queue_items(new_tracks, player_id=player_id)
+ queue_items = await self.__create_queue_items(new_tracks)
for chunk in chunks(queue_items, 50):
queuedata = {
"type": 'QUEUE_INSERT',
async def __resume_queue(self, player_id):
''' resume queue play after power off '''
- queue_index = await self.__get_cur_queue_index(player_id)
- LOGGER.info('resume queue at index %s' % queue_index)
+ LOGGER.info('resuming queue....')
tracks = self._player_queue[player_id]
- await self.__queue_load(player_id, tracks, queue_index)
+ await self.play_media(player_id, tracks)
- async def __create_queue_items(self, tracks, player_id):
+ async def __create_queue_items(self, tracks):
''' create list of CC queue items from tracks '''
queue_items = []
for track in tracks:
- queue_item = await self.__create_queue_item(track, player_id)
+ queue_item = await self.__create_queue_item(track)
queue_items.append(queue_item)
return queue_items
- async def __create_queue_item(self, track, player_id):
+ async def __create_queue_item(self, track):
'''create queue item from track info '''
- enable_crossfade = self.mass.config['player_settings'][player_id]["crossfade_duration"] > 0
- if enable_crossfade:
- uri = 'http://%s:%s/stream_queue?player_id=%s'% (self.mass.player.local_ip, self.mass.config['base']['web']['http_port'], player_id)
- else:
- uri = track.uri
return {
'autoplay' : True,
'preloadTime' : 10,
'startTime' : 0,
'activeTrackIds' : [],
'media': {
- 'contentId': uri,
+ 'contentId': track.uri,
'customData': {
'provider': track.provider,
'uri': track.uri,
player.state = PlayerState.Paused
else:
player.state = PlayerState.Stopped
- player.cur_item = await self.__parse_track(mediastatus)
- player.cur_item_time = chromecast.media_controller.status.adjusted_current_time
- player.cur_queue_index = await self.__get_cur_queue_index(player_id)
+ if not 'stream_queue' in mediastatus.content_id:
+ player.cur_item = await self.__parse_track(mediastatus)
+ player.cur_item_time = mediastatus.adjusted_current_time
+ player.cur_queue_index = await self.__get_cur_queue_index(player_id)
+ else:
+ # try to work out the current time
+ # player is playing a constant stream of the queue so we need to do this the hard way
+ cur_queue_index = player.cur_queue_index
+ player.cur_item = self._player_queue[player_id][cur_queue_index]
+ cur_time = mediastatus.adjusted_current_time
+ while cur_time > player.cur_item.duration:
+ cur_queue_index -=1
+ prev_track = self._player_queue[player_id][cur_queue_index]
+ cur_time -= prev_track.duration
+ player.cur_item_time = cur_time
await self.mass.player.update_player(player)
async def __parse_track(self, mediastatus):
track_id = params['track_id'][0]
provider = params['provider'][0]
track = await self.mass.music.providers[provider].track(track_id)
- elif uri.startswith('http') and '/stream_queue' in uri:
- params = urllib.parse.parse_qs(uri.split('?')[1])
- player_id = params['player_id'][0]
- queue_index = await self.__get_cur_queue_index(player_id)
- track = self._player_queue[player_id][queue_index]
return track
async def __handle_group_members_update(self, mz, added_player=None, removed_player=None):