Some small bugfixes and enhancements (#1106)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 24 Feb 2024 09:27:54 +0000 (10:27 +0100)
committerGitHub <noreply@github.com>
Sat, 24 Feb 2024 09:27:54 +0000 (10:27 +0100)
* Ignore the closed message in the websocket

* Unify sync logic of airplay and slimproto

* Better handling of metadata and commands in airplay

* updated cliraop binaries

* mark player(s) as powered on sync

* skip player conf entries for syncgroups

* use timeout for cliraop stop

* Add some extra logging

* Fix weird group power behavior

music_assistant/server/controllers/players.py
music_assistant/server/controllers/webserver.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/server/providers/airplay/bin/cliraop-macos-arm64
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/sonos/__init__.py

index f6f580d6c4faf65819e9be80b311bba9474d0b9c..56ebaf0b73e62c76c9d83a35494bc7ead2e35de8 100644 (file)
@@ -400,15 +400,17 @@ class PlayerController(CoreController):
         - player_id: player_id of the player to handle the command.
         - powered: bool if player should be powered on or off.
         """
+        # forward to syncgroup if needed
         if player_id.startswith(SYNCGROUP_PREFIX):
             await self.cmd_group_power(player_id, powered)
             return
+
         player = self.get(player_id, True)
 
         if player.powered == powered:
             return  # nothing to do
 
-        # stop player at power off
+        # always stop player at power off
         if (
             not powered
             and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
@@ -416,6 +418,7 @@ class PlayerController(CoreController):
             and player.powered
         ):
             await self.cmd_stop(player_id)
+
         # unsync player at power off
         if not powered:
             if player.synced_to is not None:
@@ -520,9 +523,12 @@ class PlayerController(CoreController):
         """Handle power command for a PlayerGroup."""
         group_player = self.get(player_id, True)
 
-        group_player.powered = power
-        if not power:
-            group_player.state = PlayerState.IDLE
+        if group_player.powered == power:
+            return  # nothing to do
+
+        # always stop (group/master)player at power off
+        if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+            await self.cmd_stop(player_id)
 
         async with asyncio.TaskGroup() as tg:
             members_powered = False
index 0917c96917576d5daf07b09e802e0a590d010abf..10cbf285e4a34f419ce4f2eecd4b8bae4f5df46f 100644 (file)
@@ -287,7 +287,7 @@ class WebsocketClientHandler:
             while not wsock.closed:
                 msg = await wsock.receive()
 
-                if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING):
+                if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
                     break
 
                 if msg.type != WSMsgType.TEXT:
index 387bdbf48223d08854490fa2b735f04160e106c2..ae299cbac40777fcacd37a3c7e0da097855f6de5 100644 (file)
@@ -255,10 +255,18 @@ class AirplayStreamJob:
         # prefer interactive command to our streamer
         await self.send_cli_command("ACTION=STOP")
         # use communicate to clear stdin/stdout and wait for exit
-        await self._cliraop_proc.wait()
-        # stop background task
-        if self._log_reader_task and not self._log_reader_task.done():
-            self._log_reader_task.cancel()
+        try:
+            await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+        except TimeoutError:
+            self.airplay_player.logger.error(  # noqa: TRY400
+                "RAOP process did not stop on time, attempting forced close."
+            )
+            self._cliraop_proc.kill()
+            await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+        finally:
+            # stop background task
+            if self._log_reader_task and not self._log_reader_task.done():
+                self._log_reader_task.cancel()
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
@@ -286,23 +294,27 @@ class AirplayStreamJob:
             line = line.decode().strip()  # noqa: PLW2901
             if not line:
                 continue
-            logger.debug(line)
-            if "set pause" in line:
+            if "elapsed milliseconds:" in line:
+                millis = int(line.split("elapsed milliseconds: ")[1])
+                mass_player.elapsed_time = millis / 1000
+                mass_player.elapsed_time_last_updated = time.time()
+                continue  # do not log this line, its too verbose
+            if "set pause" in line or "Pause at" in line:
                 mass_player.state = PlayerState.PAUSED
                 self.mass.players.update(airplay_player.player_id)
-            elif "Restarted at" in line:
+            elif "Restarted at" in line or "restarting w/ pause" in line:
                 mass_player.state = PlayerState.PLAYING
                 self.mass.players.update(airplay_player.player_id)
-            elif "after start), played" in line:
-                millis = int(line.split("played ")[1].split(" ")[0])
-                mass_player.elapsed_time = millis / 1000
-                mass_player.elapsed_time_last_updated = time.time()
+            elif "Stopped at" in line:
+                mass_player.state = PlayerState.IDLE
+                self.mass.players.update(airplay_player.player_id)
             elif "restarting w/o pause" in line:
                 # streaming has started
                 mass_player.state = PlayerState.PLAYING
                 mass_player.elapsed_time = 0
                 mass_player.elapsed_time_last_updated = time.time()
                 self.mass.players.update(airplay_player.player_id)
+            logger.debug(line)
 
         # if we reach this point, the process exited
         airplay_player.logger.debug("Log watcher task finished...")
@@ -386,6 +398,7 @@ class AirplayProvider(PlayerProvider):
             server=f"{socket.gethostname()}.local",
         )
         await self.mass.aiozc.async_register_service(self._dacp_info)
+        self._resync_handle: asyncio.TimerHandle | None = None
 
     async def on_mdns_service_state_change(
         self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
@@ -439,8 +452,11 @@ class AirplayProvider(PlayerProvider):
 
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
-        entries = await super().get_player_config_entries(player_id)
-        return entries + PLAYER_CONFIG_ENTRIES
+        base_entries = await super().get_player_config_entries(player_id)
+        if player_id not in self._players:
+            # most probably a syncgroup
+            return base_entries
+        return base_entries + PLAYER_CONFIG_ENTRIES
 
     async def cmd_stop(self, player_id: str) -> None:
         """Send STOP command to given player.
@@ -501,6 +517,10 @@ class AirplayProvider(PlayerProvider):
             - seek_position: Optional seek to this position.
             - fade_in: Optionally fade in the item at playback start.
         """
+        # fix race condition where resync and play media are called at more or less the same time
+        if self._resync_handle:
+            self._resync_handle.cancel()
+            self._resync_handle = None
         # always stop existing stream first
         await self.cmd_stop(player_id)
         # start streaming the queue (pcm) audio in a background task
@@ -529,6 +549,10 @@ class AirplayProvider(PlayerProvider):
 
         This is a special feature from the Universal Group provider.
         """
+        # fix race condition where resync and play media are called at more or less the same time
+        if self._resync_handle:
+            self._resync_handle.cancel()
+            self._resync_handle = None
         # always stop existing stream first
         await self.cmd_stop(player_id)
         if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
@@ -552,6 +576,12 @@ class AirplayProvider(PlayerProvider):
         if player.synced_to:
             # should not happen, but just in case
             raise RuntimeError("Player is synced")
+        synced_player_ids = [x.player_id for x in self._get_sync_clients(player_id)]
+        self.logger.info(
+            "Starting RAOP stream for Queue %s to %s",
+            queue.display_name,
+            "/".join(synced_player_ids),
+        )
 
         # Python is not suitable for realtime audio streaming.
         # So, I've decided to go the fancy route here. I've created a small binary
@@ -571,6 +601,7 @@ class AirplayProvider(PlayerProvider):
             airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
             await airplay_player.active_stream.init_cliraop(start_ntp)
         prev_metadata_checksum: str = ""
+        prev_progress_report: float = 0
         async for pcm_chunk in audio_iterator:
             # send audio chunk to player(s)
             available_clients = 0
@@ -585,12 +616,15 @@ class AirplayProvider(PlayerProvider):
                         continue
                     available_clients += 1
                     tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk))
