from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task
import aiohttp
from difflib import SequenceMatcher as Matcher
-from models import MediaType, PlayerState, MusicPlayer
+from models import MediaType, PlayerState, MusicPlayer, TrackQuality
from typing import List
import toolz
import operator
("apply_group_power", False, "player_group_pow"),
("play_power_on", False, "player_power_play"),
("sox_effects", '', "http_streamer_sox_effects"),
+ ("max_sample_rate", '96000', "max_sample_rate"),
("force_http_streamer", False, "force_http_streamer")
]
# config for the http streamer
''' get audio stream from provider and apply additional effects/processing where/if needed'''
input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
cachefile = self.__get_track_cache_filename(track_id, provider)
- sox_effects = []
+ sox_effects = ''
+ # sox settings
if self.mass.config['base']['http_streamer']['volume_normalisation']:
gain_correct = await self.__get_track_gain_correct(track_id, provider)
LOGGER.info("apply gain correction of %s" % gain_correct)
- sox_effects += ['vol', '%s dB' % gain_correct]
- if player_id and self.mass.config['player_settings'][player_id]['sox_effects']:
- sox_effects += self.mass.config['player_settings'][player_id]['sox_effects'].split('/')
+ sox_effects += ' vol %s dB ' % gain_correct
+ sox_effects += await self.__get_player_sox_options(track_id, provider, player_id)
if os.path.isfile(cachefile):
# we have a cache file for this track which we can use
- args = ['-t', 'flac', cachefile, '-t', 'flac', '-', *sox_effects]
- process = await asyncio.create_subprocess_exec('sox', *args,
+ args = 'sox -t flac %s -t flac -C 0 - %s' % (cachefile, sox_effects)
+ LOGGER.info("Running sox with args: %s" % args)
+ process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE)
buffer_task = None
else:
# stream from provider
- args = ['-t', input_content_type, '-', '-t', 'flac', '-', *sox_effects]
- process = await asyncio.create_subprocess_exec('sox', *args,
+ args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
+ LOGGER.info("Running sox with args: %s" % args)
+ process = await asyncio.create_subprocess_shell(args,
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
buffer_task = asyncio.create_task(
self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
await process.wait()
LOGGER.info("streaming of track_id %s completed" % track_id)
+ async def __get_player_sox_options(self, track_id, provider, player_id):
+ ''' get player specific sox options '''
+ sox_effects = ' '
+ if not player_id:
+ return ''
+ if self.mass.config['player_settings'][player_id]['max_sample_rate']:
+ # downsample if needed
+ max_sample_rate = try_parse_int(self.mass.config['player_settings'][player_id]['max_sample_rate'])
+ if max_sample_rate:
+ quality = TrackQuality.LOSSY_MP3
+ track = await self.mass.music.track(track_id, provider)
+ for item in track.provider_ids:
+ if item['provider'] == provider and item['item_id'] == track_id:
+ quality = item['quality']
+ break
+ if quality > TrackQuality.FLAC_LOSSLESS_HI_RES_3 and max_sample_rate == 192000:
+ sox_effects += 'rate -v 192000'
+ elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_2 and max_sample_rate == 96000:
+ sox_effects += 'rate -v 96000'
+ elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_1 and max_sample_rate == 48000:
+ sox_effects += 'rate -v 48000'
+ if self.mass.config['player_settings'][player_id]['sox_effects']:
+ sox_effects += self.mass.config['player_settings'][player_id]['sox_effects']
+ return sox_effects + ' '
+
async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
''' analyze track audio, for now we only calculate EBU R128 loudness '''
LOGGER.info('Start analyzing file %s' % tmpfile)
self.mass = mass
self._players = {}
self._chromecasts = {}
+ self._player_queue = {}
self.supported_musicproviders = ['http']
self.http_session = aiohttp.ClientSession(loop=mass.event_loop)
asyncio.ensure_future(self.__discover_chromecasts())
async def player_command(self, player_id, cmd:str, cmd_args=None):
''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
if cmd == 'play':
- if self._chromecasts[player_id].media_controller.status.media_session_id:
+ if self._chromecasts[player_id].media_controller.status.player_is_paused:
self._chromecasts[player_id].media_controller.play()
else:
await self.__resume_queue(player_id)
self._chromecasts[player_id].set_volume_muted(True)
async def player_queue(self, player_id, offset=0, limit=50):
- ''' return the items in the player's queue '''
- items = []
- for item in self._chromecasts[player_id].queue[offset:limit]:
- track = await self.__track_from_uri(item['media']['contentId'])
- if track:
- items.append(track)
- return items
-
- async def create_queue_item(self, track):
- '''create queue item from track info '''
- return {
- 'autoplay' : True,
- 'preloadTime' : 10,
- 'playbackDuration': int(track.duration),
- 'startTime' : 0,
- 'activeTrackIds' : [],
- 'media': {
- 'contentId': track.uri,
- 'customData': {'provider': track.provider},
- 'contentType': "audio/flac",
- 'streamType': 'BUFFERED',
- 'metadata': {
- 'title': track.name,
- 'artist': track.artists[0].name,
- },
- 'duration': int(track.duration)
- }
- }
+ ''' return the current items in the player's queue '''
+ return self._player_queue[player_id][offset:limit]
async def play_media(self, player_id, media_items, queue_opt='play'):
'''
play media on a player
'''
castplayer = self._chromecasts[player_id]
- player = self._players[player_id]
- media_controller = castplayer.media_controller
- receiver_ctrl = media_controller._socket_client.receiver_controller
- cur_queue_index = 0
- if media_controller.queue_cur_id != None:
- for item in media_controller.queue_items:
- # status queue may contain at max 3 tracks (previous, current and next)
- if item['itemId'] == media_controller.queue_cur_id:
- cur_queue_item = item
- # find out the current index
- for counter, value in enumerate(castplayer.queue):
- if value['media']['contentId'] == cur_queue_item['media']['contentId']:
- cur_queue_index = counter
- break
- break
- if (not media_controller.queue_cur_id or not media_controller.status.media_session_id or not castplayer.queue):
- queue_opt = 'replace'
-
- new_queue_items = []
- for track in media_items:
- queue_item = await self.create_queue_item(track)
- new_queue_items.append(queue_item)
-
- if (queue_opt in ['replace', 'play'] or not media_controller.queue_cur_id or
- not media_controller.status.media_session_id or not castplayer.queue):
- # load new Chromecast queue with items
- if queue_opt == 'add':
- # append items to queue
- castplayer.queue = castplayer.queue + new_queue_items
- startindex = cur_queue_index
- elif queue_opt == 'play':
- # keep current queue but append new items at begin and start playing first item
- castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index]
- startindex = 0
- elif queue_opt == 'next':
- # play the new items after the current playing item (insert before current next item)
- castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index]
- startindex = cur_queue_index
+ cur_queue_index = await self.__get_cur_queue_index(player_id)
+
+ if queue_opt == 'replace' or not self._player_queue[player_id]:
+ # overwrite queue with new items
+ self._player_queue[player_id] = media_items
+ await self.__queue_load(player_id, self._player_queue[player_id], 0)
+ elif queue_opt == 'play':
+ # replace current item with new item(s)
+ self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index] + media_items + self._player_queue[player_id][cur_queue_index+1:]
+ await self.__queue_load(player_id, self._player_queue[player_id], cur_queue_index)
+ elif queue_opt == 'next':
+ # insert new items at current index +1
+ if len(self._player_queue[player_id]) > cur_queue_index:
+ old_next_uri = self._player_queue[player_id][cur_queue_index+1].uri
else:
- # overwrite the whole queue with new item(s)
- castplayer.queue = new_queue_items
- startindex = 0
- # load first 10 items as soon as possible
- queuedata = {
- "type": 'QUEUE_LOAD',
- "repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
- "shuffle": player.shuffle_enabled,
- "queueType": "PLAYLIST",
- "startIndex": startindex, # Item index to play after this request or keep same item if undefined
- "items": castplayer.queue[:10]
- }
- await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
- await asyncio.sleep(1)
- # append the rest of the items in the queue in chunks
- for chunk in chunks(castplayer.queue[10:], 100):
- queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
- await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
- await asyncio.sleep(0.1)
+ old_next_uri = None
+ self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index+1] + media_items + self._player_queue[player_id][cur_queue_index+1:]
+ # find out the itemID of the next item in CC queue
+ insert_at_item_id = None
+ if old_next_uri:
+ for item in castplayer.media_controller.queue_items:
+ if item['media']['contentId'] == old_next_uri:
+ insert_at_item_id = item['itemId']
+ await self.__queue_insert(player_id, media_items, insert_at_item_id)
elif queue_opt == 'add':
- # existing queue is playing: simply append items to the end of the queue (in small chunks)
- castplayer.queue = castplayer.queue + new_queue_items
- insertbefore = None
- for chunk in chunks(new_queue_items, 100):
- queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
- await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
- await asyncio.sleep(0.1)
- elif queue_opt == 'next':
- # play the new items after the current playing item (insert before current next item)
- player.queue = castplayer.queue[:cur_queue_index] + new_queue_items + castplayer.queue[cur_queue_index:]
- queuedata = {
- "type": 'QUEUE_INSERT',
- "insertBefore": media_controller.queue_cur_id+1,
- "items": new_queue_items[:200] # limit of the queue message
- }
- await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-
+ # add new items at end of queue
+ self._player_queue[player_id] = self._player_queue[player_id] + media_items
+ await self.__queue_insert(player_id, media_items)
+
### Provider specific (helper) methods #####
- async def __resume_queue(self, player_id):
- ''' resume queue play after power off '''
- player = self._players[player_id]
+ async def __get_cur_queue_index(self, player_id):
+ ''' retrieve index of current item in the player queue '''
+ cur_index = 0
+ for index, track in enumerate(self._player_queue[player_id]):
+ if track.uri == self._chromecasts[player_id].media_controller.status.content_id:
+ cur_index = index
+ break
+ return cur_index
+
+ async def __queue_load(self, player_id, new_tracks, startindex=None):
+ ''' load queue on player with given queue items '''
castplayer = self._chromecasts[player_id]
+ player = self._players[player_id]
media_controller = castplayer.media_controller
receiver_ctrl = media_controller._socket_client.receiver_controller
- startindex = 0
- if player.cur_item and player.cur_item.name:
- for index, item in enumerate(castplayer.queue):
- if item['media']['metadata']['title'] == player.cur_item.name:
- startindex = index
- break
+ queue_items = await self.__create_queue_items(new_tracks[:50])
queuedata = {
"type": 'QUEUE_LOAD',
"repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
"shuffle": player.shuffle_enabled,
"queueType": "PLAYLIST",
"startIndex": startindex, # Item index to play after this request or keep same item if undefined
- "items": castplayer.queue[:10]
+ "items": queue_items # only load 50 tracks at once or the socket will crash
}
await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
- await asyncio.sleep(1)
- # append the rest of the items in the queue in chunks
- for chunk in chunks(castplayer.queue[10:], 100):
- await asyncio.sleep(0.1)
- queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
+ if len(new_tracks) > 50:
+ await self.__queue_insert(player_id, new_tracks[51:])
+
+ async def __queue_insert(self, player_id, new_tracks, insert_before=None):
+ ''' insert item into the player queue '''
+ castplayer = self._chromecasts[player_id]
+ queue_items = await self.__create_queue_items(new_tracks)
+ media_controller = castplayer.media_controller
+ receiver_ctrl = media_controller._socket_client.receiver_controller
+ for chunk in chunks(queue_items, 50):
+ queuedata = {
+ "type": 'QUEUE_INSERT',
+ "insertBefore": insert_before,
+ "items": chunk
+ }
await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
+ async def __queue_update(self, player_id, queue_items_to_update):
+ ''' update the cast player queue '''
+ castplayer = self._chromecasts[player_id]
+ media_controller = castplayer.media_controller
+ receiver_ctrl = media_controller._socket_client.receiver_controller
+ queuedata = {
+ "type": 'QUEUE_UPDATE',
+ "items": queue_items_to_update
+ }
+ await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
+ async def __queue_remove(self, player_id, queue_item_ids):
+ ''' remove items from the cast player queue '''
+ media_controller = self._chromecasts[player_id].media_controller
+ receiver_ctrl = media_controller._socket_client.receiver_controller
+ queuedata = {
+ "type": 'QUEUE_REMOVE',
+ "items": queue_item_ids
+ }
+ await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
+ async def __resume_queue(self, player_id):
+ ''' resume queue play after power off '''
+
+ player = self._players[player_id]
+ queue_index = await self.__get_cur_queue_index(player_id)
+ print('resume queue at index %s' % queue_index)
+ tracks = self._player_queue[player_id]
+ await self.__queue_load(player_id, tracks, queue_index)
+
+ async def __create_queue_items(self, tracks):
+ ''' create list of CC queue items from tracks '''
+ queue_items = []
+ for track in tracks:
+ queue_item = await self.__create_queue_item(track)
+ queue_items.append(queue_item)
+ return queue_items
+
+ async def __create_queue_item(self, track):
+ '''create queue item from track info '''
+ return {
+ 'autoplay' : True,
+ 'preloadTime' : 10,
+ 'playbackDuration': int(track.duration),
+ 'startTime' : 0,
+ 'activeTrackIds' : [],
+ 'media': {
+ 'contentId': track.uri,
+ 'customData': {
+ 'provider': track.provider,
+ 'uri': track.uri,
+ 'item_id': track.item_id
+ },
+ 'contentType': "audio/flac",
+ 'streamType': 'BUFFERED',
+ 'metadata': {
+ 'title': track.name,
+ 'artist': track.artists[0].name,
+ },
+ 'duration': int(track.duration)
+ }
+ }
async def __send_player_queue(self, receiver_ctrl, media_controller, queuedata):
'''send new data to the CC queue'''
def app_launched_callback():
- LOGGER.info("app_launched_callback")
"""Plays media after chromecast has switched to requested app."""
queuedata['mediaSessionId'] = media_controller.status.media_session_id
- LOGGER.info('')
- LOGGER.info('')
media_controller.send_message(queuedata, inc_session_id=False)
receiver_ctrl.launch_app(media_controller.app_id,
callback_function=app_launched_callback)
player.powered = True
elif mediastatus.player_state == 'PAUSED':
player.state = PlayerState.Paused
- player.powered = True
+ player.powered = not chromecast.is_idle
else:
player.state = PlayerState.Stopped
player.powered = player.powered
chromecasts = await asyncio.gather(bg_task)
for chromecast in chromecasts[0]:
player_id = str(chromecast.uuid)
- if not player_id in self._players:
+ ip_change = False
+ if player_id in self._chromecasts and chromecast.uri != self._chromecasts[player_id].uri:
+ LOGGER.warning('Chromecast uri changed ?! - old: %s - new: %s' %(self._chromecasts[player_id].uri, chromecast.uri))
+ ip_change = True
+ if not player_id in self._players or ip_change:
player = MusicPlayer()
player.player_id = player_id
player.name = chromecast.name
player.player_provider = self.prov_id
- chromecast.start()
# patch the receive message method for handling queue status updates
chromecast.queue = []
chromecast.media_controller.queue_items = []
mz.register_listener(MZListener(mz, self.__handle_group_members_update, self.mass.event_loop))
chromecast.register_handler(mz)
chromecast.register_connection_listener(MZConnListener(mz))
- chromecast.wait()
self._chromecasts[player_id] = chromecast
self._players[player_id] = player
+ if not player_id in self._player_queue:
+ # TODO: persistant storage of player queue ?
+ self._player_queue[player_id] = []
+ chromecast.wait()
LOGGER.info('Chromecast discovery done...')
def chunks(l, n):