From 3fb8421309553e13ef092a72b25907eae207caad Mon Sep 17 00:00:00 2001 From: marcelveldt Date: Tue, 21 May 2019 15:24:31 +0200 Subject: [PATCH] fix chromecast streaming add sox options and downsampling --- music_assistant/models.py | 2 +- .../modules/musicproviders/qobuz.py | 2 +- music_assistant/modules/player.py | 46 +++- .../modules/playerproviders/chromecast.py | 258 +++++++++--------- music_assistant/modules/web.py | 31 ++- 5 files changed, 189 insertions(+), 150 deletions(-) diff --git a/music_assistant/models.py b/music_assistant/models.py index 5f0a5348..a033720d 100755 --- a/music_assistant/models.py +++ b/music_assistant/models.py @@ -45,7 +45,7 @@ class TrackQuality(IntEnum): LOSSY_OGG = 1 LOSSY_AAC = 2 FLAC_LOSSLESS = 6 # 44.1/48khz 16 bits HI-RES - FLAC_LOSSLES_HI_RES_1 = 7 # 44.1/48khz 24 bits HI-RES + FLAC_LOSSLESS_HI_RES_1 = 7 # 44.1/48khz 24 bits HI-RES FLAC_LOSSLESS_HI_RES_2 = 8 # 88.2/96khz 24 bits HI-RES FLAC_LOSSLESS_HI_RES_3 = 9 # 176/192khz 24 bits HI-RES FLAC_LOSSLESS_HI_RES_4 = 10 # above 192khz 24 bits HI-RES diff --git a/music_assistant/modules/musicproviders/qobuz.py b/music_assistant/modules/musicproviders/qobuz.py index fc12348f..51ec9635 100644 --- a/music_assistant/modules/musicproviders/qobuz.py +++ b/music_assistant/modules/musicproviders/qobuz.py @@ -400,7 +400,7 @@ class QobuzProvider(MusicProvider): elif track_obj['maximum_sampling_rate'] > 48: quality = TrackQuality.FLAC_LOSSLESS_HI_RES_2 elif track_obj['maximum_bit_depth'] > 16: - quality = TrackQuality.FLAC_LOSSLES_HI_RES_1 + quality = TrackQuality.FLAC_LOSSLESS_HI_RES_1 elif track_obj.get('format_id',0) == 5: quality = TrackQuality.LOSSY_AAC else: diff --git a/music_assistant/modules/player.py b/music_assistant/modules/player.py index 1ec7739a..87529ff4 100755 --- a/music_assistant/modules/player.py +++ b/music_assistant/modules/player.py @@ -6,7 +6,7 @@ import os from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task import aiohttp from difflib import SequenceMatcher as Matcher -from models import MediaType, PlayerState, MusicPlayer +from models import MediaType, PlayerState, MusicPlayer, TrackQuality from typing import List import toolz import operator @@ -57,6 +57,7 @@ class Player(): ("apply_group_power", False, "player_group_pow"), ("play_power_on", False, "player_power_play"), ("sox_effects", '', "http_streamer_sox_effects"), + ("max_sample_rate", '96000', "max_sample_rate"), ("force_http_streamer", False, "force_http_streamer") ] # config for the http streamer @@ -370,23 +371,25 @@ class Player(): ''' get audio stream from provider and apply additional effects/processing where/if needed''' input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id) cachefile = self.__get_track_cache_filename(track_id, provider) - sox_effects = [] + sox_effects = '' + # sox settings if self.mass.config['base']['http_streamer']['volume_normalisation']: gain_correct = await self.__get_track_gain_correct(track_id, provider) LOGGER.info("apply gain correction of %s" % gain_correct) - sox_effects += ['vol', '%s dB' % gain_correct] - if player_id and self.mass.config['player_settings'][player_id]['sox_effects']: - sox_effects += self.mass.config['player_settings'][player_id]['sox_effects'].split('/') + sox_effects += ' vol %s dB ' % gain_correct + sox_effects += await self.__get_player_sox_options(track_id, provider, player_id) if os.path.isfile(cachefile): # we have a cache file for this track which we can use - args = ['-t', 'flac', cachefile, '-t', 'flac', '-', *sox_effects] - process = await asyncio.create_subprocess_exec('sox', *args, + args = 'sox -t flac %s -t flac -C 0 - %s' % (cachefile, sox_effects) + LOGGER.info("Running sox with args: %s" % args) + process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE) buffer_task = None else: # stream from provider - args = ['-t', input_content_type, '-', '-t', 'flac', '-', *sox_effects] - process = await asyncio.create_subprocess_exec('sox', *args, + args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects) + LOGGER.info("Running sox with args: %s" % args) + process = await asyncio.create_subprocess_shell(args, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) buffer_task = asyncio.create_task( self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type)) @@ -399,6 +402,31 @@ class Player(): await process.wait() LOGGER.info("streaming of track_id %s completed" % track_id) + async def __get_player_sox_options(self, track_id, provider, player_id): + ''' get player specific sox options ''' + sox_effects = ' ' + if not player_id: + return '' + if self.mass.config['player_settings'][player_id]['max_sample_rate']: + # downsample if needed + max_sample_rate = try_parse_int(self.mass.config['player_settings'][player_id]['max_sample_rate']) + if max_sample_rate: + quality = TrackQuality.LOSSY_MP3 + track = await self.mass.music.track(track_id, provider) + for item in track.provider_ids: + if item['provider'] == provider and item['item_id'] == track_id: + quality = item['quality'] + break + if quality > TrackQuality.FLAC_LOSSLESS_HI_RES_3 and max_sample_rate == 192000: + sox_effects += 'rate -v 192000' + elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_2 and max_sample_rate == 96000: + sox_effects += 'rate -v 96000' + elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_1 and max_sample_rate == 48000: + sox_effects += 'rate -v 48000' + if self.mass.config['player_settings'][player_id]['sox_effects']: + sox_effects += self.mass.config['player_settings'][player_id]['sox_effects'] + return sox_effects + ' ' + async def __analyze_audio(self, tmpfile, track_id, provider, content_type): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' LOGGER.info('Start analyzing file %s' % tmpfile) diff --git a/music_assistant/modules/playerproviders/chromecast.py b/music_assistant/modules/playerproviders/chromecast.py index 5de5c0a8..c53d8e48 100644 --- a/music_assistant/modules/playerproviders/chromecast.py +++ b/music_assistant/modules/playerproviders/chromecast.py @@ -50,6 +50,7 @@ class ChromecastProvider(PlayerProvider): self.mass = mass self._players = {} self._chromecasts = {} + self._player_queue = {} self.supported_musicproviders = ['http'] self.http_session = aiohttp.ClientSession(loop=mass.event_loop) asyncio.ensure_future(self.__discover_chromecasts()) @@ -60,7 +61,7 @@ class ChromecastProvider(PlayerProvider): async def player_command(self, player_id, cmd:str, cmd_args=None): ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) ''' if cmd == 'play': - if self._chromecasts[player_id].media_controller.status.media_session_id: + if self._chromecasts[player_id].media_controller.status.player_is_paused: self._chromecasts[player_id].media_controller.play() else: await self.__resume_queue(player_id) @@ -87,154 +88,155 @@ class ChromecastProvider(PlayerProvider): self._chromecasts[player_id].set_volume_muted(True) async def player_queue(self, player_id, offset=0, limit=50): - ''' return the items in the player's queue ''' - items = [] - for item in self._chromecasts[player_id].queue[offset:limit]: - track = await self.__track_from_uri(item['media']['contentId']) - if track: - items.append(track) - return items - - async def create_queue_item(self, track): - '''create queue item from track info ''' - return { - 'autoplay' : True, - 'preloadTime' : 10, - 'playbackDuration': int(track.duration), - 'startTime' : 0, - 'activeTrackIds' : [], - 'media': { - 'contentId': track.uri, - 'customData': {'provider': track.provider}, - 'contentType': "audio/flac", - 'streamType': 'BUFFERED', - 'metadata': { - 'title': track.name, - 'artist': track.artists[0].name, - }, - 'duration': int(track.duration) - } - } + ''' return the current items in the player's queue ''' + return self._player_queue[player_id][offset:limit] async def play_media(self, player_id, media_items, queue_opt='play'): ''' play media on a player ''' castplayer = self._chromecasts[player_id] - player = self._players[player_id] - media_controller = castplayer.media_controller - receiver_ctrl = media_controller._socket_client.receiver_controller - cur_queue_index = 0 - if media_controller.queue_cur_id != None: - for item in media_controller.queue_items: - # status queue may contain at max 3 tracks (previous, current and next) - if item['itemId'] == media_controller.queue_cur_id: - cur_queue_item = item - # find out the current index - for counter, value in enumerate(castplayer.queue): - if value['media']['contentId'] == cur_queue_item['media']['contentId']: - cur_queue_index = counter - break - break - if (not media_controller.queue_cur_id or not media_controller.status.media_session_id or not castplayer.queue): - queue_opt = 'replace' - - new_queue_items = [] - for track in media_items: - queue_item = await self.create_queue_item(track) - new_queue_items.append(queue_item) - - if (queue_opt in ['replace', 'play'] or not media_controller.queue_cur_id or - not media_controller.status.media_session_id or not castplayer.queue): - # load new Chromecast queue with items - if queue_opt == 'add': - # append items to queue - castplayer.queue = castplayer.queue + new_queue_items - startindex = cur_queue_index - elif queue_opt == 'play': - # keep current queue but append new items at begin and start playing first item - castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index] - startindex = 0 - elif queue_opt == 'next': - # play the new items after the current playing item (insert before current next item) - castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index] - startindex = cur_queue_index + cur_queue_index = await self.__get_cur_queue_index(player_id) + + if queue_opt == 'replace' or not self._player_queue[player_id]: + # overwrite queue with new items + self._player_queue[player_id] = media_items + await self.__queue_load(player_id, self._player_queue[player_id], 0) + elif queue_opt == 'play': + # replace current item with new item(s) + self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index] + media_items + self._player_queue[player_id][cur_queue_index+1:] + await self.__queue_load(player_id, self._player_queue[player_id], cur_queue_index) + elif queue_opt == 'next': + # insert new items at current index +1 + if len(self._player_queue[player_id]) > cur_queue_index: + old_next_uri = self._player_queue[player_id][cur_queue_index+1].uri else: - # overwrite the whole queue with new item(s) - castplayer.queue = new_queue_items - startindex = 0 - # load first 10 items as soon as possible - queuedata = { - "type": 'QUEUE_LOAD', - "repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF", - "shuffle": player.shuffle_enabled, - "queueType": "PLAYLIST", - "startIndex": startindex, # Item index to play after this request or keep same item if undefined - "items": castplayer.queue[:10] - } - await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) - await asyncio.sleep(1) - # append the rest of the items in the queue in chunks - for chunk in chunks(castplayer.queue[10:], 100): - queuedata = { "type": 'QUEUE_INSERT', "items": chunk } - await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) - await asyncio.sleep(0.1) + old_next_uri = None + self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index+1] + media_items + self._player_queue[player_id][cur_queue_index+1:] + # find out the itemID of the next item in CC queue + insert_at_item_id = None + if old_next_uri: + for item in castplayer.media_controller.queue_items: + if item['media']['contentId'] == old_next_uri: + insert_at_item_id = item['itemId'] + await self.__queue_insert(player_id, media_items, insert_at_item_id) elif queue_opt == 'add': - # existing queue is playing: simply append items to the end of the queue (in small chunks) - castplayer.queue = castplayer.queue + new_queue_items - insertbefore = None - for chunk in chunks(new_queue_items, 100): - queuedata = { "type": 'QUEUE_INSERT', "items": chunk } - await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) - await asyncio.sleep(0.1) - elif queue_opt == 'next': - # play the new items after the current playing item (insert before current next item) - player.queue = castplayer.queue[:cur_queue_index] + new_queue_items + castplayer.queue[cur_queue_index:] - queuedata = { - "type": 'QUEUE_INSERT', - "insertBefore": media_controller.queue_cur_id+1, - "items": new_queue_items[:200] # limit of the queue message - } - await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) - + # add new items at end of queue + self._player_queue[player_id] = self._player_queue[player_id] + media_items + await self.__queue_insert(player_id, media_items) + ### Provider specific (helper) methods ##### - async def __resume_queue(self, player_id): - ''' resume queue play after power off ''' - player = self._players[player_id] + async def __get_cur_queue_index(self, player_id): + ''' retrieve index of current item in the player queue ''' + cur_index = 0 + for index, track in enumerate(self._player_queue[player_id]): + if track.uri == self._chromecasts[player_id].media_controller.status.content_id: + cur_index = index + break + return cur_index + + async def __queue_load(self, player_id, new_tracks, startindex=None): + ''' load queue on player with given queue items ''' castplayer = self._chromecasts[player_id] + player = self._players[player_id] media_controller = castplayer.media_controller receiver_ctrl = media_controller._socket_client.receiver_controller - startindex = 0 - if player.cur_item and player.cur_item.name: - for index, item in enumerate(castplayer.queue): - if item['media']['metadata']['title'] == player.cur_item.name: - startindex = index - break + queue_items = await self.__create_queue_items(new_tracks[:50]) queuedata = { "type": 'QUEUE_LOAD', "repeatMode": "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF", "shuffle": player.shuffle_enabled, "queueType": "PLAYLIST", "startIndex": startindex, # Item index to play after this request or keep same item if undefined - "items": castplayer.queue[:10] + "items": queue_items # only load 50 tracks at once or the socket will crash } await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) - await asyncio.sleep(1) - # append the rest of the items in the queue in chunks - for chunk in chunks(castplayer.queue[10:], 100): - await asyncio.sleep(0.1) - queuedata = { "type": 'QUEUE_INSERT', "items": chunk } + if len(new_tracks) > 50: + await self.__queue_insert(player_id, new_tracks[51:]) + + async def __queue_insert(self, player_id, new_tracks, insert_before=None): + ''' insert item into the player queue ''' + castplayer = self._chromecasts[player_id] + queue_items = await self.__create_queue_items(new_tracks) + media_controller = castplayer.media_controller + receiver_ctrl = media_controller._socket_client.receiver_controller + for chunk in chunks(queue_items, 50): + queuedata = { + "type": 'QUEUE_INSERT', + "insertBefore": insert_before, + "items": chunk + } await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + + async def __queue_update(self, player_id, queue_items_to_update): + ''' update the cast player queue ''' + castplayer = self._chromecasts[player_id] + media_controller = castplayer.media_controller + receiver_ctrl = media_controller._socket_client.receiver_controller + queuedata = { + "type": 'QUEUE_UPDATE', + "items": queue_items_to_update + } + await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + + async def __queue_remove(self, player_id, queue_item_ids): + ''' remove items from the cast player queue ''' + media_controller = self._chromecasts[player_id].media_controller + receiver_ctrl = media_controller._socket_client.receiver_controller + queuedata = { + "type": 'QUEUE_REMOVE', + "items": queue_item_ids + } + await self.__send_player_queue(receiver_ctrl, media_controller, queuedata) + + async def __resume_queue(self, player_id): + ''' resume queue play after power off ''' + + player = self._players[player_id] + queue_index = await self.__get_cur_queue_index(player_id) + print('resume queue at index %s' % queue_index) + tracks = self._player_queue[player_id] + await self.__queue_load(player_id, tracks, queue_index) + + async def __create_queue_items(self, tracks): + ''' create list of CC queue items from tracks ''' + queue_items = [] + for track in tracks: + queue_item = await self.__create_queue_item(track) + queue_items.append(queue_item) + return queue_items + + async def __create_queue_item(self, track): + '''create queue item from track info ''' + return { + 'autoplay' : True, + 'preloadTime' : 10, + 'playbackDuration': int(track.duration), + 'startTime' : 0, + 'activeTrackIds' : [], + 'media': { + 'contentId': track.uri, + 'customData': { + 'provider': track.provider, + 'uri': track.uri, + 'item_id': track.item_id + }, + 'contentType': "audio/flac", + 'streamType': 'BUFFERED', + 'metadata': { + 'title': track.name, + 'artist': track.artists[0].name, + }, + 'duration': int(track.duration) + } + } async def __send_player_queue(self, receiver_ctrl, media_controller, queuedata): '''send new data to the CC queue''' def app_launched_callback(): - LOGGER.info("app_launched_callback") """Plays media after chromecast has switched to requested app.""" queuedata['mediaSessionId'] = media_controller.status.media_session_id - LOGGER.info('') - LOGGER.info('') media_controller.send_message(queuedata, inc_session_id=False) receiver_ctrl.launch_app(media_controller.app_id, callback_function=app_launched_callback) @@ -255,7 +257,7 @@ class ChromecastProvider(PlayerProvider): player.powered = True elif mediastatus.player_state == 'PAUSED': player.state = PlayerState.Paused - player.powered = True + player.powered = not chromecast.is_idle else: player.state = PlayerState.Stopped player.powered = player.powered @@ -315,12 +317,15 @@ class ChromecastProvider(PlayerProvider): chromecasts = await asyncio.gather(bg_task) for chromecast in chromecasts[0]: player_id = str(chromecast.uuid) - if not player_id in self._players: + ip_change = False + if player_id in self._chromecasts and chromecast.uri != self._chromecasts[player_id].uri: + LOGGER.warning('Chromecast uri changed ?! - old: %s - new: %s' %(self._chromecasts[player_id].uri, chromecast.uri)) + ip_change = True + if not player_id in self._players or ip_change: player = MusicPlayer() player.player_id = player_id player.name = chromecast.name player.player_provider = self.prov_id - chromecast.start() # patch the receive message method for handling queue status updates chromecast.queue = [] chromecast.media_controller.queue_items = [] @@ -336,9 +341,12 @@ class ChromecastProvider(PlayerProvider): mz.register_listener(MZListener(mz, self.__handle_group_members_update, self.mass.event_loop)) chromecast.register_handler(mz) chromecast.register_connection_listener(MZConnListener(mz)) - chromecast.wait() self._chromecasts[player_id] = chromecast self._players[player_id] = player + if not player_id in self._player_queue: + # TODO: persistant storage of player queue ? + self._player_queue[player_id] = [] + chromecast.wait() LOGGER.info('Chromecast discovery done...') def chunks(l, n): diff --git a/music_assistant/modules/web.py b/music_assistant/modules/web.py index 7e18a63c..9930b4d8 100755 --- a/music_assistant/modules/web.py +++ b/music_assistant/modules/web.py @@ -269,17 +269,20 @@ class Web(): resp = web.StreamResponse(status=200, reason='OK', headers={'Content-Type': 'audio/flac'}) - await resp.prepare(request) - if request.method.upper() == 'HEAD': - return resp - cancelled = False - async for chunk in self.mass.player.get_audio_stream(track_id, provider): - if cancelled: - continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators - try: - await resp.write(chunk) - except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError): - LOGGER.error('client disconnect?') - cancelled = True - if not cancelled: - return resp \ No newline at end of file + try: + await resp.prepare(request) + if request.method.upper() == 'HEAD': + return resp + cancelled = False + async for chunk in self.mass.player.get_audio_stream(track_id, provider, player_id): + if cancelled: + continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators + try: + await resp.write(chunk) + except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError): + LOGGER.error('client disconnect?') + cancelled = True + if not cancelled: + return resp + except AttributeError: + LOGGER.error('client disconnect?') \ No newline at end of file -- 2.34.1