-                    # always send the progress
-                    tg.create_task(
-                        airplay_player.active_stream.send_cli_command(
-                            f"PROGRESS={int(queue.elapsed_time)}\n"
+                    # send the progress report every 5 seconds
+                    now = time.time()
+                    if now - prev_progress_report >= 5:
+                        prev_progress_report = now
+                        tg.create_task(
+                            airplay_player.active_stream.send_cli_command(
+                                f"PROGRESS={int(queue.elapsed_time)}\n"
+                            )
                         )
-                    )
             if not available_clients:
                 # this streamjob is no longer active
                 return
@@ -640,18 +674,40 @@ class AirplayProvider(PlayerProvider):
             - player_id: player_id of the player to handle the command.
             - target_player: player_id of the syncgroup master or group player.
         """
-        player = self.mass.players.get(player_id, raise_unavailable=True)
-        group_leader = self.mass.players.get(target_player, raise_unavailable=True)
-        if group_leader.synced_to:
+        child_player = self.mass.players.get(player_id)
+        assert child_player  # guard
+        parent_player = self.mass.players.get(target_player)
+        assert parent_player  # guard
+        if parent_player.synced_to:
             raise RuntimeError("Player is already synced")
-        player.synced_to = target_player
-        group_leader.group_childs.add(player_id)
-        self.mass.players.update(target_player)
-        if group_leader.powered:
-            await self.mass.players.cmd_power(player_id, True)
-        active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id)
+        if child_player.synced_to and child_player.synced_to != target_player:
+            raise RuntimeError("Player is already synced to another player")
+        # always make sure that the parent player is part of the sync group
+        parent_player.group_childs.add(parent_player.player_id)
+        parent_player.group_childs.add(child_player.player_id)
+        child_player.synced_to = parent_player.player_id
+        # mark players as powered
+        parent_player.powered = True
+        child_player.powered = True
+        # check if we should (re)start or join a stream session
+        active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
         if active_queue.state == PlayerState.PLAYING:
-            self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id))
+            # playback needs to be restarted to form a new multi client stream session
+            def resync() -> None:
+                self._resync_handle = None
+                self.mass.create_task(
+                    self.mass.player_queues.resume(active_queue.queue_id, fade_in=False)
+                )
+
+            # this could potentially be called by multiple players at the exact same time
+            # so we debounce the resync a bit here with a timer
+            if self._resync_handle:
+                self._resync_handle.cancel()
+            self._resync_handle = self.mass.loop.call_later(0.5, resync)
+        else:
+            # make sure that the player manager gets an update
+            self.mass.players.update(child_player.player_id, skip_forward=True)
+            self.mass.players.update(parent_player.player_id, skip_forward=True)
 
     async def cmd_unsync(self, player_id: str) -> None:
         """Handle UNSYNC command for given player.
index f80b3debab8da3802857958238b0dfe593126c91..09d268195c3277cec67d35b98167bf457143d247 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ
index de944030f1ac5889bdd1f84c538720a78904ba7a..0ed63539b2ac75f1565946bb504f529f5c655f25 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ
index f66534a5321e56f9bf702a58c155140854172b67..20689e636cebef286c4b0957906abdbf75eb986e 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ
index bbf9388fe49abb24d4252cc8d72c1a74445ade31..0ca6ed55150f9187f7e204a194657fbb3f86db61 100644 (file)
@@ -459,6 +459,10 @@ class SlimprotoProvider(PlayerProvider):
 
         This is a special feature from the Universal Group provider.
         """
+        # fix race condition where resync and play media are called at more or less the same time
+        if self._resync_handle:
+            self._resync_handle.cancel()
+            self._resync_handle = None
         # forward command to player and any connected sync members
         sync_clients = self._get_sync_clients(player_id)
         async with asyncio.TaskGroup() as tg:
@@ -551,13 +555,17 @@ class SlimprotoProvider(PlayerProvider):
         assert child_player  # guard
         parent_player = self.mass.players.get(target_player)
         assert parent_player  # guard
+        if parent_player.synced_to:
+            raise RuntimeError("Player is already synced")
+        if child_player.synced_to and child_player.synced_to != target_player:
+            raise RuntimeError("Player is already synced to another player")
         # always make sure that the parent player is part of the sync group
         parent_player.group_childs.add(parent_player.player_id)
         parent_player.group_childs.add(child_player.player_id)
         child_player.synced_to = parent_player.player_id
         # check if we should (re)start or join a stream session
         active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
-        if parent_player.state == PlayerState.PLAYING:
+        if active_queue.state == PlayerState.PLAYING:
             # playback needs to be restarted to form a new multi client stream session
             def resync() -> None:
                 self._resync_handle = None
@@ -572,8 +580,8 @@ class SlimprotoProvider(PlayerProvider):
             self._resync_handle = self.mass.loop.call_later(0.5, resync)
         else:
             # make sure that the player manager gets an update
-            self.mass.players.update(child_player.player_id)
-            self.mass.players.update(parent_player.player_id)
+            self.mass.players.update(child_player.player_id, skip_forward=True)
+            self.mass.players.update(parent_player.player_id, skip_forward=True)
 
     async def cmd_unsync(self, player_id: str) -> None:
         """Handle UNSYNC command for given player."""
index b80e00f8693b389c6ccdcae7b3fcba9fb7392efa..13edb6c68b04e3d458e7764a8dd7ca24859ac672 100644 (file)
@@ -181,6 +181,7 @@ class SonosPlayerProvider(PlayerProvider):
         """Return Config Entries for the given player."""
         base_entries = await super().get_player_config_entries(player_id)
         if not (sonos_player := self.sonosplayers.get(player_id)):
+            # most probably a syncgroup
             return base_entries
         return (
             *base_entries,