Fix AirPlay playback gets mangled up between different streams (#1555)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 11 Aug 2024 22:35:52 +0000 (00:35 +0200)
committerGitHub <noreply@github.com>
Sun, 11 Aug 2024 22:35:52 +0000 (00:35 +0200)
music_assistant/common/models/player_queue.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/util.py
music_assistant/server/providers/airplay/__init__.py

index 87e37e8e01adcccc14cb5238de036ee5a4015e2d..8f16281a101d324f8a5b15724f6a502c3ae936c3 100644 (file)
@@ -53,7 +53,6 @@ class PlayerQueue(DataClassDictMixin):
         d.pop("current_item", None)
         d.pop("next_item", None)
         d.pop("index_in_buffer", None)
-        d.pop("announcement_in_progress", None)
         d.pop("flow_mode", None)
         d.pop("flow_mode_start_index", None)
         return d
@@ -64,7 +63,6 @@ class PlayerQueue(DataClassDictMixin):
         d.pop("current_item", None)
         d.pop("next_item", None)
         d.pop("index_in_buffer", None)
-        d.pop("announcement_in_progress", None)
         d.pop("flow_mode", None)
         d.pop("flow_mode_start_index", None)
         return cls.from_dict(d)
index e4643910e183da1e2493666685e7037bd6d6faf6..d3a6e6bdacc6f9568f0a33642df1079cb4cad85f 100644 (file)
@@ -743,10 +743,15 @@ class PlayerQueuesController(CoreController):
             self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
         )
         # send play_media request to player
-        await self.mass.players.play_media(
+        # NOTE that we debounce this a bit to account for someone hitting the next button
+        # like a madman. This will prevent the player from being overloaded with requests.
+        self.mass.call_later(
+            0.25,
+            self.mass.players.play_media,
             player_id=queue_id,
             # transform into PlayerMedia to send to the actual player implementation
             media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
+            task_id=f"play_media_{queue_id}",
         )
 
     # Interaction with player
index c9beeafa35a3c0d26f397e67cbe3e507113906e2..6e0597549bb9cdae2377ee7e70945ece4a90f713 100644 (file)
@@ -106,6 +106,7 @@ class PlayerController(CoreController):
         )
         self.manifest.icon = "speaker-multiple"
         self._poll_task: asyncio.Task | None = None
+        self._player_locks: dict[str, asyncio.Lock] = {}
 
     async def setup(self, config: CoreConfig) -> None:
         """Async initialize of module."""
@@ -199,7 +200,8 @@ class PlayerController(CoreController):
             await self.mass.player_queues.play(player_id)
             return
         player_provider = self.get_player_provider(player_id)
-        await player_provider.cmd_play(player_id)
+        async with self._player_locks[player_id]:
+            await player_provider.cmd_play(player_id)
 
     @api_command("players/cmd/pause")
     @handle_player_command
@@ -242,7 +244,6 @@ class PlayerController(CoreController):
         self.mass.create_task(_watch_pause(player_id))
 
     @api_command("players/cmd/play_pause")
