From: Marcel van der Veldt Date: Sat, 24 Feb 2024 09:27:54 +0000 (+0100) Subject: Some small bugfixes and enhancements (#1106) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9286492f067052a9b3ab208524a3c75be08f5742;p=music-assistant-server.git Some small bugfixes and enhancements (#1106) * Ignore the closed message in the websocket * Unify sync logic of airplay and slimproto * Better handling of metadata and commands in airplay * updated cliraop binaries * mark player(s) as powered on sync * skip player conf entries for syncgroups * use timeout for cliraop stop * Add some extra logging * Fix weird group power behavior --- diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index f6f580d6..56ebaf0b 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -400,15 +400,17 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. - powered: bool if player should be powered on or off. """ + # forward to syncgroup if needed if player_id.startswith(SYNCGROUP_PREFIX): await self.cmd_group_power(player_id, powered) return + player = self.get(player_id, True) if player.powered == powered: return # nothing to do - # stop player at power off + # always stop player at power off if ( not powered and player.state in (PlayerState.PLAYING, PlayerState.PAUSED) @@ -416,6 +418,7 @@ class PlayerController(CoreController): and player.powered ): await self.cmd_stop(player_id) + # unsync player at power off if not powered: if player.synced_to is not None: @@ -520,9 +523,12 @@ class PlayerController(CoreController): """Handle power command for a PlayerGroup.""" group_player = self.get(player_id, True) - group_player.powered = power - if not power: - group_player.state = PlayerState.IDLE + if group_player.powered == power: + return # nothing to do + + # always stop (group/master)player at power off + if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED): + await self.cmd_stop(player_id) async with asyncio.TaskGroup() as tg: members_powered = False diff --git a/music_assistant/server/controllers/webserver.py b/music_assistant/server/controllers/webserver.py index 0917c969..10cbf285 100644 --- a/music_assistant/server/controllers/webserver.py +++ b/music_assistant/server/controllers/webserver.py @@ -287,7 +287,7 @@ class WebsocketClientHandler: while not wsock.closed: msg = await wsock.receive() - if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING): + if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED): break if msg.type != WSMsgType.TEXT: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 387bdbf4..ae299cba 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -255,10 +255,18 @@ class AirplayStreamJob: # prefer interactive command to our streamer await self.send_cli_command("ACTION=STOP") # use communicate to clear stdin/stdout and wait for exit - await self._cliraop_proc.wait() - # stop background task - if self._log_reader_task and not self._log_reader_task.done(): - self._log_reader_task.cancel() + try: + await asyncio.wait_for(self._cliraop_proc.wait(), 5) + except TimeoutError: + self.airplay_player.logger.error( # noqa: TRY400 + "RAOP process did not stop on time, attempting forced close." + ) + self._cliraop_proc.kill() + await asyncio.wait_for(self._cliraop_proc.wait(), 5) + finally: + # stop background task + if self._log_reader_task and not self._log_reader_task.done(): + self._log_reader_task.cancel() async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" @@ -286,23 +294,27 @@ class AirplayStreamJob: line = line.decode().strip() # noqa: PLW2901 if not line: continue - logger.debug(line) - if "set pause" in line: + if "elapsed milliseconds:" in line: + millis = int(line.split("elapsed milliseconds: ")[1]) + mass_player.elapsed_time = millis / 1000 + mass_player.elapsed_time_last_updated = time.time() + continue # do not log this line, its too verbose + if "set pause" in line or "Pause at" in line: mass_player.state = PlayerState.PAUSED self.mass.players.update(airplay_player.player_id) - elif "Restarted at" in line: + elif "Restarted at" in line or "restarting w/ pause" in line: mass_player.state = PlayerState.PLAYING self.mass.players.update(airplay_player.player_id) - elif "after start), played" in line: - millis = int(line.split("played ")[1].split(" ")[0]) - mass_player.elapsed_time = millis / 1000 - mass_player.elapsed_time_last_updated = time.time() + elif "Stopped at" in line: + mass_player.state = PlayerState.IDLE + self.mass.players.update(airplay_player.player_id) elif "restarting w/o pause" in line: # streaming has started mass_player.state = PlayerState.PLAYING mass_player.elapsed_time = 0 mass_player.elapsed_time_last_updated = time.time() self.mass.players.update(airplay_player.player_id) + logger.debug(line) # if we reach this point, the process exited airplay_player.logger.debug("Log watcher task finished...") @@ -386,6 +398,7 @@ class AirplayProvider(PlayerProvider): server=f"{socket.gethostname()}.local", ) await self.mass.aiozc.async_register_service(self._dacp_info) + self._resync_handle: asyncio.TimerHandle | None = None async def on_mdns_service_state_change( self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None @@ -439,8 +452,11 @@ class AirplayProvider(PlayerProvider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - entries = await super().get_player_config_entries(player_id) - return entries + PLAYER_CONFIG_ENTRIES + base_entries = await super().get_player_config_entries(player_id) + if player_id not in self._players: + # most probably a syncgroup + return base_entries + return base_entries + PLAYER_CONFIG_ENTRIES async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player. @@ -501,6 +517,10 @@ class AirplayProvider(PlayerProvider): - seek_position: Optional seek to this position. - fade_in: Optionally fade in the item at playback start. """ + # fix race condition where resync and play media are called at more or less the same time + if self._resync_handle: + self._resync_handle.cancel() + self._resync_handle = None # always stop existing stream first await self.cmd_stop(player_id) # start streaming the queue (pcm) audio in a background task @@ -529,6 +549,10 @@ class AirplayProvider(PlayerProvider): This is a special feature from the Universal Group provider. """ + # fix race condition where resync and play media are called at more or less the same time + if self._resync_handle: + self._resync_handle.cancel() + self._resync_handle = None # always stop existing stream first await self.cmd_stop(player_id) if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100: @@ -552,6 +576,12 @@ class AirplayProvider(PlayerProvider): if player.synced_to: # should not happen, but just in case raise RuntimeError("Player is synced") + synced_player_ids = [x.player_id for x in self._get_sync_clients(player_id)] + self.logger.info( + "Starting RAOP stream for Queue %s to %s", + queue.display_name, + "/".join(synced_player_ids), + ) # Python is not suitable for realtime audio streaming. # So, I've decided to go the fancy route here. I've created a small binary @@ -571,6 +601,7 @@ class AirplayProvider(PlayerProvider): airplay_player.active_stream = AirplayStreamJob(self, airplay_player) await airplay_player.active_stream.init_cliraop(start_ntp) prev_metadata_checksum: str = "" + prev_progress_report: float = 0 async for pcm_chunk in audio_iterator: # send audio chunk to player(s) available_clients = 0 @@ -585,12 +616,15 @@ class AirplayProvider(PlayerProvider): continue available_clients += 1 tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk)) - # always send the progress - tg.create_task( - airplay_player.active_stream.send_cli_command( - f"PROGRESS={int(queue.elapsed_time)}\n" + # send the progress report every 5 seconds + now = time.time() + if now - prev_progress_report >= 5: + prev_progress_report = now + tg.create_task( + airplay_player.active_stream.send_cli_command( + f"PROGRESS={int(queue.elapsed_time)}\n" + ) ) - ) if not available_clients: # this streamjob is no longer active return @@ -640,18 +674,40 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. - target_player: player_id of the syncgroup master or group player. """ - player = self.mass.players.get(player_id, raise_unavailable=True) - group_leader = self.mass.players.get(target_player, raise_unavailable=True) - if group_leader.synced_to: + child_player = self.mass.players.get(player_id) + assert child_player # guard + parent_player = self.mass.players.get(target_player) + assert parent_player # guard + if parent_player.synced_to: raise RuntimeError("Player is already synced") - player.synced_to = target_player - group_leader.group_childs.add(player_id) - self.mass.players.update(target_player) - if group_leader.powered: - await self.mass.players.cmd_power(player_id, True) - active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id) + if child_player.synced_to and child_player.synced_to != target_player: + raise RuntimeError("Player is already synced to another player") + # always make sure that the parent player is part of the sync group + parent_player.group_childs.add(parent_player.player_id) + parent_player.group_childs.add(child_player.player_id) + child_player.synced_to = parent_player.player_id + # mark players as powered + parent_player.powered = True + child_player.powered = True + # check if we should (re)start or join a stream session + active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) if active_queue.state == PlayerState.PLAYING: - self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id)) + # playback needs to be restarted to form a new multi client stream session + def resync() -> None: + self._resync_handle = None + self.mass.create_task( + self.mass.player_queues.resume(active_queue.queue_id, fade_in=False) + ) + + # this could potentially be called by multiple players at the exact same time + # so we debounce the resync a bit here with a timer + if self._resync_handle: + self._resync_handle.cancel() + self._resync_handle = self.mass.loop.call_later(0.5, resync) + else: + # make sure that the player manager gets an update + self.mass.players.update(child_player.player_id, skip_forward=True) + self.mass.players.update(parent_player.player_id, skip_forward=True) async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player. diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 index f80b3deb..09d26819 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 index de944030..0ed63539 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 index f66534a5..20689e63 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index bbf9388f..0ca6ed55 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -459,6 +459,10 @@ class SlimprotoProvider(PlayerProvider): This is a special feature from the Universal Group provider. """ + # fix race condition where resync and play media are called at more or less the same time + if self._resync_handle: + self._resync_handle.cancel() + self._resync_handle = None # forward command to player and any connected sync members sync_clients = self._get_sync_clients(player_id) async with asyncio.TaskGroup() as tg: @@ -551,13 +555,17 @@ class SlimprotoProvider(PlayerProvider): assert child_player # guard parent_player = self.mass.players.get(target_player) assert parent_player # guard + if parent_player.synced_to: + raise RuntimeError("Player is already synced") + if child_player.synced_to and child_player.synced_to != target_player: + raise RuntimeError("Player is already synced to another player") # always make sure that the parent player is part of the sync group parent_player.group_childs.add(parent_player.player_id) parent_player.group_childs.add(child_player.player_id) child_player.synced_to = parent_player.player_id # check if we should (re)start or join a stream session active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) - if parent_player.state == PlayerState.PLAYING: + if active_queue.state == PlayerState.PLAYING: # playback needs to be restarted to form a new multi client stream session def resync() -> None: self._resync_handle = None @@ -572,8 +580,8 @@ class SlimprotoProvider(PlayerProvider): self._resync_handle = self.mass.loop.call_later(0.5, resync) else: # make sure that the player manager gets an update - self.mass.players.update(child_player.player_id) - self.mass.players.update(parent_player.player_id) + self.mass.players.update(child_player.player_id, skip_forward=True) + self.mass.players.update(parent_player.player_id, skip_forward=True) async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player.""" diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index b80e00f8..13edb6c6 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -181,6 +181,7 @@ class SonosPlayerProvider(PlayerProvider): """Return Config Entries for the given player.""" base_entries = await super().get_player_config_entries(player_id) if not (sonos_player := self.sonosplayers.get(player_id)): + # most probably a syncgroup return base_entries return ( *base_entries,