From: marcelveldt Date: Thu, 17 Oct 2019 22:04:15 +0000 (+0200) Subject: fix playback reporting stuff X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=af4e037ff93fdd0702d9cb46a0a028320abab3b2;p=music-assistant-server.git fix playback reporting stuff fix chromecast stability due to threading --- diff --git a/music_assistant/constants.py b/music_assistant/constants.py index a83bf0f9..d848e47a 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -20,3 +20,5 @@ EVENT_PLAYER_CHANGED = "player changed" EVENT_STREAM_STARTED = "streaming started" EVENT_STREAM_ENDED = "streaming ended" EVENT_CONFIG_CHANGED = "config changed" +EVENT_PLAYBACK_STARTED = "playback started" +EVENT_PLAYBACK_STOPPED = "playback stopped" diff --git a/music_assistant/http_streamer.py b/music_assistant/http_streamer.py index 9f5b6fd4..1c9638d3 100755 --- a/music_assistant/http_streamer.py +++ b/music_assistant/http_streamer.py @@ -296,29 +296,29 @@ class HTTPStreamer(): yield (True, b'') return # get sox effects and resample options - sox_effects = await self.__get_player_sox_options(player, queue_item) + sox_options = await self.__get_player_sox_options(player, queue_item) outputfmt = 'flac -C 0' if resample: outputfmt = 'raw -b 32 -c 2 -e signed-integer' - sox_effects += ' rate -v %s' % resample + sox_options += ' rate -v %s' % resample + streamdetails['sox_options'] = sox_options # determine how to proceed based on input file ype if streamdetails["content_type"] == 'aac': # support for AAC created with ffmpeg in between - args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_effects) + args = 'ffmpeg -v quiet -i "%s" -f flac - | sox -t flac - -t %s - %s' % (streamdetails["path"], outputfmt, sox_options) elif streamdetails['type'] == 'url': args = 'sox -t %s "%s" -t %s - %s' % (streamdetails["content_type"], - streamdetails["path"], outputfmt, sox_effects) + streamdetails["path"], outputfmt, sox_options) elif streamdetails['type'] == 'executable': args = '%s | sox -t %s - -t %s - %s' % (streamdetails["path"], - streamdetails["content_type"], outputfmt, sox_effects) + streamdetails["content_type"], outputfmt, sox_options) # start sox process # we use normal subprocess instead of asyncio because of bug with executor # this should be fixed with python 3.8 process = subprocess.Popen(args, shell=True, stdout=subprocess.PIPE) - # fire event that streaming has started for this track (needed by some streaming providers) + # fire event that streaming has started for this track asyncio.run_coroutine_threadsafe( - self.mass.signal_event(EVENT_STREAM_STARTED, - streamdetails), self.mass.event_loop) + self.mass.signal_event(EVENT_STREAM_STARTED, queue_item), self.mass.event_loop) # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly bytes_sent = 0 @@ -343,44 +343,37 @@ class HTTPStreamer(): else: buf += data del buf - # fire event that streaming has ended for this track (needed by some streaming providers) - if resample: - bytes_per_second = resample * (32/8) * 2 - bytes_per_second = (resample * 32 * 2) / 8 - seconds_streamed = int(bytes_sent/bytes_per_second) - else: - seconds_streamed = queue_item.duration - streamdetails["seconds"] = seconds_streamed + # fire event that streaming has ended asyncio.run_coroutine_threadsafe( - self.mass.signal_event(EVENT_STREAM_ENDED, streamdetails), self.mass.event_loop) + self.mass.signal_event(EVENT_STREAM_ENDED, queue_item), self.mass.event_loop) # send task to background to analyse the audio self.mass.event_loop.call_soon_threadsafe( asyncio.ensure_future, self.__analyze_audio(queue_item)) async def __get_player_sox_options(self, player, queue_item): ''' get player specific sox effect options ''' - sox_effects = [] + sox_options = [] # volume normalisation gain_correct = asyncio.run_coroutine_threadsafe( self.mass.players.get_gain_correct( player.player_id, queue_item.item_id, queue_item.provider), self.mass.event_loop).result() if gain_correct != 0: - sox_effects.append('vol %s dB ' % gain_correct) + sox_options.append('vol %s dB ' % gain_correct) # downsample if needed if player.settings['max_sample_rate']: max_sample_rate = try_parse_int(player.settings['max_sample_rate']) if max_sample_rate: quality = queue_item.quality if quality > TrackQuality.FLAC_LOSSLESS_HI_RES_3 and max_sample_rate == 192000: - sox_effects.append('rate -v 192000') + sox_options.append('rate -v 192000') elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_2 and max_sample_rate == 96000: - sox_effects.append('rate -v 96000') + sox_options.append('rate -v 96000') elif quality > TrackQuality.FLAC_LOSSLESS_HI_RES_1 and max_sample_rate == 48000: - sox_effects.append('rate -v 48000') - if player.settings.get('sox_effects'): - sox_effects.append(player.settings['sox_effects']) - return " ".join(sox_effects) + sox_options.append('rate -v 48000') + if player.settings.get('sox_options'): + sox_options.append(player.settings['sox_options']) + return " ".join(sox_options) async def __analyze_audio(self, queue_item): ''' analyze track audio, for now we only calculate EBU R128 loudness ''' @@ -422,7 +415,6 @@ class HTTPStreamer(): args = 'sox --ignore-length -t %s - -t %s %s fade t %s' % (pcm_args, pcm_args, fadeinfile.name, fade_length) process = subprocess.Popen(args, shell=True, stdin=subprocess.PIPE) process.communicate(fade_in_part) - fadeinfile.close() # create fade-out part fadeoutfile = MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0) args = 'sox --ignore-length -t %s - -t %s %s reverse fade t %s reverse' % (pcm_args, pcm_args, fadeoutfile.name, fade_length) diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index be2df942..ab75dbf4 100755 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -10,7 +10,7 @@ import os import pickle from ..utils import LOGGER, json, filename_from_string -from ..constants import CONF_ENABLED +from ..constants import CONF_ENABLED, EVENT_PLAYBACK_STARTED, EVENT_PLAYBACK_STOPPED from .media_types import Track, TrackQuality from .playerstate import PlayerState @@ -42,13 +42,14 @@ class PlayerQueue(): self._repeat_enabled = False self._cur_index = 0 self._cur_item_time = 0 - self._last_cur_item_time = 0 - self._last_index = 0 + self._last_item_time = 0 + self._last_queue_startindex = 0 self._last_player_state = PlayerState.Stopped self._save_busy_ = False + self._last_track = None # load previous queue settings from disk - self.__load_from_file() - + self.mass.event_loop.create_task(self.__load_from_file()) + @property def shuffle_enabled(self): return self._shuffle_enabled @@ -78,10 +79,7 @@ class PlayerQueue(): @property def cur_item_time(self): - if self.use_queue_stream: - return self._cur_item_time - else: - return self._player._cur_time + return self._cur_item_time @property def next_index(self): @@ -174,9 +172,12 @@ class PlayerQueue(): ''' resume previous queue ''' if self.items: prev_index = self.cur_index - await self.load(self._items) - if prev_index: + if self.use_queue_stream: await self.play_index(prev_index) + else: + # at this point we don't know if the queue is synced with the player + # so just to be safe we only send the queue_items from the index + await self.load(self._items[prev_index:]) else: LOGGER.warning("resume queue requested for %s but queue is empty" % self._player.name) @@ -185,7 +186,7 @@ class PlayerQueue(): if not len(self.items) > index: return if self.use_queue_stream: - self._cur_index = index + self._last_queue_startindex = index queue_stream_uri = 'http://%s:%s/stream/%s'% ( self.mass.web.local_ip, self.mass.web.http_port, self._player.player_id) return await self._player.cmd_play_uri(queue_stream_uri) @@ -199,11 +200,10 @@ class PlayerQueue(): if self._shuffle_enabled: queue_items = await self.__shuffle_items(queue_items) self._items = queue_items - self._cur_index = 0 if self.use_queue_stream or not self._player.supports_queue: - return await self.play_index(0) + await self.play_index(0) else: - return await self._player.cmd_queue_load(queue_items) + await self._player.cmd_queue_load(queue_items) async def insert(self, queue_items:List[QueueItem], offset=0): ''' @@ -248,81 +248,95 @@ class PlayerQueue(): async def update(self): ''' update queue details, called when player updates ''' cur_index = self._cur_index - # determine queue index and cur_time for queue stream + track_time = self._cur_item_time + # handle queue stream if self.use_queue_stream and self._player.state == PlayerState.Playing: - # player is playing a constant stream of the queue so we need to do this the hard way - cur_time_queue = self._player._cur_time - total_time = 0 - track_time = 0 - if self.items and len(self.items) > self._last_index: - queue_index = self._last_index # holds the last starting position - queue_track = None - while len(self.items) > queue_index: - queue_track = self.items[queue_index] - if cur_time_queue > (queue_track.duration + total_time): - total_time += queue_track.duration - queue_index += 1 - else: - track_time = cur_time_queue - total_time - break - cur_index = queue_index - self._cur_item_time = track_time + cur_index, track_time = await self.__get_queue_stream_index() # normal queue based approach elif not self.use_queue_stream: - if 'queue_item_id' in self._player.cur_uri: - queue_item_id = self._player.cur_uri.split('queue_item_id=')[1] + track_time = self._player._cur_time for index, queue_item in enumerate(self.items): if queue_item.uri == self._player.cur_uri: cur_index = index break # process new index - await self.__update_index(cur_index) + await self.__process_queue_update(cur_index, track_time) async def start_queue_stream(self): ''' called by the queue streamer when it starts playing the queue stream ''' - self._last_index = self.cur_index - return await self.get_item(self.cur_index) + return await self.get_item(self._last_queue_startindex) - async def __update_index(self, new_index): + async def __get_queue_stream_index(self): + # player is playing a constant stream of the queue so we need to do this the hard way + queue_index = 0 + cur_time_queue = self._player._cur_time + total_time = 0 + track_time = 0 + if self.items and len(self.items) > self._last_queue_startindex: + queue_index = self._last_queue_startindex # holds the last starting position + queue_track = None + while len(self.items) > queue_index: + queue_track = self.items[queue_index] + if cur_time_queue > (queue_track.duration + total_time): + total_time += queue_track.duration + queue_index += 1 + else: + track_time = cur_time_queue - total_time + break + return queue_index, track_time + + async def __process_queue_update(self, new_index, track_time): ''' compare the queue index to determine if playback changed ''' - if new_index != self._last_index: - LOGGER.info("new track loaded in queue") - self._cur_index = new_index - elif (self._last_player_state == PlayerState.Stopped and - self._player.state == PlayerState.Playing and - self.cur_item): - LOGGER.info("Player %s started playing %s" % self.cur_item.name) - elif (self._last_player_state == PlayerState.Playing and - self._player.state == PlayerState.Stopped and - self.cur_item): - LOGGER.info("Player %s stopped playing %s" % self.cur_item.name) - # always update timestamp - self._last_cur_item_time = self.cur_item_time - + new_track = await self.get_item(new_index) + if (not self._last_track and new_track) or self._last_track != new_track: + # queue track updated + # account for track changing state so trigger track change after 1 second + if self._last_track: + self._last_track.seconds_played = self._last_item_time + self.mass.event_loop.create_task( + self.mass.signal_event(EVENT_PLAYBACK_STOPPED, self._last_track)) + if new_track: + self.mass.event_loop.create_task( + self.mass.signal_event(EVENT_PLAYBACK_STARTED, new_track)) + self._last_track = new_track + await self.__save_to_file() + if self._last_player_state != self._player.state: + self._last_player_state = self._player.state + if (self._player.cur_time == 0 and + self._player.state in [PlayerState.Stopped, PlayerState.Off]): + # player stopped playing + self._last_queue_startindex = self.cur_index + # update vars + if track_time > 2: + # account for track changing state so keep this a few seconds behind + self._last_item_time = track_time + self._cur_item_time = track_time + self._cur_index = new_index + async def __shuffle_items(self, queue_items): ''' shuffle a list of tracks ''' # for now we use default python random function # can be extended with some more magic last last_played and stuff return random.sample(queue_items, len(queue_items)) - def __load_from_file(self): + async def __load_from_file(self): ''' try to load the saved queue for this player from file ''' player_safe_str = filename_from_string(self._player.player_id) settings_dir = os.path.join(self.mass.datapath, 'queue') player_file = os.path.join(settings_dir, player_safe_str) if os.path.isfile(player_file): try: - with open(player_file) as f: + with open(player_file, 'rb') as f: data = pickle.load(f) self._shuffle_enabled = data["shuffle_enabled"] self._repeat_enabled = data["repeat_enabled"] self._items = data["items"] self._cur_index = data["cur_item"] - self._last_index = data["last_index"] + self._last_queue_startindex = data["last_index"] except Exception as exc: LOGGER.debug("Could not load queue from disk - %s" % str(exc)) - def __save_to_file(self): + async def __save_to_file(self): ''' save current queue settings to file ''' if self._save_busy_: return @@ -335,12 +349,12 @@ class PlayerQueue(): "repeat_enabled": self._repeat_enabled, "items": self._items, "cur_item": self._cur_index, - "last_index": self._last_index + "last_index": self._cur_index } if not os.path.isdir(settings_dir): os.mkdir(settings_dir) - with open(player_file, 'w+') as f: - pickle.dump(data, f) + with open(player_file, 'wb') as f: + data = pickle.dump(data, f) self._save_busy_ = False diff --git a/music_assistant/musicproviders/qobuz.py b/music_assistant/musicproviders/qobuz.py index 8979a446..7d98cd67 100644 --- a/music_assistant/musicproviders/qobuz.py +++ b/music_assistant/musicproviders/qobuz.py @@ -17,7 +17,7 @@ from ..app_vars import get_app_var from ..models import MusicProvider, MediaType, TrackQuality, \ AlbumType, Artist, Album, Track, Playlist from ..constants import CONF_USERNAME, CONF_PASSWORD, CONF_ENABLED, \ - CONF_TYPE_PASSWORD, EVENT_STREAM_STARTED, EVENT_STREAM_ENDED + CONF_TYPE_PASSWORD, EVENT_PLAYBACK_STARTED, EVENT_PLAYBACK_STOPPED PROV_ID = 'qobuz' PROV_NAME = 'Qobuz' @@ -50,8 +50,8 @@ class QobuzProvider(MusicProvider): self.http_session = aiohttp.ClientSession( loop=self.mass.event_loop, connector=aiohttp.TCPConnector()) self.throttler = Throttler(rate_limit=2, period=1) - await self.mass.add_event_listener(self.mass_event, EVENT_STREAM_STARTED) - await self.mass.add_event_listener(self.mass_event, EVENT_STREAM_ENDED) + await self.mass.add_event_listener(self.mass_event, EVENT_PLAYBACK_STARTED) + await self.mass.add_event_listener(self.mass_event, EVENT_PLAYBACK_STOPPED) async def search(self, searchstring, media_types=List[MediaType], limit=5): ''' perform search on the provider ''' @@ -278,30 +278,28 @@ class QobuzProvider(MusicProvider): async def mass_event(self, msg, msg_details): ''' received event from mass ''' # TODO: need to figure out if the streamed track is purchased - if msg == "streaming_started" and msg_details['provider'] == self.prov_id: + if msg == EVENT_PLAYBACK_STARTED and msg_details.provider == self.prov_id: # report streaming started to qobuz - LOGGER.debug("streaming_started %s" % msg_details["track_id"]) device_id = self.__user_auth_info["user"]["device"]["id"] credential_id = self.__user_auth_info["user"]["credential"]["id"] user_id = self.__user_auth_info["user"]["id"] - format_id = msg_details["details"]["format_id"] + format_id = msg_details.streamdetails["details"]["format_id"] timestamp = int(time.time()) events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, - "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, + "track_id": msg_details.item_id, "purchase": False, "date": timestamp, "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}] await self.__post_data("track/reportStreamingStart", data=events) - elif msg == "streaming_ended" and msg_details['provider'] == self.prov_id: + elif msg == EVENT_PLAYBACK_STOPPED and msg_details.provider == self.prov_id: # report streaming ended to qobuz - LOGGER.debug("streaming_ended %s - seconds played: %s" %(msg_details["track_id"], msg_details["seconds"]) ) device_id = self.__user_auth_info["user"]["device"]["id"] credential_id = self.__user_auth_info["user"]["credential"]["id"] user_id = self.__user_auth_info["user"]["id"] - format_id = msg_details["details"]["format_id"] + format_id = msg_details.streamdetails["details"]["format_id"] timestamp = int(time.time()) events=[{"online": True, "sample": False, "intent": "stream", "device_id": device_id, - "track_id": msg_details["track_id"], "purchase": False, "date": timestamp, "duration": msg_details["seconds"], + "track_id": msg_details.item_id, "purchase": False, "date": timestamp, "duration": int(msg_details.seconds_played), "credential_id": credential_id, "user_id": user_id, "local": False, "format_id":format_id}] - await self.__post_data("track/reportStreamingStart", data=events) + await self.__post_data("track/reportStreamingEnd", data=events) async def __parse_artist(self, artist_obj): ''' parse qobuz artist object to generic layout ''' diff --git a/music_assistant/playerproviders/chromecast.py b/music_assistant/playerproviders/chromecast.py index 79a19ec1..a0e57463 100644 --- a/music_assistant/playerproviders/chromecast.py +++ b/music_assistant/playerproviders/chromecast.py @@ -34,12 +34,15 @@ class ChromecastPlayer(Player): async def try_chromecast_command(self, cmd:types.MethodType, *args, **kwargs): ''' guard for disconnected socket client ''' - try: - cmd(*args, **kwargs) - except (pychromecast.error.NotConnected, AttributeError): - LOGGER.warning("Chromecast %s is not connected!" % self.name) - except Exception as exc: - LOGGER.warning(exc) + def _try_chromecast_command(_cmd:types.MethodType, *_args, **_kwargs): + try: + _cmd(*_args, **_kwargs) + except (pychromecast.error.NotConnected, AttributeError): + LOGGER.warning("Chromecast %s is not connected!" % self.name) + except Exception as exc: + LOGGER.warning(exc) + return self.mass.event_loop.call_soon_threadsafe( + _try_chromecast_command, cmd, *args, **kwargs) async def cmd_stop(self): ''' send stop command to player ''' @@ -175,22 +178,31 @@ class ChromecastPlayer(Player): else: send_queue() + def __update_group_members(self): + ''' update group members ''' + if not self.mz: + return + try: + self.mz.update_members() + except Exception as exc: + LOGGER.exception(exc) + async def handle_player_state(self, caststatus=None, mediastatus=None, connection_status=None): ''' handle a player state message from the socket ''' # handle connection status if connection_status: if self.mz and connection_status.status == CONNECTION_STATUS_CONNECTED: - return self.mz.update_members() + self.mass.event_loop.call_soon_threadsafe(self.__update_group_members) elif connection_status.status == CONNECTION_STATUS_DISCONNECTED: # schedule a new scan which will handle group parent changes - return self.mass.event_loop.create_task( + self.mass.event_loop.create_task( self.mass.players.providers[self.player_provider].start_chromecast_discovery()) # handle generic cast status if caststatus: - self.name = self.cc.name self.muted = caststatus.volume_muted self.volume_level = caststatus.volume_level * 100 + self.name = self.cc.name # handle media status if mediastatus: if mediastatus.player_state in ['PLAYING', 'BUFFERING']: @@ -211,8 +223,6 @@ class ChromecastPlayer(Player): self.poll_task = False if not self.poll_task and self.state == PlayerState.Playing: self.mass.event_loop.create_task(poll_task()) - # we are called from socket client thread so do this threadsafe! - #asyncio.run_coroutine_threadsafe(self.update(), self.mass.event_loop) class ChromecastProvider(PlayerProvider): ''' support for ChromeCast Audio ''' @@ -236,7 +246,7 @@ class ChromecastProvider(PlayerProvider): player = await self.get_player(added_player) group_player = await self.get_player(str(mz._uuid)) if player and group_player: - player.group_parent = str(mz._uuid) + player.group_parent = group_player.player_id LOGGER.debug("player %s added to group %s" %(player.name, group_player.name)) elif removed_player: player = await self.get_player(added_player) @@ -248,6 +258,7 @@ class ChromecastProvider(PlayerProvider): for member in mz.members: player = await self.get_player(member) if player: + LOGGER.debug("player %s added to group %s" %(player.name, str(mz._uuid))) player.group_parent = str(mz._uuid) @run_periodic(1800) diff --git a/music_assistant/web.py b/music_assistant/web.py index 7da9aedc..e855e9d8 100755 --- a/music_assistant/web.py +++ b/music_assistant/web.py @@ -255,6 +255,8 @@ class Web(): result = await player_cmd(cmd_args) elif player_cmd: result = await player_cmd() + except (Exception, AssertionError) as exc: + LOGGER.warning("Websocket disconnected - %s" % str(exc)) finally: await self.mass.remove_event_listener(cb_id) LOGGER.debug('websocket connection closed')