-    @handle_player_command
     async def cmd_play_pause(self, player_id: str) -> None:
         """Toggle play/pause on given player.
 
@@ -292,7 +293,8 @@ class PlayerController(CoreController):
         if PlayerFeature.POWER in player.supported_features:
             # forward to player provider
             player_provider = self.get_player_provider(player_id)
-            await player_provider.cmd_power(player_id, powered)
+            async with self._player_locks[player_id]:
+                await player_provider.cmd_power(player_id, powered)
         else:
             # allow the stop command to process and prevent race conditions
             await asyncio.sleep(0.2)
@@ -332,7 +334,8 @@ class PlayerController(CoreController):
             msg = f"Player {player.display_name} does not support volume_set"
             raise UnsupportedFeaturedException(msg)
         player_provider = self.get_player_provider(player_id)
-        await player_provider.cmd_volume_set(player_id, volume_level)
+        async with self._player_locks[player_id]:
+            await player_provider.cmd_volume_set(player_id, volume_level)
 
     @api_command("players/cmd/volume_up")
     @handle_player_command
@@ -438,7 +441,8 @@ class PlayerController(CoreController):
             msg = f"Player {player.display_name} does not support muting"
             raise UnsupportedFeaturedException(msg)
         player_provider = self.get_player_provider(player_id)
-        await player_provider.cmd_volume_mute(player_id, muted)
+        async with self._player_locks[player_id]:
+            await player_provider.cmd_volume_mute(player_id, muted)
 
     @api_command("players/cmd/seek")
     async def cmd_seek(self, player_id: str, position: int) -> None:
@@ -466,8 +470,8 @@ class PlayerController(CoreController):
     ) -> None:
         """Handle playback of an announcement (url) on given player."""
         player = self.get(player_id, True)
-        if player.announcement_in_progress:
-            return
+        while player.announcement_in_progress:
+            await asyncio.sleep(0.5)
         if not url.startswith("http"):
             raise PlayerCommandFailed("Only URLs are supported for announcements")
         try:
@@ -576,7 +580,8 @@ class PlayerController(CoreController):
                 )
             return
         player_prov = self.mass.players.get_player_provider(player_id)
-        await player_prov.enqueue_next_media(player_id=player_id, media=media)
+        async with self._player_locks[player_id]:
+            await player_prov.enqueue_next_media(player_id=player_id, media=media)
 
     @api_command("players/cmd/sync")
     @handle_player_command
@@ -628,7 +633,7 @@ class PlayerController(CoreController):
                 continue
             if child_player.synced_to and child_player.synced_to == target_player:
                 continue  # already synced to this target
-            if child_player.synced_to and child_player.synced_to != target_player:
+            elif child_player.synced_to:
                 # player already synced to another player, unsync first
                 self.logger.warning(
                     "Player %s is already synced, unsyncing first", child_player.name
@@ -649,7 +654,8 @@ class PlayerController(CoreController):
 
         # forward command to the player provider after all (base) sanity checks
         player_provider = self.get_player_provider(target_player)
-        await player_provider.cmd_sync_many(target_player, child_player_ids)
+        async with self._player_locks[target_player]:
+            await player_provider.cmd_sync_many(target_player, child_player_ids)
 
     @api_command("players/cmd/unsync_many")
     async def cmd_unsync_many(self, player_ids: list[str]) -> None:
@@ -735,6 +741,9 @@ class PlayerController(CoreController):
         # register playerqueue for this player
         self.mass.create_task(self.mass.player_queues.on_player_register(player))
 
+        # register lock for this player
+        self._player_locks[player_id] = asyncio.Lock()
+
         self._players[player_id] = player
 
         # ignore disabled players
index d84900fa3a523732133ea6aed539e07af6b9da3c..44dfd1269493dc65ce5f8132b35a41db95756a70 100644 (file)
@@ -498,6 +498,9 @@ class StreamsController(CoreController):
         use_crossfade = await self.mass.config.get_player_config_value(
             queue.queue_id, CONF_CROSSFADE
         )
+        if not start_queue_item:
+            # this can happen in some (edge case) race conditions
+            return
         if start_queue_item.media_type != MediaType.TRACK:
             use_crossfade = False
         pcm_sample_size = int(
@@ -775,6 +778,9 @@ class StreamsController(CoreController):
                     # del chunk
                 finished = True
         finally:
+            if "ffmpeg_proc" not in locals():
+                # edge case: ffmpeg process was not yet started
+                return  # noqa: B012
             if finished and not ffmpeg_proc.closed:
                 await asyncio.wait_for(ffmpeg_proc.wait(), 60)
             elif not ffmpeg_proc.closed:
index b5afcc263bd635b2d00c1630a57430e21f37255f..b1be4202e1e0c15d0060499cdd792655dab38efc 100644 (file)
@@ -149,9 +149,9 @@ class TaskManager:
         self.mass = mass
         self._tasks: list[asyncio.Task] = []
 
-    def create_task(self, coro: Coroutine) -> None:
+    def create_task(self, coro: Coroutine, eager_start: bool = False) -> None:
         """Create a new task and add it to the manager."""
-        task = self.mass.create_task(coro)
+        task = self.mass.create_task(coro, eager_start=eager_start)
         self._tasks.append(task)
 
     async def __aenter__(self) -> Self:
index db046689001c13d0b0c3f8a8078e47037bfc18a1..b1af1eb254165bc1a4e7f4b55cc0cb42f963c51b 100644 (file)
@@ -9,7 +9,6 @@ import platform
 import socket
 import time
 from contextlib import suppress
-from dataclasses import dataclass
 from random import randint, randrange
 from typing import TYPE_CHECKING
 
@@ -17,7 +16,7 @@ from zeroconf import IPVersion, ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -29,6 +28,7 @@ from music_assistant.common.models.config_entries import (
     CONF_ENTRY_SYNC_ADJUST,
     ConfigEntry,
     ConfigValueType,
+    ProviderConfig,
     create_sample_rates_config_entry,
 )
 from music_assistant.common.models.enums import (
@@ -43,7 +43,9 @@ from music_assistant.common.models.enums import (
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.common.models.player_queue import PlayerQueue
+from music_assistant.common.models.provider import ProviderManifest
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.server import MusicAssistant
 from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
 from music_assistant.server.helpers.process import AsyncProcess, check_output
 from music_assistant.server.helpers.util import TaskManager
@@ -202,13 +204,13 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
     return None
 
 
-class AirplayStream:
-    """Object that holds the details of a stream job."""
+class RaopStream:
+    """Object that holds the details of a (RAOP) stream job."""
 
     def __init__(
         self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat
     ) -> None:
-        """Initialize AirplayStream."""
+        """Initialize RaopStream."""
         self.prov = prov
         self.mass = prov.mass
         self.airplay_player = airplay_player
@@ -299,7 +301,7 @@ class AirplayStream:
         await self._cliraop_proc.start()
         await asyncio.to_thread(os.close, read)
         self._started.set()
-        self._log_reader_task = asyncio.create_task(self._log_watcher())
+        self._log_reader_task = self.mass.create_task(self._log_watcher())
 
     async def stop(self):
         """Stop playback and cleanup."""
@@ -310,20 +312,12 @@ class AirplayStream:
         if self._cliraop_proc.proc and not self._cliraop_proc.closed:
             await self.send_cli_command("ACTION=STOP")
         self._stopped = True  # set after send_cli command!
-        if self._cliraop_proc.proc:
-            try:
-                await asyncio.wait_for(self._cliraop_proc.wait(), 5)
-            except TimeoutError:
-                self.prov.logger.warning(
-                    "Raop process for %s did not stop in time, is the player offline?",
-                    self.airplay_player.player_id,
-                )
-                await self._cliraop_proc.close(True)
-
-        # ffmpeg can sometimes hang due to the connected pipes
-        # we handle closing it but it can be a bit slow so do that in the background
-        if not self._ffmpeg_proc.closed:
-            self.mass.create_task(self._ffmpeg_proc.close(True))
+        if self._cliraop_proc.proc and not self._cliraop_proc.closed:
+            await self._cliraop_proc.close(True)
+        if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
+            await self._ffmpeg_proc.close(True)
+        self._cliraop_proc = None
+        self._ffmpeg_proc = None
 
     async def write_chunk(self, chunk: bytes) -> None:
         """Write a (pcm) audio chunk."""
@@ -354,7 +348,7 @@ class AirplayStream:
 
         named_pipe = f"/tmp/raop-{self.active_remote_id}"  # noqa: S108
         self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
-        await self.mass.create_task(send_data)
+        await asyncio.to_thread(send_data)
 
     async def _log_watcher(self) -> None:
         """Monitor stderr for the running CLIRaop process."""
@@ -415,7 +409,7 @@ class AirplayStream:
             logger.log(VERBOSE_LOG_LEVEL, line)
 
         # if we reach this point, the process exited
-        if airplay_player.active_stream == self:
+        if airplay_player.raop_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
         # ensure we're cleaned up afterwards (this also logs the returncode)
@@ -467,15 +461,39 @@ class AirplayStream:
         await self.send_cli_command(f"PROGRESS={progress}\n")
 
 
-@dataclass
 class AirPlayPlayer:
     """Holds the details of the (discovered) Airplay (RAOP) player."""
 
-    player_id: str
-    discovery_info: AsyncServiceInfo
-    address: str
-    logger: logging.Logger
-    active_stream: AirplayStream | None = None
+    def __init__(
+        self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str
+    ) -> None:
+        """Initialize AirPlayPlayer."""
+        self.prov = prov
+        self.mass = prov.mass
+        self.player_id = player_id
+        self.discovery_info = discovery_info
+        self.address = address
+        self.logger = prov.logger.getChild(player_id)
+        self.raop_stream: RaopStream | None = None
+
+    async def cmd_stop(self, update_state: bool = True) -> None:
+        """Send STOP command to player."""
+        if self.raop_stream:
+            await self.raop_stream.stop()
+        if update_state and (mass_player := self.mass.players.get(self.player_id)):
+            mass_player.state = PlayerState.IDLE
+            self.mass.players.update(mass_player.player_id)
+
+    async def cmd_play(self) -> None:
+        """Send PLAY (unpause) command to player."""
+        if self.raop_stream and self.raop_stream.running:
+            await self.raop_stream.send_cli_command("ACTION=PLAY")
+
+    async def cmd_pause(self) -> None:
+        """Send PAUSE command to player."""
+        if not self.raop_stream or not self.raop_stream.running:
+            return
+        await self.raop_stream.send_cli_command("ACTION=PAUSE")
 
 
 class AirplayProvider(PlayerProvider):
@@ -486,6 +504,7 @@ class AirplayProvider(PlayerProvider):
     _discovery_running: bool = False
     _dacp_server: asyncio.Server = None
     _dacp_info: AsyncServiceInfo = None
+    _play_media_lock: asyncio.Lock = asyncio.Lock()
 
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
@@ -593,11 +612,7 @@ class AirplayProvider(PlayerProvider):
         # forward command to player and any connected sync members
         async with TaskManager(self.mass) as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if airplay_player.active_stream:
-                    tg.create_task(airplay_player.active_stream.stop())
-                if mass_player := self.mass.players.get(airplay_player.player_id):
-                    mass_player.state = PlayerState.IDLE
-                    self.mass.players.update(mass_player.player_id)
+                tg.create_task(airplay_player.cmd_stop())
 
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY (unpause) command to given player.
@@ -607,9 +622,7 @@ class AirplayProvider(PlayerProvider):
         # forward command to player and any connected sync members
         async with TaskManager(self.mass) as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if airplay_player.active_stream and airplay_player.active_stream.running:
-                    # prefer interactive command to our streamer
-                    tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY"))
+                tg.create_task(airplay_player.cmd_play())
 
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player.
@@ -625,9 +638,7 @@ class AirplayProvider(PlayerProvider):
             await self.cmd_stop(player_id)
             return
         airplay_player = self._players[player_id]
-        if airplay_player.active_stream and airplay_player.active_stream.running:
-            # prefer interactive command to our streamer
-            await airplay_player.active_stream.send_cli_command("ACTION=PAUSE")
+        await airplay_player.cmd_pause()
 
     async def play_media(
         self,
@@ -635,6 +646,7 @@ class AirplayProvider(PlayerProvider):
         media: PlayerMedia,
     ) -> None:
         """Handle PLAY MEDIA on given player."""
+        await self._play_media_lock.acquire()
         player = self.mass.players.get(player_id)
         if player.synced_to:
             # should not happen, but just in case
@@ -642,8 +654,7 @@ class AirplayProvider(PlayerProvider):
         # always stop existing stream first
         async with TaskManager(self.mass) as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if active_stream := airplay_player.active_stream:
-                    tg.create_task(active_stream.stop())
+                tg.create_task(airplay_player.cmd_stop(update_state=False))
         # select audio source
         if media.media_type == MediaType.ANNOUNCEMENT:
             # special case: stream announcement
@@ -692,57 +703,35 @@ class AirplayProvider(PlayerProvider):
         # timestamped playback, which reads pcm audio from stdin
         # and we can send some interactive commands using a named pipe.
 
-        # setup AirplayStream for player and its sync childs
+        # setup RaopStream for player and its sync childs
         sync_clients = self._get_sync_clients(player_id)
         for airplay_player in sync_clients:
-            airplay_player.active_stream = AirplayStream(
-                self, airplay_player, input_format=input_format
-            )
-
-        # use a buffer here to consume small hiccups as the
-        # raop streaming is pretty much realtime and without a buffer to stdin
-        buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
-
-        async def fill_buffer() -> None:
-            async for chunk in audio_source:
-                await buffer.put(chunk)
-            await buffer.put(b"EOF")
-
-        fill_buffer_task = asyncio.create_task(fill_buffer())
+            airplay_player.raop_stream = RaopStream(self, airplay_player, input_format=input_format)
 
         async def audio_streamer() -> None:
-            try:
-                while True:
-                    chunk = await buffer.get()
-                    if chunk == b"EOF":
-                        break
-                    await asyncio.gather(
-                        *[x.active_stream.write_chunk(chunk) for x in sync_clients],
-                        return_exceptions=True,
-                    )
-
-                # entire stream consumed: send EOF
+            async for chunk in audio_source:
                 await asyncio.gather(
-                    *[x.active_stream.write_eof() for x in sync_clients],
+                    *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
                     return_exceptions=True,
                 )
-
-            finally:
-                if not fill_buffer_task.done():
-                    fill_buffer_task.cancel()
-                empty_queue(buffer)
+            # entire stream consumed: send EOF
+            await asyncio.gather(
+                *[x.raop_stream.write_eof() for x in sync_clients],
+                return_exceptions=True,
+            )
 
         # get current ntp and start cliraop
         _, stdout = await check_output(self.cliraop_bin, "-ntp")
         start_ntp = int(stdout.strip())
         wait_start = 1250 + (250 * len(sync_clients))
         await asyncio.gather(
-            *[x.active_stream.start(start_ntp, wait_start) for x in sync_clients],
+            *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
             return_exceptions=True,
         )
-        self._players[player_id].active_stream.audio_source_task = asyncio.create_task(
+        self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
             audio_streamer()
         )
+        self._play_media_lock.release()
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
@@ -751,8 +740,8 @@ class AirplayProvider(PlayerProvider):
         - volume_level: volume level (0..100) to set on the player.
         """
         airplay_player = self._players[player_id]
