From 5efa70b7d0886669993fdf12ca67caea233464df Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 12 Aug 2024 00:35:52 +0200 Subject: [PATCH] Fix AirPlay playback gets mangled up between different streams (#1555) --- music_assistant/common/models/player_queue.py | 2 - .../server/controllers/player_queues.py | 7 +- music_assistant/server/controllers/players.py | 29 +-- music_assistant/server/controllers/streams.py | 6 + music_assistant/server/helpers/util.py | 4 +- .../server/providers/airplay/__init__.py | 170 ++++++++---------- 6 files changed, 111 insertions(+), 107 deletions(-) diff --git a/music_assistant/common/models/player_queue.py b/music_assistant/common/models/player_queue.py index 87e37e8e..8f16281a 100644 --- a/music_assistant/common/models/player_queue.py +++ b/music_assistant/common/models/player_queue.py @@ -53,7 +53,6 @@ class PlayerQueue(DataClassDictMixin): d.pop("current_item", None) d.pop("next_item", None) d.pop("index_in_buffer", None) - d.pop("announcement_in_progress", None) d.pop("flow_mode", None) d.pop("flow_mode_start_index", None) return d @@ -64,7 +63,6 @@ class PlayerQueue(DataClassDictMixin): d.pop("current_item", None) d.pop("next_item", None) d.pop("index_in_buffer", None) - d.pop("announcement_in_progress", None) d.pop("flow_mode", None) d.pop("flow_mode_start_index", None) return cls.from_dict(d) diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index e4643910..d3a6e6bd 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -743,10 +743,15 @@ class PlayerQueuesController(CoreController): self.mass, queue_item, seek_position=seek_position, fade_in=fade_in ) # send play_media request to player - await self.mass.players.play_media( + # NOTE that we debounce this a bit to account for someone hitting the next button + # like a madman. This will prevent the player from being overloaded with requests. + self.mass.call_later( + 0.25, + self.mass.players.play_media, player_id=queue_id, # transform into PlayerMedia to send to the actual player implementation media=self.player_media_from_queue_item(queue_item, queue.flow_mode), + task_id=f"play_media_{queue_id}", ) # Interaction with player diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index c9beeafa..6e059754 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -106,6 +106,7 @@ class PlayerController(CoreController): ) self.manifest.icon = "speaker-multiple" self._poll_task: asyncio.Task | None = None + self._player_locks: dict[str, asyncio.Lock] = {} async def setup(self, config: CoreConfig) -> None: """Async initialize of module.""" @@ -199,7 +200,8 @@ class PlayerController(CoreController): await self.mass.player_queues.play(player_id) return player_provider = self.get_player_provider(player_id) - await player_provider.cmd_play(player_id) + async with self._player_locks[player_id]: + await player_provider.cmd_play(player_id) @api_command("players/cmd/pause") @handle_player_command @@ -242,7 +244,6 @@ class PlayerController(CoreController): self.mass.create_task(_watch_pause(player_id)) @api_command("players/cmd/play_pause") - @handle_player_command async def cmd_play_pause(self, player_id: str) -> None: """Toggle play/pause on given player. @@ -292,7 +293,8 @@ class PlayerController(CoreController): if PlayerFeature.POWER in player.supported_features: # forward to player provider player_provider = self.get_player_provider(player_id) - await player_provider.cmd_power(player_id, powered) + async with self._player_locks[player_id]: + await player_provider.cmd_power(player_id, powered) else: # allow the stop command to process and prevent race conditions await asyncio.sleep(0.2) @@ -332,7 +334,8 @@ class PlayerController(CoreController): msg = f"Player {player.display_name} does not support volume_set" raise UnsupportedFeaturedException(msg) player_provider = self.get_player_provider(player_id) - await player_provider.cmd_volume_set(player_id, volume_level) + async with self._player_locks[player_id]: + await player_provider.cmd_volume_set(player_id, volume_level) @api_command("players/cmd/volume_up") @handle_player_command @@ -438,7 +441,8 @@ class PlayerController(CoreController): msg = f"Player {player.display_name} does not support muting" raise UnsupportedFeaturedException(msg) player_provider = self.get_player_provider(player_id) - await player_provider.cmd_volume_mute(player_id, muted) + async with self._player_locks[player_id]: + await player_provider.cmd_volume_mute(player_id, muted) @api_command("players/cmd/seek") async def cmd_seek(self, player_id: str, position: int) -> None: @@ -466,8 +470,8 @@ class PlayerController(CoreController): ) -> None: """Handle playback of an announcement (url) on given player.""" player = self.get(player_id, True) - if player.announcement_in_progress: - return + while player.announcement_in_progress: + await asyncio.sleep(0.5) if not url.startswith("http"): raise PlayerCommandFailed("Only URLs are supported for announcements") try: @@ -576,7 +580,8 @@ class PlayerController(CoreController): ) return player_prov = self.mass.players.get_player_provider(player_id) - await player_prov.enqueue_next_media(player_id=player_id, media=media) + async with self._player_locks[player_id]: + await player_prov.enqueue_next_media(player_id=player_id, media=media) @api_command("players/cmd/sync") @handle_player_command @@ -628,7 +633,7 @@ class PlayerController(CoreController): continue if child_player.synced_to and child_player.synced_to == target_player: continue # already synced to this target - if child_player.synced_to and child_player.synced_to != target_player: + elif child_player.synced_to: # player already synced to another player, unsync first self.logger.warning( "Player %s is already synced, unsyncing first", child_player.name @@ -649,7 +654,8 @@ class PlayerController(CoreController): # forward command to the player provider after all (base) sanity checks player_provider = self.get_player_provider(target_player) - await player_provider.cmd_sync_many(target_player, child_player_ids) + async with self._player_locks[target_player]: + await player_provider.cmd_sync_many(target_player, child_player_ids) @api_command("players/cmd/unsync_many") async def cmd_unsync_many(self, player_ids: list[str]) -> None: @@ -735,6 +741,9 @@ class PlayerController(CoreController): # register playerqueue for this player self.mass.create_task(self.mass.player_queues.on_player_register(player)) + # register lock for this player + self._player_locks[player_id] = asyncio.Lock() + self._players[player_id] = player # ignore disabled players diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index d84900fa..44dfd126 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -498,6 +498,9 @@ class StreamsController(CoreController): use_crossfade = await self.mass.config.get_player_config_value( queue.queue_id, CONF_CROSSFADE ) + if not start_queue_item: + # this can happen in some (edge case) race conditions + return if start_queue_item.media_type != MediaType.TRACK: use_crossfade = False pcm_sample_size = int( @@ -775,6 +778,9 @@ class StreamsController(CoreController): # del chunk finished = True finally: + if "ffmpeg_proc" not in locals(): + # edge case: ffmpeg process was not yet started + return # noqa: B012 if finished and not ffmpeg_proc.closed: await asyncio.wait_for(ffmpeg_proc.wait(), 60) elif not ffmpeg_proc.closed: diff --git a/music_assistant/server/helpers/util.py b/music_assistant/server/helpers/util.py index b5afcc26..b1be4202 100644 --- a/music_assistant/server/helpers/util.py +++ b/music_assistant/server/helpers/util.py @@ -149,9 +149,9 @@ class TaskManager: self.mass = mass self._tasks: list[asyncio.Task] = [] - def create_task(self, coro: Coroutine) -> None: + def create_task(self, coro: Coroutine, eager_start: bool = False) -> None: """Create a new task and add it to the manager.""" - task = self.mass.create_task(coro) + task = self.mass.create_task(coro, eager_start=eager_start) self._tasks.append(task) async def __aenter__(self) -> Self: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index db046689..b1af1eb2 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -9,7 +9,6 @@ import platform import socket import time from contextlib import suppress -from dataclasses import dataclass from random import randint, randrange from typing import TYPE_CHECKING @@ -17,7 +16,7 @@ from zeroconf import IPVersion, ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port +from music_assistant.common.helpers.util import get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, @@ -29,6 +28,7 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_SYNC_ADJUST, ConfigEntry, ConfigValueType, + ProviderConfig, create_sample_rates_config_entry, ) from music_assistant.common.models.enums import ( @@ -43,7 +43,9 @@ from music_assistant.common.models.enums import ( from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.common.models.player_queue import PlayerQueue +from music_assistant.common.models.provider import ProviderManifest from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL +from music_assistant.server import MusicAssistant from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.helpers.util import TaskManager @@ -202,13 +204,13 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None: return None -class AirplayStream: - """Object that holds the details of a stream job.""" +class RaopStream: + """Object that holds the details of a (RAOP) stream job.""" def __init__( self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat ) -> None: - """Initialize AirplayStream.""" + """Initialize RaopStream.""" self.prov = prov self.mass = prov.mass self.airplay_player = airplay_player @@ -299,7 +301,7 @@ class AirplayStream: await self._cliraop_proc.start() await asyncio.to_thread(os.close, read) self._started.set() - self._log_reader_task = asyncio.create_task(self._log_watcher()) + self._log_reader_task = self.mass.create_task(self._log_watcher()) async def stop(self): """Stop playback and cleanup.""" @@ -310,20 +312,12 @@ class AirplayStream: if self._cliraop_proc.proc and not self._cliraop_proc.closed: await self.send_cli_command("ACTION=STOP") self._stopped = True # set after send_cli command! - if self._cliraop_proc.proc: - try: - await asyncio.wait_for(self._cliraop_proc.wait(), 5) - except TimeoutError: - self.prov.logger.warning( - "Raop process for %s did not stop in time, is the player offline?", - self.airplay_player.player_id, - ) - await self._cliraop_proc.close(True) - - # ffmpeg can sometimes hang due to the connected pipes - # we handle closing it but it can be a bit slow so do that in the background - if not self._ffmpeg_proc.closed: - self.mass.create_task(self._ffmpeg_proc.close(True)) + if self._cliraop_proc.proc and not self._cliraop_proc.closed: + await self._cliraop_proc.close(True) + if self._ffmpeg_proc and not self._ffmpeg_proc.closed: + await self._ffmpeg_proc.close(True) + self._cliraop_proc = None + self._ffmpeg_proc = None async def write_chunk(self, chunk: bytes) -> None: """Write a (pcm) audio chunk.""" @@ -354,7 +348,7 @@ class AirplayStream: named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108 self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) - await self.mass.create_task(send_data) + await asyncio.to_thread(send_data) async def _log_watcher(self) -> None: """Monitor stderr for the running CLIRaop process.""" @@ -415,7 +409,7 @@ class AirplayStream: logger.log(VERBOSE_LOG_LEVEL, line) # if we reach this point, the process exited - if airplay_player.active_stream == self: + if airplay_player.raop_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) # ensure we're cleaned up afterwards (this also logs the returncode) @@ -467,15 +461,39 @@ class AirplayStream: await self.send_cli_command(f"PROGRESS={progress}\n") -@dataclass class AirPlayPlayer: """Holds the details of the (discovered) Airplay (RAOP) player.""" - player_id: str - discovery_info: AsyncServiceInfo - address: str - logger: logging.Logger - active_stream: AirplayStream | None = None + def __init__( + self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str + ) -> None: + """Initialize AirPlayPlayer.""" + self.prov = prov + self.mass = prov.mass + self.player_id = player_id + self.discovery_info = discovery_info + self.address = address + self.logger = prov.logger.getChild(player_id) + self.raop_stream: RaopStream | None = None + + async def cmd_stop(self, update_state: bool = True) -> None: + """Send STOP command to player.""" + if self.raop_stream: + await self.raop_stream.stop() + if update_state and (mass_player := self.mass.players.get(self.player_id)): + mass_player.state = PlayerState.IDLE + self.mass.players.update(mass_player.player_id) + + async def cmd_play(self) -> None: + """Send PLAY (unpause) command to player.""" + if self.raop_stream and self.raop_stream.running: + await self.raop_stream.send_cli_command("ACTION=PLAY") + + async def cmd_pause(self) -> None: + """Send PAUSE command to player.""" + if not self.raop_stream or not self.raop_stream.running: + return + await self.raop_stream.send_cli_command("ACTION=PAUSE") class AirplayProvider(PlayerProvider): @@ -486,6 +504,7 @@ class AirplayProvider(PlayerProvider): _discovery_running: bool = False _dacp_server: asyncio.Server = None _dacp_info: AsyncServiceInfo = None + _play_media_lock: asyncio.Lock = asyncio.Lock() @property def supported_features(self) -> tuple[ProviderFeature, ...]: @@ -593,11 +612,7 @@ class AirplayProvider(PlayerProvider): # forward command to player and any connected sync members async with TaskManager(self.mass) as tg: for airplay_player in self._get_sync_clients(player_id): - if airplay_player.active_stream: - tg.create_task(airplay_player.active_stream.stop()) - if mass_player := self.mass.players.get(airplay_player.player_id): - mass_player.state = PlayerState.IDLE - self.mass.players.update(mass_player.player_id) + tg.create_task(airplay_player.cmd_stop()) async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. @@ -607,9 +622,7 @@ class AirplayProvider(PlayerProvider): # forward command to player and any connected sync members async with TaskManager(self.mass) as tg: for airplay_player in self._get_sync_clients(player_id): - if airplay_player.active_stream and airplay_player.active_stream.running: - # prefer interactive command to our streamer - tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY")) + tg.create_task(airplay_player.cmd_play()) async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player. @@ -625,9 +638,7 @@ class AirplayProvider(PlayerProvider): await self.cmd_stop(player_id) return airplay_player = self._players[player_id] - if airplay_player.active_stream and airplay_player.active_stream.running: - # prefer interactive command to our streamer - await airplay_player.active_stream.send_cli_command("ACTION=PAUSE") + await airplay_player.cmd_pause() async def play_media( self, @@ -635,6 +646,7 @@ class AirplayProvider(PlayerProvider): media: PlayerMedia, ) -> None: """Handle PLAY MEDIA on given player.""" + await self._play_media_lock.acquire() player = self.mass.players.get(player_id) if player.synced_to: # should not happen, but just in case @@ -642,8 +654,7 @@ class AirplayProvider(PlayerProvider): # always stop existing stream first async with TaskManager(self.mass) as tg: for airplay_player in self._get_sync_clients(player_id): - if active_stream := airplay_player.active_stream: - tg.create_task(active_stream.stop()) + tg.create_task(airplay_player.cmd_stop(update_state=False)) # select audio source if media.media_type == MediaType.ANNOUNCEMENT: # special case: stream announcement @@ -692,57 +703,35 @@ class AirplayProvider(PlayerProvider): # timestamped playback, which reads pcm audio from stdin # and we can send some interactive commands using a named pipe. - # setup AirplayStream for player and its sync childs + # setup RaopStream for player and its sync childs sync_clients = self._get_sync_clients(player_id) for airplay_player in sync_clients: - airplay_player.active_stream = AirplayStream( - self, airplay_player, input_format=input_format - ) - - # use a buffer here to consume small hiccups as the - # raop streaming is pretty much realtime and without a buffer to stdin - buffer: asyncio.Queue[bytes] = asyncio.Queue(10) - - async def fill_buffer() -> None: - async for chunk in audio_source: - await buffer.put(chunk) - await buffer.put(b"EOF") - - fill_buffer_task = asyncio.create_task(fill_buffer()) + airplay_player.raop_stream = RaopStream(self, airplay_player, input_format=input_format) async def audio_streamer() -> None: - try: - while True: - chunk = await buffer.get() - if chunk == b"EOF": - break - await asyncio.gather( - *[x.active_stream.write_chunk(chunk) for x in sync_clients], - return_exceptions=True, - ) - - # entire stream consumed: send EOF + async for chunk in audio_source: await asyncio.gather( - *[x.active_stream.write_eof() for x in sync_clients], + *[x.raop_stream.write_chunk(chunk) for x in sync_clients], return_exceptions=True, ) - - finally: - if not fill_buffer_task.done(): - fill_buffer_task.cancel() - empty_queue(buffer) + # entire stream consumed: send EOF + await asyncio.gather( + *[x.raop_stream.write_eof() for x in sync_clients], + return_exceptions=True, + ) # get current ntp and start cliraop _, stdout = await check_output(self.cliraop_bin, "-ntp") start_ntp = int(stdout.strip()) wait_start = 1250 + (250 * len(sync_clients)) await asyncio.gather( - *[x.active_stream.start(start_ntp, wait_start) for x in sync_clients], + *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients], return_exceptions=True, ) - self._players[player_id].active_stream.audio_source_task = asyncio.create_task( + self._players[player_id].raop_stream.audio_source_task = asyncio.create_task( audio_streamer() ) + self._play_media_lock.release() async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -751,8 +740,8 @@ class AirplayProvider(PlayerProvider): - volume_level: volume level (0..100) to set on the player. """ airplay_player = self._players[player_id] - if airplay_player.active_stream: - await airplay_player.active_stream.send_cli_command(f"VOLUME={volume_level}\n") + if airplay_player.raop_stream and airplay_player.raop_stream.running: + await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n") mass_player = self.mass.players.get(player_id) mass_player.volume_level = volume_level self.mass.players.update(player_id) @@ -767,6 +756,8 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. - target_player: player_id of the syncgroup master or group player. """ + if player_id == target_player: + return child_player = self.mass.players.get(player_id) assert child_player # guard parent_player = self.mass.players.get(target_player) @@ -813,9 +804,6 @@ class AirplayProvider(PlayerProvider): group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) group_leader.group_childs.remove(player_id) player.synced_to = None - # guard if this was the last sync child of the group player - if group_leader.group_childs == {group_leader.player_id}: - group_leader.group_childs.remove(group_leader.player_id) await self.cmd_stop(player_id) self.mass.players.update(player_id) @@ -869,9 +857,7 @@ class AirplayProvider(PlayerProvider): if address is None: return self.logger.debug("Discovered Airplay device %s on %s", display_name, address) - self._players[player_id] = AirPlayPlayer( - player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id) - ) + self._players[player_id] = AirPlayPlayer(self, player_id, info, address) manufacturer, model = get_model_from_am(info.decoded_properties.get("am")) if "apple tv" in model.lower(): # For now, we ignore the Apple TV until we implement the authentication. @@ -946,7 +932,7 @@ class AirplayProvider(PlayerProvider): ( x for x in self._players.values() - if x.active_stream and x.active_stream.active_remote_id == active_remote + if x.raop_stream and x.raop_stream.active_remote_id == active_remote ), None, ) @@ -1004,15 +990,15 @@ class AirplayProvider(PlayerProvider): mass_player.volume_level = volume elif "device-prevent-playback=1" in path: # device switched to another source (or is powered off) - if active_stream := airplay_player.active_stream: + if raop_stream := airplay_player.raop_stream: # ignore this if we just started playing to prevent false positives if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING: - active_stream.prevent_playback = True + raop_stream.prevent_playback = True self.mass.create_task(self.monitor_prevent_playback(player_id)) elif "device-prevent-playback=0" in path: # device reports that its ready for playback again - if active_stream := airplay_player.active_stream: - active_stream.prevent_playback = False + if raop_stream := airplay_player.raop_stream: + raop_stream.prevent_playback = False # send response date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") @@ -1032,17 +1018,17 @@ class AirplayProvider(PlayerProvider): count = 0 if not (airplay_player := self._players.get(player_id)): return - prev_active_remote_id = airplay_player.active_stream.active_remote_id + prev_active_remote_id = airplay_player.raop_stream.active_remote_id while count < 40: count += 1 if not (airplay_player := self._players.get(player_id)): return - if not (active_stream := airplay_player.active_stream): + if not (raop_stream := airplay_player.raop_stream): return - if active_stream.active_remote_id != prev_active_remote_id: + if raop_stream.active_remote_id != prev_active_remote_id: # checksum return - if not active_stream.prevent_playback: + if not raop_stream.prevent_playback: return await asyncio.sleep(0.5) -- 2.34.1