fix chromecast streaming
authormarcelveldt <marcelvanderveldt@MacBook-Pro.local>
Tue, 21 May 2019 13:24:31 +0000 (15:24 +0200)
committermarcelveldt <marcelvanderveldt@MacBook-Pro.local>
Tue, 21 May 2019 13:24:31 +0000 (15:24 +0200)
add sox options and downsampling

music_assistant/models.py
music_assistant/modules/musicproviders/qobuz.py
music_assistant/modules/player.py
music_assistant/modules/playerproviders/chromecast.py
music_assistant/modules/web.py

index 5f0a534871891c821d24dda4378139a73cab00ff..a033720df8bc5968d30171f3e8c45e70940dcebd 100755 (executable)
@@ -45,7 +45,7 @@ class TrackQuality(IntEnum):
     LOSSY_OGG = 1
     LOSSY_AAC = 2
     FLAC_LOSSLESS = 6 # 44.1/48khz 16 bits HI-RES
-    FLAC_LOSSLES_HI_RES_1 = 7 # 44.1/48khz 24 bits HI-RES
+    FLAC_LOSSLESS_HI_RES_1 = 7 # 44.1/48khz 24 bits HI-RES
     FLAC_LOSSLESS_HI_RES_2 = 8 # 88.2/96khz 24 bits HI-RES
     FLAC_LOSSLESS_HI_RES_3 = 9 # 176/192khz 24 bits HI-RES
     FLAC_LOSSLESS_HI_RES_4 = 10 # above 192khz 24 bits HI-RES
index fc12348fdbc736a2b9c518efa133479c1187a223..51ec96352ff9c6ada40258877b8039791ea4fc0d 100644 (file)
@@ -400,7 +400,7 @@ class QobuzProvider(MusicProvider):
         elif track_obj['maximum_sampling_rate'] > 48:
             quality = TrackQuality.FLAC_LOSSLESS_HI_RES_2
         elif track_obj['maximum_bit_depth'] > 16:
-            quality = TrackQuality.FLAC_LOSSLES_HI_RES_1
+            quality = TrackQuality.FLAC_LOSSLESS_HI_RES_1
         elif track_obj.get('format_id',0) == 5:
             quality = TrackQuality.LOSSY_AAC
         else:
index 1ec7739a17c70674f851c679952dc20995b6b538..87529ff4da76b9ac66ee0431dd01933329118e01 100755 (executable)
@@ -6,7 +6,7 @@ import os
 from utils import run_periodic, LOGGER, try_parse_int, try_parse_float, get_ip, run_async_background_task
 import aiohttp
 from difflib import SequenceMatcher as Matcher
-from models import MediaType, PlayerState, MusicPlayer
+from models import MediaType, PlayerState, MusicPlayer, TrackQuality
 from typing import List
 import toolz
 import operator
@@ -57,6 +57,7 @@ class Player():
             ("apply_group_power", False, "player_group_pow"),
             ("play_power_on", False, "player_power_play"),
             ("sox_effects", '', "http_streamer_sox_effects"),
+            ("max_sample_rate", '96000', "max_sample_rate"),
             ("force_http_streamer", False, "force_http_streamer")
         ]
         # config for the http streamer
@@ -370,23 +371,25 @@ class Player():
         ''' get audio stream from provider and apply additional effects/processing where/if needed'''
         input_content_type = await self.mass.music.providers[provider].get_stream_content_type(track_id)
         cachefile = self.__get_track_cache_filename(track_id, provider)
-        sox_effects = []
+        sox_effects = ''
+         # sox settings
         if self.mass.config['base']['http_streamer']['volume_normalisation']:
             gain_correct = await self.__get_track_gain_correct(track_id, provider)
             LOGGER.info("apply gain correction of %s" % gain_correct)
-            sox_effects += ['vol', '%s dB' % gain_correct]
-        if player_id and self.mass.config['player_settings'][player_id]['sox_effects']:
-            sox_effects += self.mass.config['player_settings'][player_id]['sox_effects'].split('/')
+            sox_effects += ' vol %s dB ' % gain_correct
+        sox_effects += await self.__get_player_sox_options(track_id, provider, player_id)
         if os.path.isfile(cachefile):
             # we have a cache file for this track which we can use
-            args = ['-t', 'flac', cachefile, '-t', 'flac', '-', *sox_effects]
-            process = await asyncio.create_subprocess_exec('sox', *args, 
+            args = 'sox -t flac %s -t flac -C 0 - %s' % (cachefile, sox_effects)
+            LOGGER.info("Running sox with args: %s" % args)
+            process = await asyncio.create_subprocess_shell(args, 
                     stdout=asyncio.subprocess.PIPE)
             buffer_task = None
         else:
             # stream from provider
-            args = ['-t', input_content_type, '-', '-t', 'flac', '-', *sox_effects]
-            process = await asyncio.create_subprocess_exec('sox', *args, 
+            args = 'sox -t %s - -t flac -C 0 - %s' % (input_content_type, sox_effects)
+            LOGGER.info("Running sox with args: %s" % args)
+            process = await asyncio.create_subprocess_shell(args,
                     stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
             buffer_task = asyncio.create_task(
                     self.__fill_audio_buffer(process.stdin, track_id, provider, input_content_type))
@@ -399,6 +402,31 @@ class Player():
         await process.wait()
         LOGGER.info("streaming of track_id %s completed" % track_id)
 
+    async def __get_player_sox_options(self, track_id, provider, player_id):
+        ''' get player specific sox options '''
+        sox_effects = ' '
+        if not player_id:
+            return ''
+        if self.mass.config['player_settings'][player_id]['max_sample_rate']:
+            # downsample if needed
+            max_sample_rate = try_parse_int(self.mass.config['player_settings'][player_id]['max_sample_rate'])
+            if max_sample_rate:
+                quality = TrackQuality.LOSSY_MP3
+                track = await self.mass.music.track(track_id, provider)
+                for item in track.provider_ids:
+                    if item['provider'] == provider and item['item_id'] == track_id:
+                        quality = item['quality']
+                        break
+                if quality > TrackQuality.FLAC_LOSSLESS_HI_RES_3 and max_sample_rate == 192000:
+                    sox_effects += 'rate -v 192000'
+                elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_2 and max_sample_rate == 96000:
+                    sox_effects += 'rate -v 96000'
+                elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_1 and max_sample_rate == 48000:
+                    sox_effects += 'rate -v 48000'
+        if self.mass.config['player_settings'][player_id]['sox_effects']:
+            sox_effects += self.mass.config['player_settings'][player_id]['sox_effects']
+        return sox_effects + ' '
+        
     async def __analyze_audio(self, tmpfile, track_id, provider, content_type):
         ''' analyze track audio, for now we only calculate EBU R128 loudness '''
         LOGGER.info('Start analyzing file %s' % tmpfile)
index 5de5c0a8262541ea69c44c0cad6947c75d9e109e..c53d8e48686135ff93ce946d5a608c86614f55eb 100644 (file)
@@ -50,6 +50,7 @@ class ChromecastProvider(PlayerProvider):
         self.mass = mass
         self._players = {}
         self._chromecasts = {}
+        self._player_queue = {}
         self.supported_musicproviders = ['http']
         self.http_session = aiohttp.ClientSession(loop=mass.event_loop)
         asyncio.ensure_future(self.__discover_chromecasts())
@@ -60,7 +61,7 @@ class ChromecastProvider(PlayerProvider):
     async def player_command(self, player_id, cmd:str, cmd_args=None):
         ''' issue command on player (play, pause, next, previous, stop, power, volume, mute) '''
         if cmd == 'play':
-            if self._chromecasts[player_id].media_controller.status.media_session_id:
+            if self._chromecasts[player_id].media_controller.status.player_is_paused:
                 self._chromecasts[player_id].media_controller.play()
             else:
                 await self.__resume_queue(player_id)
@@ -87,154 +88,155 @@ class ChromecastProvider(PlayerProvider):
             self._chromecasts[player_id].set_volume_muted(True)
 
     async def player_queue(self, player_id, offset=0, limit=50):
-        ''' return the items in the player's queue '''
-        items = []
-        for item in self._chromecasts[player_id].queue[offset:limit]:
-            track = await self.__track_from_uri(item['media']['contentId'])
-            if track:
-                items.append(track)
-        return items
-    
-    async def create_queue_item(self, track):
-        '''create queue item from track info '''
-        return {
-            'autoplay' : True,
-            'preloadTime' : 10,
-            'playbackDuration': int(track.duration),
-            'startTime' : 0,
-            'activeTrackIds' : [],
-            'media': {
-                'contentId':  track.uri,
-                'customData': {'provider': track.provider},
-                'contentType': "audio/flac",
-                'streamType': 'BUFFERED',
-                'metadata': {
-                    'title': track.name,
-                    'artist': track.artists[0].name,
-                },
-                'duration': int(track.duration)
-            }
-        }
+        ''' return the current items in the player's queue '''
+        return self._player_queue[player_id][offset:limit]
     
     async def play_media(self, player_id, media_items, queue_opt='play'):
         ''' 
             play media on a player
         '''
         castplayer = self._chromecasts[player_id]
-        player = self._players[player_id]
-        media_controller = castplayer.media_controller
-        receiver_ctrl = media_controller._socket_client.receiver_controller
-        cur_queue_index = 0
-        if media_controller.queue_cur_id != None:
-            for item in media_controller.queue_items:
-                # status queue may contain at max 3 tracks (previous, current and next)
-                if item['itemId'] == media_controller.queue_cur_id:
-                    cur_queue_item = item
-                    # find out the current index
-                    for counter, value in enumerate(castplayer.queue):
-                        if value['media']['contentId'] == cur_queue_item['media']['contentId']:
-                            cur_queue_index = counter
-                            break
-                    break
-        if (not media_controller.queue_cur_id or not media_controller.status.media_session_id or not castplayer.queue):
-            queue_opt = 'replace'
-
-        new_queue_items = []
-        for track in media_items:
-            queue_item = await self.create_queue_item(track)
-            new_queue_items.append(queue_item)
-
-        if (queue_opt in ['replace', 'play'] or not media_controller.queue_cur_id or 
-                not media_controller.status.media_session_id or not castplayer.queue):
-            # load new Chromecast queue with items
-            if queue_opt == 'add':
-                # append items to queue
-                castplayer.queue = castplayer.queue + new_queue_items
-                startindex = cur_queue_index
-            elif queue_opt == 'play':
-                # keep current queue but append new items at begin and start playing first item
-                castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index]
-                startindex = 0
-            elif queue_opt == 'next':
-                # play the new items after the current playing item (insert before current next item)
-                castplayer.queue = new_queue_items + castplayer.queue[cur_queue_index:] + castplayer.queue[:cur_queue_index]
-                startindex = cur_queue_index
+        cur_queue_index = await self.__get_cur_queue_index(player_id)
+
+        if queue_opt == 'replace' or not self._player_queue[player_id]:
+            # overwrite queue with new items
+            self._player_queue[player_id] = media_items
+            await self.__queue_load(player_id, self._player_queue[player_id], 0)
+        elif queue_opt == 'play':
+            # replace current item with new item(s)
+            self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index] + media_items + self._player_queue[player_id][cur_queue_index+1:]
+            await self.__queue_load(player_id, self._player_queue[player_id], cur_queue_index)
+        elif queue_opt == 'next':
+            # insert new items at current index +1
+            if len(self._player_queue[player_id]) > cur_queue_index:
+                old_next_uri = self._player_queue[player_id][cur_queue_index+1].uri
             else:
-                # overwrite the whole queue with new item(s)
-                castplayer.queue = new_queue_items
-                startindex = 0
-            # load first 10 items as soon as possible
-            queuedata = { 
-                    "type": 'QUEUE_LOAD',
-                    "repeatMode":  "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
-                    "shuffle": player.shuffle_enabled,
-                    "queueType": "PLAYLIST",
-                    "startIndex":    startindex,    # Item index to play after this request or keep same item if undefined
-                    "items": castplayer.queue[:10]
-            }
-            await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-            await asyncio.sleep(1)
-            # append the rest of the items in the queue in chunks
-            for chunk in chunks(castplayer.queue[10:], 100):
-                queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
-                await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-                await asyncio.sleep(0.1)
+                old_next_uri = None
+            self._player_queue[player_id] = self._player_queue[player_id][:cur_queue_index+1] + media_items + self._player_queue[player_id][cur_queue_index+1:]
+            # find out the itemID of the next item in CC queue
+            insert_at_item_id = None
+            if old_next_uri:
+                for item in castplayer.media_controller.queue_items:
+                    if item['media']['contentId'] == old_next_uri:
+                        insert_at_item_id = item['itemId']
+            await self.__queue_insert(player_id, media_items, insert_at_item_id)
         elif queue_opt == 'add':
-            # existing queue is playing: simply append items to the end of the queue (in small chunks)
-            castplayer.queue = castplayer.queue + new_queue_items
-            insertbefore = None
-            for chunk in chunks(new_queue_items, 100):
-                queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
-                await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-                await asyncio.sleep(0.1)
-        elif queue_opt == 'next':
-            # play the new items after the current playing item (insert before current next item)
-            player.queue = castplayer.queue[:cur_queue_index] + new_queue_items + castplayer.queue[cur_queue_index:]
-            queuedata = { 
-                        "type": 'QUEUE_INSERT',
-                        "insertBefore":     media_controller.queue_cur_id+1,
-                        "items":            new_queue_items[:200] # limit of the queue message
-                }
-            await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-            
+            # add new items at end of queue
+            self._player_queue[player_id] = self._player_queue[player_id] + media_items
+            await self.__queue_insert(player_id, media_items)
+
     ### Provider specific (helper) methods #####
 
-    async def __resume_queue(self, player_id):
-        ''' resume queue play after power off '''
-        player = self._players[player_id]
+    async def __get_cur_queue_index(self, player_id):
+        ''' retrieve index of current item in the player queue '''
+        cur_index = 0
+        for index, track in enumerate(self._player_queue[player_id]):
+            if track.uri == self._chromecasts[player_id].media_controller.status.content_id:
+                cur_index = index
+                break
+        return cur_index
+
+    async def __queue_load(self, player_id, new_tracks, startindex=None):
+        ''' load queue on player with given queue items '''
         castplayer = self._chromecasts[player_id]
+        player = self._players[player_id]
         media_controller = castplayer.media_controller
         receiver_ctrl = media_controller._socket_client.receiver_controller
-        startindex = 0
-        if player.cur_item and player.cur_item.name:
-            for index, item in enumerate(castplayer.queue):
-                if item['media']['metadata']['title'] == player.cur_item.name:
-                    startindex = index
-                    break
+        queue_items = await self.__create_queue_items(new_tracks[:50])
         queuedata = { 
                 "type": 'QUEUE_LOAD',
                 "repeatMode":  "REPEAT_ALL" if player.repeat_enabled else "REPEAT_OFF",
                 "shuffle": player.shuffle_enabled,
                 "queueType": "PLAYLIST",
                 "startIndex":    startindex,    # Item index to play after this request or keep same item if undefined
-                "items": castplayer.queue[:10]
+                "items": queue_items # only load 50 tracks at once or the socket will crash
         }
         await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
-        await asyncio.sleep(1)
-        # append the rest of the items in the queue in chunks
-        for chunk in chunks(castplayer.queue[10:], 100):
-            await asyncio.sleep(0.1)
-            queuedata = { "type": 'QUEUE_INSERT', "items": chunk }
+        if len(new_tracks) > 50:
+            await self.__queue_insert(player_id, new_tracks[51:])
+
+    async def __queue_insert(self, player_id, new_tracks, insert_before=None):
+        ''' insert item into the player queue '''
+        castplayer = self._chromecasts[player_id]
+        queue_items = await self.__create_queue_items(new_tracks)
+        media_controller = castplayer.media_controller
+        receiver_ctrl = media_controller._socket_client.receiver_controller
+        for chunk in chunks(queue_items, 50):
+            queuedata = { 
+                        "type": 'QUEUE_INSERT',
+                        "insertBefore":     insert_before,
+                        "items":            chunk
+                }
             await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
+    async def __queue_update(self, player_id, queue_items_to_update):
+        ''' update the cast player queue '''
+        castplayer = self._chromecasts[player_id]
+        media_controller = castplayer.media_controller
+        receiver_ctrl = media_controller._socket_client.receiver_controller
+        queuedata = { 
+                    "type": 'QUEUE_UPDATE',
+                    "items": queue_items_to_update
+            }
+        await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
+    async def __queue_remove(self, player_id, queue_item_ids):
+        ''' remove items from the cast player queue '''
+        media_controller = self._chromecasts[player_id].media_controller
+        receiver_ctrl = media_controller._socket_client.receiver_controller
+        queuedata = { 
+                    "type": 'QUEUE_REMOVE',
+                    "items": queue_item_ids
+            }
+        await self.__send_player_queue(receiver_ctrl, media_controller, queuedata)
+
+    async def __resume_queue(self, player_id):
+        ''' resume queue play after power off '''
+        
+        player = self._players[player_id]
+        queue_index = await self.__get_cur_queue_index(player_id)
+        print('resume queue at index %s' % queue_index)
+        tracks = self._player_queue[player_id]
+        await self.__queue_load(player_id, tracks, queue_index)
+
+    async def __create_queue_items(self, tracks):
+        ''' create list of CC queue items from tracks '''
+        queue_items = []
+        for track in tracks:
+            queue_item = await self.__create_queue_item(track)
+            queue_items.append(queue_item)
+        return queue_items
+
+    async def __create_queue_item(self, track):
+        '''create queue item from track info '''
+        return {
+            'autoplay' : True,
+            'preloadTime' : 10,
+            'playbackDuration': int(track.duration),
+            'startTime' : 0,
+            'activeTrackIds' : [],
+            'media': {
+                'contentId':  track.uri,
+                'customData': {
+                    'provider': track.provider, 
+                    'uri': track.uri, 
+                    'item_id': track.item_id
+                },
+                'contentType': "audio/flac",
+                'streamType': 'BUFFERED',
+                'metadata': {
+                    'title': track.name,
+                    'artist': track.artists[0].name,
+                },
+                'duration': int(track.duration)
+            }
+        }
         
     async def __send_player_queue(self, receiver_ctrl, media_controller, queuedata):
         '''send new data to the CC queue'''
         def app_launched_callback():
-                LOGGER.info("app_launched_callback")
                 """Plays media after chromecast has switched to requested app."""
                 queuedata['mediaSessionId'] = media_controller.status.media_session_id
-                LOGGER.info('')
-                LOGGER.info('')
                 media_controller.send_message(queuedata, inc_session_id=False)
         receiver_ctrl.launch_app(media_controller.app_id,
                                 callback_function=app_launched_callback)
@@ -255,7 +257,7 @@ class ChromecastProvider(PlayerProvider):
                 player.powered = True
             elif mediastatus.player_state == 'PAUSED':
                 player.state = PlayerState.Paused
-                player.powered = True
+                player.powered = not chromecast.is_idle
             else:
                 player.state = PlayerState.Stopped
                 player.powered = player.powered
@@ -315,12 +317,15 @@ class ChromecastProvider(PlayerProvider):
         chromecasts = await asyncio.gather(bg_task)
         for chromecast in chromecasts[0]:
             player_id = str(chromecast.uuid)
-            if not player_id in self._players:
+            ip_change = False
+            if player_id in self._chromecasts and chromecast.uri != self._chromecasts[player_id].uri:
+                LOGGER.warning('Chromecast uri changed ?! - old: %s - new: %s' %(self._chromecasts[player_id].uri, chromecast.uri))
+                ip_change = True
+            if not player_id in self._players or ip_change:
                 player = MusicPlayer()
                 player.player_id = player_id
                 player.name = chromecast.name
                 player.player_provider = self.prov_id
-                chromecast.start()
                 # patch the receive message method for handling queue status updates
                 chromecast.queue = []
                 chromecast.media_controller.queue_items = []
@@ -336,9 +341,12 @@ class ChromecastProvider(PlayerProvider):
                     mz.register_listener(MZListener(mz, self.__handle_group_members_update, self.mass.event_loop))
                     chromecast.register_handler(mz)
                     chromecast.register_connection_listener(MZConnListener(mz))
-                    chromecast.wait()
                 self._chromecasts[player_id] = chromecast
                 self._players[player_id] = player
+                if not player_id in self._player_queue:
+                    # TODO: persistant storage of player queue ?
+                    self._player_queue[player_id] = []
+                chromecast.wait()
         LOGGER.info('Chromecast discovery done...')
 
 def chunks(l, n):
index 7e18a63c022eed0cfd3199ac9c82fa7ef0925dec..9930b4d80cfe92e0bd78324a4ca47d6d692ef0b3 100755 (executable)
@@ -269,17 +269,20 @@ class Web():
         resp = web.StreamResponse(status=200,
                                  reason='OK',
                                  headers={'Content-Type': 'audio/flac'})
-        await resp.prepare(request)
-        if request.method.upper() == 'HEAD':
-            return resp
-        cancelled = False
-        async for chunk in self.mass.player.get_audio_stream(track_id, provider):
-            if cancelled:
-                continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators
-            try:
-                await resp.write(chunk)
-            except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError):
-                LOGGER.error('client disconnect?')
-                cancelled = True
-        if not cancelled:
-            return resp
\ No newline at end of file
+        try:
+            await resp.prepare(request)
+            if request.method.upper() == 'HEAD':
+                return resp
+            cancelled = False
+            async for chunk in self.mass.player.get_audio_stream(track_id, provider, player_id):
+                if cancelled:
+                    continue # just consume all bytes in stream to prevent deadlocks in the subprocess based iterators
+                try:
+                    await resp.write(chunk)
+                except (asyncio.CancelledError, concurrent.futures._base.CancelledError, ConnectionResetError):
+                    LOGGER.error('client disconnect?')
+                    cancelled = True
+            if not cancelled:
+                return resp
+        except AttributeError:
+            LOGGER.error('client disconnect?')
\ No newline at end of file