-        if airplay_player.active_stream:
-            await airplay_player.active_stream.send_cli_command(f"VOLUME={volume_level}\n")
+        if airplay_player.raop_stream and airplay_player.raop_stream.running:
+            await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
         mass_player = self.mass.players.get(player_id)
         mass_player.volume_level = volume_level
         self.mass.players.update(player_id)
@@ -767,6 +756,8 @@ class AirplayProvider(PlayerProvider):
             - player_id: player_id of the player to handle the command.
             - target_player: player_id of the syncgroup master or group player.
         """
+        if player_id == target_player:
+            return
         child_player = self.mass.players.get(player_id)
         assert child_player  # guard
         parent_player = self.mass.players.get(target_player)
@@ -813,9 +804,6 @@ class AirplayProvider(PlayerProvider):
         group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
         group_leader.group_childs.remove(player_id)
         player.synced_to = None
-        # guard if this was the last sync child of the group player
-        if group_leader.group_childs == {group_leader.player_id}:
-            group_leader.group_childs.remove(group_leader.player_id)
         await self.cmd_stop(player_id)
         self.mass.players.update(player_id)
 
@@ -869,9 +857,7 @@ class AirplayProvider(PlayerProvider):
         if address is None:
             return
         self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
-        self._players[player_id] = AirPlayPlayer(
-            player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id)
-        )
+        self._players[player_id] = AirPlayPlayer(self, player_id, info, address)
         manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
         if "apple tv" in model.lower():
             # For now, we ignore the Apple TV until we implement the authentication.
@@ -946,7 +932,7 @@ class AirplayProvider(PlayerProvider):
                 (
                     x
                     for x in self._players.values()
-                    if x.active_stream and x.active_stream.active_remote_id == active_remote
+                    if x.raop_stream and x.raop_stream.active_remote_id == active_remote
                 ),
                 None,
             )
@@ -1004,15 +990,15 @@ class AirplayProvider(PlayerProvider):
                     mass_player.volume_level = volume
             elif "device-prevent-playback=1" in path:
                 # device switched to another source (or is powered off)
-                if active_stream := airplay_player.active_stream:
+                if raop_stream := airplay_player.raop_stream:
                     # ignore this if we just started playing to prevent false positives
                     if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
-                        active_stream.prevent_playback = True
+                        raop_stream.prevent_playback = True
                         self.mass.create_task(self.monitor_prevent_playback(player_id))
             elif "device-prevent-playback=0" in path:
                 # device reports that its ready for playback again
-                if active_stream := airplay_player.active_stream:
-                    active_stream.prevent_playback = False
+                if raop_stream := airplay_player.raop_stream:
+                    raop_stream.prevent_playback = False
 
             # send response
             date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
@@ -1032,17 +1018,17 @@ class AirplayProvider(PlayerProvider):
         count = 0
         if not (airplay_player := self._players.get(player_id)):
             return
-        prev_active_remote_id = airplay_player.active_stream.active_remote_id
+        prev_active_remote_id = airplay_player.raop_stream.active_remote_id
         while count < 40:
             count += 1
             if not (airplay_player := self._players.get(player_id)):
                 return
-            if not (active_stream := airplay_player.active_stream):
+            if not (raop_stream := airplay_player.raop_stream):
                 return
-            if active_stream.active_remote_id != prev_active_remote_id:
+            if raop_stream.active_remote_id != prev_active_remote_id:
                 # checksum
                 return
-            if not active_stream.prevent_playback:
+            if not raop_stream.prevent_playback:
                 return
             await asyncio.sleep(0.5)