From: Marcel van der Veldt Date: Sun, 25 Feb 2024 18:59:57 +0000 (+0100) Subject: Some optimizations for Airplay streaming (#1107) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=6a30b858e1cf131acafaeb250b6beca15fa5954e;p=music-assistant-server.git Some optimizations for Airplay streaming (#1107) --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 8d46f959..03d898d4 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -137,7 +137,7 @@ class MultiClientStreamJob: self._audio_task.cancel() for sub_queue in self.subscribed_players.values(): with suppress(asyncio.QueueFull): - sub_queue.put_nowait(b"") + sub_queue.put_nowait(b"EOF") def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" @@ -167,7 +167,7 @@ class MultiClientStreamJob: and player_id not in self.workaround_players_seen ): self.workaround_players_seen.add(player_id) - yield b"" + yield b"EOF" return try: @@ -184,10 +184,10 @@ class MultiClientStreamJob: # so that chunks can be pushed self._all_clients_connected.set() - # keep reading audio chunks from the queue until we receive an empty one + # keep reading audio chunks from the queue until we receive an EOF chunk while True: chunk = await sub_queue.get() - if chunk == b"": + if chunk == b"EOF": # EOF chunk received break yield chunk @@ -246,8 +246,8 @@ class MultiClientStreamJob: await self._put_chunk(chunk) - # mark EOF with empty chunk - await self._put_chunk(b"") + # mark EOF with EOF chunk + await self._put_chunk(b"EOF") def parse_pcm_info(content_type: str) -> tuple[int, int, int]: @@ -905,7 +905,9 @@ class StreamsController(CoreController): queue.display_name, queue_track.streamdetails.seconds_streamed, ) - + # end of queue flow: make sure we yield the last_fadeout_part + if last_fadeout_part: + yield last_fadeout_part self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) async def _get_player_ffmpeg_args( diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index ea6af71e..0b3882a1 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -63,8 +63,6 @@ class AsyncProcess: """Yield chunks of n size from the process stdout.""" while True: chunk = await self.readexactly(n) - if chunk == b"": - break yield chunk if len(chunk) < n: break diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index ae299cba..37ad643e 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -13,11 +13,11 @@ from dataclasses import dataclass from random import randint, randrange from typing import TYPE_CHECKING -from zeroconf import ServiceStateChange +from zeroconf import IPVersion, ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import get_ip_pton, select_free_port +from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, @@ -48,8 +48,6 @@ if TYPE_CHECKING: DOMAIN = "airplay" -CONF_LATENCY = "latency" -DEFAULT_LATENCY = 2000 CONF_ENCRYPTION = "encryption" CONF_ALAC_ENCODE = "alac_encode" CONF_VOLUME_START = "volume_start" @@ -58,18 +56,6 @@ CONF_PASSWORD = "password" PLAYER_CONFIG_ENTRIES = ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, - ConfigEntry( - key=CONF_LATENCY, - type=ConfigEntryType.INTEGER, - range=(500, 4000), - default_value=DEFAULT_LATENCY, - label="Latency", - description="Sets the number of milliseconds of audio buffer in the player. " - "This is important to absorb network throughput jitter. \n" - "Increase this value if you notice network dropouts at the cost of a slower " - "response to commands.", - advanced=True, - ), ConfigEntry( key=CONF_ENCRYPTION, type=ConfigEntryType.BOOLEAN, @@ -172,6 +158,13 @@ def get_model_from_am(am_property: str | None) -> tuple[str, str]: return (manufacturer, model) +def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None: + """Get primary IP address from zeroconf discovery info.""" + return next( + (x for x in discovery_info.parsed_addresses(IPVersion.V4Only) if x != "127.0.0.1"), None + ) + + class AirplayStreamJob: """Object that holds the details of a stream job.""" @@ -184,13 +177,20 @@ class AirplayStreamJob: # with the named pipe used to send commands self.active_remote_id: str = str(randint(1000, 8000)) self.start_ntp: int | None = None # use as checksum + self._audio_buffer = asyncio.Queue(2) self._log_reader_task: asyncio.Task | None = None + self._audio_reader_task: asyncio.Task | None = None self._cliraop_proc: asyncio.subprocess.Process | None = None + self._stop_requested = False @property def running(self) -> bool: """Return bool if we're running.""" - return self._cliraop_proc and self._cliraop_proc.returncode is None + return ( + not self._stop_requested + and self._cliraop_proc + and self._cliraop_proc.returncode is None + ) async def init_cliraop(self, start_ntp: int) -> None: """Initialize CLIRaop process for a player.""" @@ -198,10 +198,6 @@ class AirplayStreamJob: extra_args = [] player_id = self.airplay_player.player_id mass_player = self.mass.players.get(player_id) - latency = self.mass.config.get_raw_player_config_value( - player_id, CONF_LATENCY, DEFAULT_LATENCY - ) - extra_args += ["-l", str(latency)] if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False): extra_args += ["-e"] if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True): @@ -223,7 +219,7 @@ class AirplayStreamJob: "-p", str(self.airplay_player.discovery_info.port), "-w", - str(2500 - sync_adjust), + str(2000 - sync_adjust), "-v", str(mass_player.volume_level), *extra_args, @@ -247,26 +243,22 @@ class AirplayStreamJob: close_fds=True, ) self._log_reader_task = asyncio.create_task(self._log_watcher()) + self._audio_reader_task = asyncio.create_task(self._audio_reader()) async def stop(self): """Stop playback and cleanup.""" if not self.running: return - # prefer interactive command to our streamer await self.send_cli_command("ACTION=STOP") - # use communicate to clear stdin/stdout and wait for exit - try: - await asyncio.wait_for(self._cliraop_proc.wait(), 5) - except TimeoutError: - self.airplay_player.logger.error( # noqa: TRY400 - "RAOP process did not stop on time, attempting forced close." - ) - self._cliraop_proc.kill() - await asyncio.wait_for(self._cliraop_proc.wait(), 5) - finally: - # stop background task - if self._log_reader_task and not self._log_reader_task.done(): - self._log_reader_task.cancel() + self._stop_requested = True + # stop background tasks + if self._log_reader_task and not self._log_reader_task.done(): + self._log_reader_task.cancel() + if self._audio_reader_task and not self._audio_reader_task.done(): + self._audio_reader_task.cancel() + + empty_queue(self._audio_buffer) + await asyncio.wait_for(self._cliraop_proc.communicate(), 30) async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" @@ -295,55 +287,82 @@ class AirplayStreamJob: if not line: continue if "elapsed milliseconds:" in line: + # do not log this line, its too verbose millis = int(line.split("elapsed milliseconds: ")[1]) mass_player.elapsed_time = millis / 1000 mass_player.elapsed_time_last_updated = time.time() - continue # do not log this line, its too verbose + continue if "set pause" in line or "Pause at" in line: + logger.info("raop streaming paused") mass_player.state = PlayerState.PAUSED self.mass.players.update(airplay_player.player_id) - elif "Restarted at" in line or "restarting w/ pause" in line: + continue + if "Restarted at" in line or "restarting w/ pause" in line: + logger.info("raop streaming restarted after pause") mass_player.state = PlayerState.PLAYING self.mass.players.update(airplay_player.player_id) - elif "Stopped at" in line: + continue + if "Stopped at" in line: + logger.info("raop streaming stopped") mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) - elif "restarting w/o pause" in line: + continue + if "restarting w/o pause" in line: # streaming has started + logger.info("raop streaming started") mass_player.state = PlayerState.PLAYING mass_player.elapsed_time = 0 mass_player.elapsed_time_last_updated = time.time() self.mass.players.update(airplay_player.player_id) + continue + if "lost packet out of backlog" in line: + logger.warning(line) + continue + # debug log everything else logger.debug(line) # if we reach this point, the process exited - airplay_player.logger.debug("Log watcher task finished...") - mass_player.state = PlayerState.IDLE - self.mass.players.update(airplay_player.player_id) logger.debug( "CLIRaop process stopped with errorcode %s", self._cliraop_proc.returncode, ) + if ( + airplay_player.active_stream + and airplay_player.active_stream.active_remote_id == self.active_remote_id + ): + mass_player.state = PlayerState.IDLE + self.mass.players.update(airplay_player.player_id) + + async def _audio_reader(self) -> None: + """Read audio chunks from buffer and send them to the cliraop process.""" + logger = self.airplay_player.logger + logger.debug("Audio reader started") + while self.running: + chunk = await self._audio_buffer.get() + if chunk == b"EOF": + # EOF chunk + break + self._cliraop_proc.stdin.write(chunk) + with suppress(BrokenPipeError): + await self._cliraop_proc.stdin.drain() + # send EOF + if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing(): + self._cliraop_proc.stdin.write_eof() + with suppress(BrokenPipeError): + await self._cliraop_proc.stdin.drain() + logger.debug("Audio reader finished") async def write_chunk(self, data: bytes) -> None: - """Write a chunk of (pcm) data to the stdin of CLIRaop.""" - if not self.running or not self._cliraop_proc.stdin.can_write_eof(): - return - self._cliraop_proc.stdin.write(data) - if not self.running or not self._cliraop_proc.stdin.can_write_eof(): + """Write a chunk of (pcm) data to the audio buffer.""" + if not self.running: return - with suppress(BrokenPipeError): - await self._cliraop_proc.stdin.drain() + await self._audio_buffer.put(data) - async def write_eof(self, data: bytes) -> None: - """Write a chunk of (pcm) data to the stdin of CLIRaop.""" - if not self.running or not self._cliraop_proc.stdin.can_write_eof(): - return - self._cliraop_proc.stdin.write_eof() - if not self.running or not self._cliraop_proc.stdin.can_write_eof(): + async def write_eof(self) -> None: + """Write end-of-file chunk to the audo buffer.""" + if not self.running: return - with suppress(BrokenPipeError): - await self._cliraop_proc.stdin.drain() + await self._audio_buffer.put(b"EOF") @dataclass @@ -352,6 +371,7 @@ class AirPlayPlayer: player_id: str discovery_info: AsyncServiceInfo + address: str logger: logging.Logger active_stream: AirplayStreamJob | None = None @@ -418,11 +438,11 @@ class AirplayProvider(PlayerProvider): # handle update for existing device if airplay_player := self._players.get(player_id): if mass_player := self.mass.players.get(player_id): - cur_address = info.parsed_addresses()[0] - prev_address = airplay_player.discovery_info.parsed_addresses()[0] - if cur_address != prev_address: + cur_address = get_primary_ip_address(info) + if cur_address and cur_address != airplay_player.address: + airplay_player.address = cur_address airplay_player.logger.info( - "Address updated from %s to %s", prev_address, cur_address + "Address updated from %s to %s", airplay_player.address, cur_address ) mass_player.device_info = DeviceInfo( model=mass_player.device_info.model, @@ -463,6 +483,8 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. """ + if existing_stream := self._stream_tasks.get(player_id): + existing_stream.cancel() async def stop_player(airplay_player: AirPlayPlayer) -> None: if airplay_player.active_stream: @@ -522,7 +544,11 @@ class AirplayProvider(PlayerProvider): self._resync_handle.cancel() self._resync_handle = None # always stop existing stream first - await self.cmd_stop(player_id) + if existing_stream := self._stream_tasks.get(player_id): + existing_stream.cancel() + for airplay_player in self._get_sync_clients(player_id): + if airplay_player.active_stream and airplay_player.active_stream.running: + self.mass.create_task(airplay_player.active_stream.stop()) # start streaming the queue (pcm) audio in a background task queue = self.mass.player_queues.get_active_queue(player_id) self._stream_tasks[player_id] = asyncio.create_task( @@ -554,7 +580,12 @@ class AirplayProvider(PlayerProvider): self._resync_handle.cancel() self._resync_handle = None # always stop existing stream first - await self.cmd_stop(player_id) + if existing_stream := self._stream_tasks.get(player_id): + existing_stream.cancel() + async with asyncio.TaskGroup() as tg: + for airplay_player in self._get_sync_clients(player_id): + if airplay_player.active_stream and airplay_player.active_stream.running: + tg.create_task(airplay_player.active_stream.stop()) if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100: # TODO: resample on the fly here ? raise RuntimeError("Unsupported PCM format") @@ -605,32 +636,23 @@ class AirplayProvider(PlayerProvider): async for pcm_chunk in audio_iterator: # send audio chunk to player(s) available_clients = 0 - async with asyncio.TaskGroup() as tg: - for airplay_player in self._get_sync_clients(player_id): - if ( - not airplay_player.active_stream - or not airplay_player.active_stream.running - or airplay_player.active_stream.start_ntp != start_ntp - ): - # catch when this stream is no longer active on the player - continue - available_clients += 1 - tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk)) - # send the progress report every 5 seconds - now = time.time() - if now - prev_progress_report >= 5: - prev_progress_report = now - tg.create_task( - airplay_player.active_stream.send_cli_command( - f"PROGRESS={int(queue.elapsed_time)}\n" - ) - ) + for airplay_player in self._get_sync_clients(player_id): + if ( + not airplay_player.active_stream + or not airplay_player.active_stream.running + or airplay_player.active_stream.start_ntp != start_ntp + ): + # catch when this stream is no longer active on the player + continue + available_clients += 1 + await airplay_player.active_stream.write_chunk(pcm_chunk) if not available_clients: # this streamjob is no longer active return # send metadata to player(s) if needed # NOTE: this must all be done in separate tasks to not disturb audio + now = time.time() if queue and queue.current_item and queue.current_item.streamdetails: metadata_checksum = ( queue.current_item.streamdetails.stream_title @@ -638,9 +660,19 @@ class AirplayProvider(PlayerProvider): ) if prev_metadata_checksum != metadata_checksum: prev_metadata_checksum = metadata_checksum + prev_progress_report = now self.mass.create_task(self._send_metadata(player_id, queue)) + # send the progress report every 5 seconds + elif now - prev_progress_report >= 5: + prev_progress_report = now + self.mass.create_task(self._send_progress(player_id, queue)) # end of stream reached - write eof + self.logger.debug( + "Finished RAOP stream for Queue %s to %s", + queue.display_name, + "/".join(synced_player_ids), + ) for airplay_player in self._get_sync_clients(player_id): if ( not airplay_player.active_stream @@ -722,6 +754,9 @@ class AirplayProvider(PlayerProvider): group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) group_leader.group_childs.remove(player_id) player.synced_to = None + # guard if this was the last sync child of the group player + if group_leader.group_childs == {group_leader.player_id}: + group_leader.group_childs.remove(group_leader.player_id) await self.cmd_stop(player_id) self.mass.players.update(player_id) @@ -774,17 +809,17 @@ class AirplayProvider(PlayerProvider): self, player_id: str, display_name: str, info: AsyncServiceInfo ) -> None: """Handle setup of a new player that is discovered using mdns.""" - address = info.parsed_addresses()[0] - # some guards if our info is valid/complete - if address == "127.0.0.1": + address = get_primary_ip_address(info) + if address is None: return + # some guards if our info is valid/complete if "md" not in info.decoded_properties: return if "et" not in info.decoded_properties: return self.logger.debug("Discovered Airplay device %s on %s", display_name, address) self._players[player_id] = AirPlayPlayer( - player_id, discovery_info=info, logger=self.logger.getChild(player_id) + player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id) ) manufacturer, model = get_model_from_am(info.decoded_properties.get("am")) if "apple tv" in model.lower(): @@ -870,16 +905,17 @@ class AirplayProvider(PlayerProvider): player_id = airplay_player.player_id mass_player = self.mass.players.get(player_id) + active_queue = self.mass.player_queues.get_active_queue(player_id) if path == "/ctrl-int/1/nextitem": - self.mass.create_task(self.mass.player_queues.next(player_id)) + self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id)) elif path == "/ctrl-int/1/previtem": - self.mass.create_task(self.mass.player_queues.previous(player_id)) + self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id)) elif path == "/ctrl-int/1/play": - self.mass.create_task(self.mass.player_queues.play(player_id)) + self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id)) elif path == "/ctrl-int/1/playpause": - self.mass.create_task(self.mass.player_queues.play_pause(player_id)) + self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id)) elif path == "/ctrl-int/1/stop": - self.mass.create_task(self.cmd_stop(player_id)) + self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id)) elif path == "/ctrl-int/1/volumeup": self.mass.create_task(self.mass.players.cmd_volume_up(player_id)) elif path == "/ctrl-int/1/volumedown": @@ -887,10 +923,12 @@ class AirplayProvider(PlayerProvider): elif path == "/ctrl-int/1/shuffle_songs": queue = self.mass.player_queues.get(player_id) self.mass.create_task( - self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled) + self.mass.player_queues.set_shuffle( + active_queue.queue_id, not queue.shuffle_enabled + ) ) elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): - self.mass.create_task(self.mass.player_queues.pause(player_id)) + self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id)) elif "dmcp.device-volume=" in path: raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) volume = convert_airplay_volume(raop_volume) @@ -944,10 +982,10 @@ class AirplayProvider(PlayerProvider): album = _album.name cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" - cmd += f"DURATION={duration}\nACTION=SENDMETA\n" + cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" for airplay_player in self._get_sync_clients(player_id): - if not airplay_player.active_stream: + if not airplay_player.active_stream or not airplay_player.active_stream.running: continue await airplay_player.active_stream.send_cli_command(cmd) @@ -960,6 +998,16 @@ class AirplayProvider(PlayerProvider): queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg" ) for airplay_player in self._get_sync_clients(player_id): - if not airplay_player.active_stream: + if not airplay_player.active_stream or not airplay_player.active_stream.running: continue await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n") + + async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None: + """Send progress report to player (and connected sync childs).""" + if not queue or not queue.current_item: + return + progress = int(queue.corrected_elapsed_time) + for airplay_player in self._get_sync_clients(player_id): + if not airplay_player.active_stream or not airplay_player.active_stream.running: + continue + await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n") diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 index 09d26819..bcfec2c5 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 index 0ed63539..3face4f8 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 index 20689e63..9cc60b1b 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/server/providers/opensubsonic/sonic_provider.py b/music_assistant/server/providers/opensubsonic/sonic_provider.py index bc77f4ca..5c432e7f 100644 --- a/music_assistant/server/providers/opensubsonic/sonic_provider.py +++ b/music_assistant/server/providers/opensubsonic/sonic_provider.py @@ -683,7 +683,7 @@ class OpenSonicProvider(MusicProvider): audio_buffer.put(chunk), self.mass.loop ).result() # send empty chunk when we're done - asyncio.run_coroutine_threadsafe(audio_buffer.put(b""), self.mass.loop).result() + asyncio.run_coroutine_threadsafe(audio_buffer.put(b"EOF"), self.mass.loop).result() # fire up an executor thread to put the audio chunks (threadsafe) on the audio buffer streamer_task = self.mass.loop.run_in_executor(None, _streamer) @@ -691,7 +691,7 @@ class OpenSonicProvider(MusicProvider): while True: # keep reading from the audio buffer until there is no more data chunk = await audio_buffer.get() - if chunk == b"": + if chunk == b"EOF": break yield chunk finally: