Some small bugfixes and enhancements (#1117)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 26 Feb 2024 23:44:24 +0000 (00:44 +0100)
committerGitHub <noreply@github.com>
Mon, 26 Feb 2024 23:44:24 +0000 (00:44 +0100)
* Prevent race condition on streamreader when stopping raop streamer

* Handle prevent playback for airplay players

* enforce auth for airport express

* ensure player is powered before considering queue active

* clear previous player queue state

music_assistant/server/controllers/player_queues.py
music_assistant/server/providers/airplay/__init__.py

index 0b0390259b6f832f7ecf47dc8858d2c5ff166229..25e5c3f6e8582e46ee48448a274d7d38cac6ec67 100644 (file)
@@ -712,9 +712,10 @@ class PlayerQueuesController(CoreController):
         queue.available = player.available
         queue.items = len(self._queue_items[queue_id])
         # determine if this queue is currently active for this player
-        queue.active = player.active_source == queue.queue_id
+        queue.active = player.powered and player.active_source == queue.queue_id
         if not queue.active:
             queue.state = PlayerState.IDLE
+            self._prev_states.pop(queue_id, None)
             return
         # update current item from player report
         if queue.flow_mode:
index af8b7f32b2ff6cc59957b7e63b2f13b69d184d07..89e19c694eee2e981e7a3c4ab26978464b92b490 100644 (file)
@@ -177,6 +177,7 @@ class AirplayStreamJob:
         # with the named pipe used to send commands
         self.active_remote_id: str = str(randint(1000, 8000))
         self.start_ntp: int | None = None  # use as checksum
+        self.prevent_playback: bool = False
         self._audio_buffer = asyncio.Queue(2)
         self._log_reader_task: asyncio.Task | None = None
         self._audio_reader_task: asyncio.Task | None = None
@@ -202,6 +203,9 @@ class AirplayStreamJob:
             extra_args += ["-e"]
         if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
             extra_args += ["-a"]
+        if "airport" in mass_player.device_info.model.lower():
+            # enforce auth on airport express
+            extra_args += ["-auth"]
         sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
         if device_password := self.mass.config.get_raw_player_config_value(
             player_id, CONF_PASSWORD, None
@@ -247,7 +251,7 @@ class AirplayStreamJob:
         self._log_reader_task = asyncio.create_task(self._log_watcher())
         self._audio_reader_task = asyncio.create_task(self._audio_reader())
 
-    async def stop(self):
+    async def stop(self, force=False):
         """Stop playback and cleanup."""
         if not self.running:
             return
@@ -255,9 +259,15 @@ class AirplayStreamJob:
         self._stop_requested = True
         # stop background tasks
         if self._log_reader_task and not self._log_reader_task.done():
-            self._log_reader_task.cancel()
+            if force:
+                self._log_reader_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._log_reader_task
         if self._audio_reader_task and not self._audio_reader_task.done():
-            self._audio_reader_task.cancel()
+            if force:
+                self._audio_reader_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._audio_reader_task
 
         empty_queue(self._audio_buffer)
         await asyncio.wait_for(self._cliraop_proc.communicate(), 30)
@@ -491,7 +501,7 @@ class AirplayProvider(PlayerProvider):
 
         async def stop_player(airplay_player: AirPlayPlayer) -> None:
             if airplay_player.active_stream:
-                await airplay_player.active_stream.stop()
+                await airplay_player.active_stream.stop(force=False)
             mass_player = self.mass.players.get(airplay_player.player_id)
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
@@ -551,7 +561,7 @@ class AirplayProvider(PlayerProvider):
             existing_stream.cancel()
         for airplay_player in self._get_sync_clients(player_id):
             if airplay_player.active_stream and airplay_player.active_stream.running:
-                self.mass.create_task(airplay_player.active_stream.stop())
+                self.mass.create_task(airplay_player.active_stream.stop(force=True))
         # start streaming the queue (pcm) audio in a background task
         queue = self.mass.player_queues.get_active_queue(player_id)
         self._stream_tasks[player_id] = asyncio.create_task(
@@ -588,7 +598,7 @@ class AirplayProvider(PlayerProvider):
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
                 if airplay_player.active_stream and airplay_player.active_stream.running:
-                    tg.create_task(airplay_player.active_stream.stop())
+                    tg.create_task(airplay_player.active_stream.stop(force=True))
         if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
             # TODO: resample on the fly here ?
             raise RuntimeError("Unsupported PCM format")
@@ -629,9 +639,6 @@ class AirplayProvider(PlayerProvider):
 
         # setup Raop process for player and its sync childs
         for airplay_player in self._get_sync_clients(player_id):
-            # make sure that existing stream is stopped
-            if airplay_player.active_stream:
-                await airplay_player.active_stream.stop()
             airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
             await airplay_player.active_stream.init_cliraop(start_ntp)
         prev_metadata_checksum: str = ""
@@ -941,8 +948,17 @@ class AirplayProvider(PlayerProvider):
                 volume = int(path.split("dmcp.volume=", 1)[-1])
                 if abs(volume - mass_player.volume_level) > 2:
                     self.mass.create_task(self.cmd_volume_set(player_id, volume))
+            elif "device-prevent-playback=1" in path:
+                # device switched to another source (or is powered off)
+                if active_stream := airplay_player.active_stream:
+                    active_stream.prevent_playback = True
+                    self.mass.create_task(self.monitor_prevent_playback(player_id))
+            elif "device-prevent-playback=0" in path:
+                # device reports that its ready for playback again
+                if active_stream := airplay_player.active_stream:
+                    active_stream.prevent_playback = False
             else:
-                self.logger.debug(
+                self.logger.info(
                     "Unknown DACP request for %s: %s",
                     airplay_player.discovery_info.name,
                     path,
@@ -1014,3 +1030,27 @@ class AirplayProvider(PlayerProvider):
             if not airplay_player.active_stream or not airplay_player.active_stream.running:
                 continue
             await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n")
+
+    async def monitor_prevent_playback(self, player_id: str):
+        """Monitor the prevent playback state of an airplay player."""
+        count = 0
+        if not (airplay_player := self._players.get(player_id)):
+            return
+        prev_ntp = airplay_player.active_stream.start_ntp
+        while count < 40:
+            count += 1
+            if not (airplay_player := self._players.get(player_id)):
+                return
+            if not airplay_player.active_stream:
+                return
+            if airplay_player.active_stream.start_ntp != prev_ntp:
+                # checksum
+                return
+            if not airplay_player.active_stream.prevent_playback:
+                return
+            await asyncio.sleep(0.25)
+
+        airplay_player.logger.info(
+            "Player has been in prevent playback mode for too long, powering off.",
+        )
+        await self.mass.players.cmd_power(airplay_player.player_id, False)