From: Marcel van der Veldt Date: Sat, 1 Nov 2025 13:43:08 +0000 (+0100) Subject: Streaming fixes for AirPlay provider X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=32e8d18421246de24ce1004166d75d2f10d9cd68;p=music-assistant-server.git Streaming fixes for AirPlay provider --- diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 67ecff09..fc3d81db 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -15,7 +15,7 @@ import os import urllib.parse from collections.abc import AsyncGenerator from dataclasses import dataclass -from typing import TYPE_CHECKING, Final +from typing import TYPE_CHECKING, Final, cast from aiofiles.os import wrap from aiohttp import web @@ -66,7 +66,7 @@ from music_assistant.helpers.audio import ( get_stream_details, resample_pcm_audio, ) -from music_assistant.helpers.buffered_generator import use_audio_buffer +from music_assistant.helpers.buffered_generator import buffered_audio, use_audio_buffer from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream from music_assistant.helpers.smart_fades import ( @@ -79,9 +79,12 @@ from music_assistant.helpers.webserver import Webserver from music_assistant.models.core_controller import CoreController from music_assistant.models.music_provider import MusicProvider from music_assistant.models.plugin import PluginProvider, PluginSource +from music_assistant.providers.universal_group.constants import UGP_PREFIX +from music_assistant.providers.universal_group.player import UniversalGroupPlayer if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig + from music_assistant_models.player import PlayerMedia from music_assistant_models.player_queue import PlayerQueue from music_assistant_models.queue_item import QueueItem from music_assistant_models.streamdetails import StreamDetails @@ -490,7 +493,7 @@ class StreamsController(CoreController): if queue_item.media_type == MediaType.RADIO: # keep very short buffer for radio streams # to keep them (more or less) realtime and prevent time outs - read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "3"] + read_rate_input_args = ["-readrate", "1.00", "-readrate_initial_burst", "1"] elif "Network_Module" in user_agent or "transferMode.dlna.org" in request.headers: # and ofcourse we have an exception of the exception. Where most players actually NEED # the readrate filter to avoid disconnecting, some other players (DLNA/MusicCast) @@ -828,6 +831,82 @@ class StreamsController(CoreController): # like https hosts and it also offers the pre-announce 'bell' return f"{self.base_url}/announcement/{player_id}.{content_type.value}" + def get_stream( + self, media: PlayerMedia, pcm_format: AudioFormat + ) -> AsyncGenerator[bytes, None]: + """ + Get a stream of the given media as raw PCM audio. + + This is used as helper for player providers that can consume the raw PCM + audio stream directly (e.g. AirPlay) and not rely on HTTP transport. + """ + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + assert media.custom_data + audio_source = self.get_announcement_stream( + media.custom_data["announcement_url"], + output_format=pcm_format, + pre_announce=media.custom_data["pre_announce"], + pre_announce_url=media.custom_data["pre_announce_url"], + ) + elif media.media_type == MediaType.PLUGIN_SOURCE: + # special case: plugin source stream + assert media.custom_data + audio_source = self.get_plugin_source_stream( + plugin_source_id=media.custom_data["source_id"], + output_format=pcm_format, + # need to pass player_id from the PlayerMedia object + # because this could have been a group + player_id=media.custom_data["player_id"], + chunk_size=get_chunksize(pcm_format, 1), # ensure 1 second chunks + ) + elif media.source_id and media.source_id.startswith(UGP_PREFIX): + # special case: UGP stream + ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id)) + ugp_stream = ugp_player.stream + assert ugp_stream is not None # for type checker + if ugp_stream.base_pcm_format == pcm_format: + # no conversion needed + audio_source = ugp_stream.subscribe_raw() + else: + audio_source = ugp_stream.get_stream(output_format=pcm_format) + elif media.source_id and media.queue_item_id and media.media_type == MediaType.FLOW_STREAM: + # regular queue (flow) stream request + queue = self.mass.player_queues.get(media.source_id) + assert queue + start_queue_item = self.mass.player_queues.get_item( + media.source_id, media.queue_item_id + ) + assert start_queue_item + audio_source = self.mass.streams.get_queue_flow_stream( + queue=queue, + start_queue_item=start_queue_item, + pcm_format=pcm_format, + ) + elif media.source_id and media.queue_item_id: + # single item stream (e.g. radio) + queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id) + assert queue_item + audio_source = buffered_audio( + self.get_queue_item_stream( + queue_item=queue_item, + pcm_format=pcm_format, + ), + pcm_format=pcm_format, + buffer_size=10, + min_buffer_before_yield=2, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)), + output_format=pcm_format, + ) + return audio_source + @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4) async def get_queue_flow_stream( self, @@ -1077,6 +1156,7 @@ class StreamsController(CoreController): input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)), output_format=output_format, filter_params=filter_params, + chunk_size=get_chunksize(output_format, 1), ): yield chunk @@ -1088,6 +1168,7 @@ class StreamsController(CoreController): input_format=audio_format, output_format=output_format, filter_params=filter_params, + chunk_size=get_chunksize(output_format, 1), ): yield chunk @@ -1097,6 +1178,7 @@ class StreamsController(CoreController): output_format: AudioFormat, player_id: str, player_filter_params: list[str] | None = None, + chunk_size: int | None = None, ) -> AsyncGenerator[bytes, None]: """Get the special plugin source stream.""" plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id) @@ -1124,6 +1206,7 @@ class StreamsController(CoreController): output_format=output_format, filter_params=player_filter_params, extra_input_args=["-y", "-re"], + chunk_size=chunk_size, ): if plugin_source.in_use_by != player_id: self.logger.info( @@ -1274,7 +1357,7 @@ class StreamsController(CoreController): assert isinstance(music_prov, MusicProvider) self.mass.create_task(music_prov.on_streamed(streamdetails)) - @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4) + @use_audio_buffer(buffer_size=30, min_buffer_before_yield=2) async def get_queue_item_stream_with_smartfade( self, queue_item: QueueItem, diff --git a/music_assistant/helpers/named_pipe.py b/music_assistant/helpers/named_pipe.py new file mode 100644 index 00000000..be0b394e --- /dev/null +++ b/music_assistant/helpers/named_pipe.py @@ -0,0 +1,144 @@ +"""Simple async-friendly named pipe writer using threads.""" + +from __future__ import annotations + +import asyncio +import os +import time +from contextlib import suppress +from types import TracebackType +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from logging import Logger + + +class AsyncNamedPipeWriter: + """Simple async writer for named pipes using thread pool for blocking I/O.""" + + def __init__(self, pipe_path: str, logger: Logger | None = None) -> None: + """Initialize named pipe writer. + + Args: + pipe_path: Path to the named pipe + logger: Optional logger for debug/error messages + """ + self._pipe_path = pipe_path + self.logger = logger + self._fd: int | None = None + + @property + def path(self) -> str: + """Return the named pipe path.""" + return self._pipe_path + + @property + def is_open(self) -> bool: + """Return True if the pipe is currently open.""" + return self._fd is not None + + async def create(self) -> None: + """Create the named pipe (if it does not exist).""" + + def _create() -> None: + with suppress(FileExistsError): + os.mkfifo(self._pipe_path, 0o600) + + await asyncio.to_thread(_create) + + async def open(self) -> None: + """Open the named pipe for writing (blocking operation runs in thread).""" + if self._fd is not None: + return # Already open + + def _open() -> int: + # Open pipe in BLOCKING write mode (simple approach) + # - open() blocks until the reader (cliraop) connects + # - write() blocks if pipe buffer is full + # Both operations run in thread pool via asyncio.to_thread(), + # so they won't block the event loop + return os.open(self._pipe_path, os.O_WRONLY) + + self._fd = await asyncio.to_thread(_open) + + if self.logger: + self.logger.debug("Pipe %s opened for writing", self._pipe_path) + + async def write(self, data: bytes, log_slow_writes: bool = True) -> None: + """Write data to the named pipe (blocking operation runs in thread). + + Args: + data: Data to write to the pipe + log_slow_writes: Whether to log slow writes (>5s) + + Raises: + RuntimeError: If pipe is not open + """ + if self._fd is None: + raise RuntimeError(f"Pipe {self._pipe_path} is not open") + + start_time = time.time() + + def _write() -> None: + assert self._fd is not None + # Write all data (may block if pipe buffer is full) + bytes_written = 0 + while bytes_written < len(data): + n = os.write(self._fd, data[bytes_written:]) + bytes_written += n + + # Run blocking write in thread pool + await asyncio.to_thread(_write) + + if log_slow_writes: + elapsed = time.time() - start_time + # Only log if it took more than 5 seconds (real stall) + if elapsed > 5.0 and self.logger: + self.logger.error( + "!!! STALLED PIPE WRITE: Took %.3fs to write %d bytes to %s", + elapsed, + len(data), + self._pipe_path, + ) + + async def close(self) -> None: + """Close the named pipe.""" + if self._fd is None: + return + + fd = self._fd + self._fd = None + + def _close() -> None: + with suppress(Exception): + os.close(fd) + + await asyncio.to_thread(_close) + + async def remove(self) -> None: + """Remove the named pipe.""" + await self.close() + + def _remove() -> None: + with suppress(Exception): + os.remove(self._pipe_path) + + await asyncio.to_thread(_remove) + + async def __aenter__(self) -> AsyncNamedPipeWriter: + """Context manager entry.""" + await self.open() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Context manager exit.""" + await self.close() + + def __str__(self) -> str: + """Return string representation.""" + return self._pipe_path diff --git a/music_assistant/helpers/namedpipe.py b/music_assistant/helpers/namedpipe.py deleted file mode 100644 index 58ee4aa6..00000000 --- a/music_assistant/helpers/namedpipe.py +++ /dev/null @@ -1,189 +0,0 @@ -"""Helper for async-friendly named pipe operations.""" - -from __future__ import annotations - -import asyncio -import fcntl -import os -from contextlib import suppress -from types import TracebackType -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from logging import Logger - - -class AsyncNamedPipeWriter: - """Async writer for named pipes (FIFOs) with non-blocking I/O. - - Handles opening named pipes in non-blocking mode and writing data - asynchronously using the event loop without consuming thread pool resources. - Automatically uses optimal write chunk size based on actual pipe buffer size. - """ - - # Default pipe buffer sizes (platform-specific) - DEFAULT_PIPE_SIZE_MACOS = 16384 # macOS default: 16KB - DEFAULT_PIPE_SIZE_LINUX = 65536 # Linux default: 64KB - - # Target pipe buffer size (512KB for ~1+ seconds of 44.1kHz stereo audio buffering) - TARGET_PIPE_SIZE = 524288 - - def __init__(self, pipe_path: str, logger: Logger | None = None) -> None: - """Initialize async named pipe writer. - - Args: - pipe_path: Path to the named pipe - logger: Optional logger for debug/error messages - """ - self.pipe_path = pipe_path - self.logger = logger - self._fd: int | None = None - self._pipe_buffer_size: int = self.DEFAULT_PIPE_SIZE_MACOS - self._write_chunk_size: int = 8192 # Conservative default (8KB) - - @property - def is_open(self) -> bool: - """Return True if the pipe is currently open.""" - return self._fd is not None - - async def open(self, increase_buffer: bool = True) -> None: - """Open the named pipe in non-blocking mode for writing. - - Args: - increase_buffer: Whether to attempt increasing the pipe buffer size - """ - if self._fd is not None: - return # Already open - - def _open() -> tuple[int, int]: - # Open pipe in non-blocking binary write mode - fd = os.open(self.pipe_path, os.O_WRONLY | os.O_NONBLOCK) - - actual_size = self._pipe_buffer_size # Default - - if increase_buffer: - try: - # macOS/Linux: query current pipe buffer size - if hasattr(fcntl, "F_GETPIPE_SZ"): - current_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ) - actual_size = current_size - if self.logger: - self.logger.debug( - "Pipe %s buffer size: %d bytes", self.pipe_path, current_size - ) - - # Linux only: try to set larger size if F_SETPIPE_SZ exists - if hasattr(fcntl, "F_SETPIPE_SZ"): - try: - fcntl.fcntl(fd, fcntl.F_SETPIPE_SZ, self.TARGET_PIPE_SIZE) - # Verify the new size - actual_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ) - if self.logger: - self.logger.info( - "Pipe %s buffer increased to %d bytes", - self.pipe_path, - actual_size, - ) - except OSError as e: - if self.logger: - self.logger.debug("Could not increase pipe buffer size: %s", e) - except (OSError, AttributeError) as e: - if self.logger: - self.logger.debug("Cannot query/adjust pipe buffer size: %s", e) - - return fd, actual_size - - self._fd, self._pipe_buffer_size = await asyncio.to_thread(_open) - - # Set write chunk size based on actual pipe buffer size - # Use 1/4 of pipe buffer to avoid filling it completely in one write - # This allows better flow control and prevents blocking - self._write_chunk_size = max(8192, self._pipe_buffer_size // 4) - - if self.logger: - self.logger.debug( - "Pipe %s opened: buffer=%d bytes, write_chunk_size=%d bytes", - self.pipe_path, - self._pipe_buffer_size, - self._write_chunk_size, - ) - - async def write(self, data: bytes, timeout_per_wait: float = 0.1) -> None: - """Write data to the named pipe asynchronously. - - Writes data in chunks sized according to the pipe's buffer capacity. - Uses non-blocking I/O with event loop (no thread pool consumption). - - Args: - data: Data to write to the pipe - timeout_per_wait: Timeout for each wait iteration (default: 100ms) - - Raises: - RuntimeError: If pipe is not open - TimeoutError: If pipe write is blocked for too long (>400 waits) - """ - if self._fd is None: - raise RuntimeError(f"Pipe {self.pipe_path} is not open") - - bytes_written = 0 - wait_count = 0 - - while bytes_written < len(data): - # Write up to write_chunk_size bytes at a time - to_write = min(self._write_chunk_size, len(data) - bytes_written) - try: - # Try non-blocking write - n = os.write(self._fd, data[bytes_written : bytes_written + to_write]) - bytes_written += n - wait_count = 0 # Reset wait counter on successful write - except BlockingIOError: - # Pipe buffer is full, wait until writable - wait_count += 1 - if wait_count > 400: # Too many waits (~40+ seconds at 100ms each) - raise TimeoutError( - f"Pipe write blocked after {wait_count} waits on {self.pipe_path}" - ) - - loop = asyncio.get_event_loop() - future: asyncio.Future[None] = loop.create_future() - assert self._fd is not None # Already checked at method entry - fd = self._fd # Capture fd for closure - - def on_writable( - _loop: asyncio.AbstractEventLoop = loop, - _future: asyncio.Future[None] = future, - _fd: int = fd, - ) -> None: - _loop.remove_writer(_fd) - if not _future.done(): - _future.set_result(None) - - loop.add_writer(fd, on_writable) - try: - await asyncio.wait_for(future, timeout=timeout_per_wait) - except TimeoutError: - loop.remove_writer(fd) - # Continue loop - will hit wait_count limit if truly stuck - - async def close(self) -> None: - """Close the named pipe.""" - if self._fd is None: - return - - with suppress(Exception): - await asyncio.to_thread(os.close, self._fd) - self._fd = None - - async def __aenter__(self) -> AsyncNamedPipeWriter: - """Context manager entry.""" - await self.open() - return self - - async def __aexit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> None: - """Context manager exit.""" - await self.close() diff --git a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 index 8bdd418a..17f5611b 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 index 86698180..7afe5bf2 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 index 43d836f5..4279b317 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/providers/airplay/player.py b/music_assistant/providers/airplay/player.py index 60882ce2..924a8f52 100644 --- a/music_assistant/providers/airplay/player.py +++ b/music_assistant/providers/airplay/player.py @@ -7,15 +7,7 @@ import time from typing import TYPE_CHECKING, cast from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType -from music_assistant_models.enums import ( - ConfigEntryType, - ContentType, - MediaType, - PlaybackState, - PlayerFeature, - PlayerType, -) -from music_assistant_models.media_items import AudioFormat +from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType from propcache import under_cached_property as cached_property from music_assistant.constants import ( @@ -27,13 +19,10 @@ from music_assistant.constants import ( CONF_ENTRY_SYNC_ADJUST, create_sample_rates_config_entry, ) -from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.models.player import DeviceInfo, Player, PlayerMedia -from music_assistant.providers.universal_group.constants import UGP_PREFIX from .constants import ( AIRPLAY_DISCOVERY_TYPE, - AIRPLAY_FLOW_PCM_FORMAT, AIRPLAY_PCM_FORMAT, CACHE_CATEGORY_PREV_VOLUME, CONF_ACTION_FINISH_PAIRING, @@ -56,8 +45,6 @@ from .stream_session import AirPlayStreamSession if TYPE_CHECKING: from zeroconf.asyncio import AsyncServiceInfo - from music_assistant.providers.universal_group import UniversalGroupPlayer - from .protocols.airplay2 import AirPlay2Stream from .protocols.raop import RaopStream from .provider import AirPlayProvider @@ -132,6 +119,13 @@ class AirPlayPlayer(Player): return False return super().available + @property + def corrected_elapsed_time(self) -> float: + """Return the corrected elapsed time accounting for stream session restarts.""" + if not self.stream or not self.stream.session: + return super().corrected_elapsed_time or 0.0 + return time.time() - self.stream.session.last_stream_started + async def get_config_entries( self, action: str | None = None, @@ -439,57 +433,7 @@ class AirPlayPlayer(Player): self._attr_current_media = media # select audio source - if media.media_type == MediaType.ANNOUNCEMENT: - # special case: stream announcement - assert media.custom_data - pcm_format = AIRPLAY_PCM_FORMAT - audio_source = self.mass.streams.get_announcement_stream( - media.custom_data["announcement_url"], - output_format=AIRPLAY_PCM_FORMAT, - pre_announce=media.custom_data["pre_announce"], - pre_announce_url=media.custom_data["pre_announce_url"], - ) - elif media.media_type == MediaType.PLUGIN_SOURCE: - # special case: plugin source stream - pcm_format = AIRPLAY_PCM_FORMAT - assert media.custom_data - audio_source = self.mass.streams.get_plugin_source_stream( - plugin_source_id=media.custom_data["source_id"], - output_format=AIRPLAY_PCM_FORMAT, - # need to pass player_id from the PlayerMedia object - # because this could have been a group - player_id=media.custom_data["player_id"], - ) - elif media.source_id and media.source_id.startswith(UGP_PREFIX): - # special case: UGP stream - ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id)) - ugp_stream = ugp_player.stream - assert ugp_stream is not None # for type checker - pcm_format = ugp_stream.base_pcm_format - audio_source = ugp_stream.subscribe_raw() - elif media.source_id and media.queue_item_id: - # regular queue (flow) stream request - pcm_format = AIRPLAY_FLOW_PCM_FORMAT - queue = self.mass.player_queues.get(media.source_id) - assert queue - start_queue_item = self.mass.player_queues.get_item( - media.source_id, media.queue_item_id - ) - assert start_queue_item - audio_source = self.mass.streams.get_queue_flow_stream( - queue=queue, - start_queue_item=start_queue_item, - pcm_format=pcm_format, - ) - else: - # assume url or some other direct path - # NOTE: this will fail if its an uri not playable by ffmpeg - pcm_format = AIRPLAY_PCM_FORMAT - audio_source = get_ffmpeg_stream( - audio_input=media.uri, - input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)), - output_format=AIRPLAY_PCM_FORMAT, - ) + audio_source = self.mass.streams.get_stream(media, AIRPLAY_PCM_FORMAT) # if an existing stream session is running, we could replace it with the new stream if self.stream and self.stream.running: @@ -504,7 +448,9 @@ class AirPlayPlayer(Player): # setup StreamSession for player (and its sync childs if any) sync_clients = self._get_sync_clients() provider = cast("AirPlayProvider", self.provider) - stream_session = AirPlayStreamSession(provider, sync_clients, pcm_format, audio_source) + stream_session = AirPlayStreamSession( + provider, sync_clients, AIRPLAY_PCM_FORMAT, audio_source + ) await stream_session.start() async def volume_set(self, volume_level: int) -> None: @@ -534,14 +480,18 @@ class AirPlayPlayer(Player): # nothing to do return - stream_session = self.stream.session if self.stream else None + stream_session = ( + self.stream.session + if self.stream and self.stream.running and self.stream.session + else None + ) # handle removals first if player_ids_to_remove: if self.player_id in player_ids_to_remove: # dissolve the entire sync group - if self.stream and self.stream.running and self.stream.session: + if stream_session: # stop the stream session if it is running - await self.stream.session.stop() + await stream_session.stop() self._attr_group_members = [] self.update_state() return diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py index 5ba420ef..45636351 100644 --- a/music_assistant/providers/airplay/protocols/_protocol.py +++ b/music_assistant/providers/airplay/protocols/_protocol.py @@ -3,10 +3,8 @@ from __future__ import annotations import asyncio -import os import time from abc import ABC, abstractmethod -from contextlib import suppress from random import randint from typing import TYPE_CHECKING @@ -14,7 +12,7 @@ from music_assistant_models.enums import ContentType, PlaybackState from music_assistant_models.media_items import AudioFormat from music_assistant.constants import VERBOSE_LOG_LEVEL -from music_assistant.helpers.namedpipe import AsyncNamedPipeWriter +from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter if TYPE_CHECKING: from music_assistant_models.player import PlayerMedia @@ -31,6 +29,7 @@ class AirPlayProtocol(ABC): with abstract methods for protocol-specific behavior. """ + _cli_proc: AsyncProcess | None # reference to the (protocol-specific) CLI process session: AirPlayStreamSession | None = None # reference to the active stream session (if any) # the pcm audio format used for streaming to this protocol @@ -52,24 +51,24 @@ class AirPlayProtocol(ABC): self.prov = player.provider self.mass = player.provider.mass self.player = player + self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}") # Generate unique ID to prevent race conditions with named pipes self.active_remote_id: str = str(randint(1000, 8000)) self.prevent_playback: bool = False self._cli_proc: AsyncProcess | None = None + self.audio_pipe = AsyncNamedPipeWriter( + f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio", # noqa: S108 + self.logger, + ) + self.commands_pipe = AsyncNamedPipeWriter( + f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd", # noqa: S108 + self.logger, + ) # State tracking self._started = asyncio.Event() self._stopped = False self._total_bytes_sent = 0 self._stream_bytes_sent = 0 - self.audio_named_pipe = ( - f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio" # noqa: S108 - ) - self.commands_named_pipe = ( - f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd" # noqa: S108 - ) - # Async named pipe writers (kept open for session duration) - self._audio_pipe: AsyncNamedPipeWriter | None = None - self._commands_pipe: AsyncNamedPipeWriter | None = None @property def running(self) -> bool: @@ -81,12 +80,6 @@ class AirPlayProtocol(ABC): and not self._cli_proc.closed ) - @abstractmethod - async def get_ntp(self) -> int: - """Get current NTP timestamp from the CLI binary.""" - # this can probably be removed now that we already get the ntp - # in python (within the stream session start) - @abstractmethod async def start(self, start_ntp: int, skip: int = 0) -> None: """Initialize streaming process for the player. @@ -96,138 +89,45 @@ class AirPlayProtocol(ABC): skip: Number of seconds to skip (for late joiners) """ - async def start_pairing(self) -> None: - """Start pairing process for this protocol (if supported).""" - raise NotImplementedError("Pairing not implemented for this protocol") - - async def finish_pairing(self, pin: str) -> str: - """Finish pairing process with given PIN (if supported).""" - raise NotImplementedError("Pairing not implemented for this protocol") - - async def _create_pipes(self) -> None: - """Create named pipes (FIFOs) before starting CLI process. - - This must be called before starting the CLI binary so the FIFOs exist - when the CLI tries to open them for reading. - """ - await asyncio.to_thread(self._create_named_pipe, self.audio_named_pipe) - await asyncio.to_thread(self._create_named_pipe, self.commands_named_pipe) - self.player.logger.debug("Named pipes created for streaming session") - - def _create_named_pipe(self, pipe_path: str) -> None: - """Create a named pipe (FIFO) if it doesn't exist.""" - if not os.path.exists(pipe_path): - os.mkfifo(pipe_path) - - async def _open_pipes(self) -> None: - """Open both named pipes in non-blocking mode for async I/O. - - This must be called AFTER the CLI process has started and opened the pipes - for reading. Otherwise opening with O_WRONLY | O_NONBLOCK will fail with ENXIO. - """ - # Open audio pipe with buffer size optimization - self._audio_pipe = AsyncNamedPipeWriter(self.audio_named_pipe, logger=self.player.logger) - await self._audio_pipe.open(increase_buffer=True) - - # Open command pipe (no need to increase buffer for small commands) - self._commands_pipe = AsyncNamedPipeWriter( - self.commands_named_pipe, logger=self.player.logger - ) - await self._commands_pipe.open(increase_buffer=False) - - self.player.logger.debug("Named pipes opened in non-blocking mode for streaming session") - async def stop(self) -> None: """Stop playback and cleanup.""" # Send stop command before setting _stopped flag await self.send_cli_command("ACTION=STOP") - self._stopped = True - # Close named pipes (sends EOF to C side, triggering graceful shutdown) - if self._audio_pipe is not None: - await self._audio_pipe.close() - self._audio_pipe = None - - if self._commands_pipe is not None: - await self._commands_pipe.close() - self._commands_pipe = None - # Close the CLI process (wait for it to terminate) if self._cli_proc and not self._cli_proc.closed: await self._cli_proc.close(True) self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) - # Remove named pipes from filesystem - with suppress(Exception): - await asyncio.to_thread(os.remove, self.audio_named_pipe) - with suppress(Exception): - await asyncio.to_thread(os.remove, self.commands_named_pipe) - - async def write_chunk(self, chunk: bytes) -> None: - """ - Write a (pcm) audio chunk to the stream. + # Cleanup named pipes + await self.audio_pipe.remove() + await self.commands_pipe.remove() - Writes one second worth of audio data based on the pcm format. - Uses non-blocking I/O with asyncio event loop (no thread pool consumption). - """ - if self._audio_pipe is None or not self._audio_pipe.is_open: - return + async def start_pairing(self) -> None: + """Start pairing process for this protocol (if supported).""" + raise NotImplementedError("Pairing not implemented for this protocol") - pipe_write_start = time.time() - - try: - await self._audio_pipe.write(chunk) - except TimeoutError as e: - # Re-raise with player context - raise TimeoutError(f"Player {self.player.player_id}: {e}") from e - - pipe_write_elapsed = time.time() - pipe_write_start - - # Log only truly abnormal pipe writes (>5s indicates a real stall) - # Normal writes take ~1s due to pipe rate-limiting to playback speed - # Can take up to ~4s if player's latency buffer is full - if pipe_write_elapsed > 5.0: - self.player.logger.error( - "!!! STALLED PIPE WRITE: Player %s took %.3fs to write %d bytes to pipe", - self.player.player_id, - pipe_write_elapsed, - len(chunk), - ) - - async def write_eof(self) -> None: - """Write EOF to signal end of stream.""" - # default implementation simply closes the named pipe - # can be overridden with protocol specific implementation if needed - if self._audio_pipe is not None: - await self._audio_pipe.close() - self._audio_pipe = None + async def finish_pairing(self, pin: str) -> str: + """Finish pairing process with given PIN (if supported).""" + raise NotImplementedError("Pairing not implemented for this protocol") async def send_cli_command(self, command: str) -> None: - """Send an interactive command to the running CLI binary using non-blocking I/O.""" + """Send an interactive command to the running CLI binary.""" if self._stopped or not self._cli_proc or self._cli_proc.closed: return - if self._commands_pipe is None or not self._commands_pipe.is_open: + if not self.commands_pipe: return - await self._started.wait() + if not self.commands_pipe.is_open: + await self.commands_pipe.open() - if not command.endswith("\n"): - command += "\n" + await self._started.wait() self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) self.player.last_command_sent = time.time() - - # Write command to pipe - data = command.encode("utf-8") - - with suppress(BrokenPipeError): - try: - # Use shorter timeout for commands (1 second per wait iteration) - await self._commands_pipe.write(data, timeout_per_wait=1.0) - except TimeoutError: - self.player.logger.warning("Command pipe write timeout for %s", command.strip()) + await self.commands_pipe.write(command.encode("utf-8")) async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None: """Send metadata to player.""" diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py index 1df58582..968d5869 100644 --- a/music_assistant/providers/airplay/protocols/raop.py +++ b/music_assistant/providers/airplay/protocols/raop.py @@ -10,7 +10,7 @@ from music_assistant_models.enums import PlaybackState from music_assistant_models.errors import PlayerCommandFailed from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL -from music_assistant.helpers.process import AsyncProcess, check_output +from music_assistant.helpers.process import AsyncProcess from music_assistant.providers.airplay.constants import ( CONF_ALAC_ENCODE, CONF_AP_CREDENTIALS, @@ -49,22 +49,10 @@ class RaopStream(AirPlayProtocol): and not self._cli_proc.closed ) - async def get_ntp(self) -> int: - """Get current NTP timestamp from the CLI binary.""" - # this can probably be removed now that we already get the ntp - # in python (within the stream session start) - cli_binary = await get_cli_binary(self.player.protocol) - # TODO: we can potentially also just generate this ourselves? - self.prov.logger.debug("Getting NTP timestamp from %s CLI binary", self.player.protocol) - _, stdout = await check_output(cli_binary, "-ntp") - self.prov.logger.debug(f"Output from ntp check: {stdout.decode().strip()}") - return int(stdout.strip()) - async def start(self, start_ntp: int, skip: int = 0) -> None: """Initialize CLIRaop process for a player.""" assert self.player.discovery_info is not None # for type checker cli_binary = await get_cli_binary(self.player.protocol) - extra_args: list[str] = [] player_id = self.player.player_id extra_args += ["-if", self.mass.streams.bind_ip] @@ -101,9 +89,6 @@ class RaopStream(AirPlayProtocol): # we use this intermediate binary to do the actual streaming because attempts to do # so using pure python (e.g. pyatv) were not successful due to the realtime nature - # Create named pipes before starting CLI process - await self._create_pipes() - cliraop_args = [ cli_binary, "-ntpstart", @@ -120,19 +105,20 @@ class RaopStream(AirPlayProtocol): "-activeremote", self.active_remote_id, "-cmdpipe", - self.commands_named_pipe, + self.commands_pipe.path, "-udn", self.player.discovery_info.name, self.player.address, - self.audio_named_pipe, + self.audio_pipe.path, ] self.player.logger.debug( "Starting cliraop process for player %s with args: %s", self.player.player_id, cliraop_args, ) - self._cli_proc = AsyncProcess(cliraop_args, stdin=False, stderr=True, name="cliraop") + self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop") await self._cli_proc.start() + # read up to first 50 lines of stderr to get the initial status for _ in range(50): line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore") @@ -140,15 +126,10 @@ class RaopStream(AirPlayProtocol): if "connected to " in line: self.player.logger.info("AirPlay device connected. Starting playback.") self._started.set() - # Open pipes now that cliraop is ready - await self._open_pipes() break if "Cannot connect to AirPlay device" in line: raise PlayerCommandFailed("Cannot connect to AirPlay device") - # repeat sending the volume level to the player because some players seem - # to ignore it the first time - # https://github.com/music-assistant/support/issues/3330 - await self.send_cli_command(f"VOLUME={self.player.volume_level}\n") + # start reading the stderr of the cliraop process from another task self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) @@ -157,9 +138,6 @@ class RaopStream(AirPlayProtocol): assert self.player.discovery_info is not None # for type checker cli_binary = await get_cli_binary(self.player.protocol) - # Create named pipes before starting CLI process - await self._create_pipes() - cliraop_args = [ cli_binary, "-pair", @@ -170,7 +148,6 @@ class RaopStream(AirPlayProtocol): "-udn", self.player.discovery_info.name, self.player.address, - self.audio_named_pipe, ] self.player.logger.debug( "Starting PAIRING with cliraop process for player %s with args: %s", @@ -217,11 +194,10 @@ class RaopStream(AirPlayProtocol): async for line in self._cli_proc.iter_stderr(): if "elapsed milliseconds:" in line: # this is received more or less every second while playing - # millis = int(line.split("elapsed milliseconds: ")[1]) - # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction - # self.player.elapsed_time_last_updated = time.time() - logger.log(VERBOSE_LOG_LEVEL, line) - continue + millis = int(line.split("elapsed milliseconds: ")[1]) + # note that this represents the total elapsed time of the streaming session + elapsed_time = millis / 1000 + player.set_state_from_stream(elapsed_time=elapsed_time) if "set pause" in line or "Pause at" in line: player.set_state_from_stream(state=PlaybackState.PAUSED) if "Restarted at" in line or "restarting w/ pause" in line: diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 433f794e..1ddcb74e 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -9,6 +9,8 @@ from collections.abc import AsyncGenerator from contextlib import suppress from typing import TYPE_CHECKING +from music_assistant_models.enums import PlaybackState + from music_assistant.helpers.audio import get_player_filter_params from music_assistant.helpers.ffmpeg import FFMpeg from music_assistant.helpers.util import TaskManager, close_async_generator @@ -69,7 +71,6 @@ class AirPlayStreamSession: self.wait_start = wait_start_seconds # in seconds self.start_time = cur_time + wait_start_seconds self.start_ntp = unix_time_to_ntp(self.start_time) - self.prov.logger.info( "Starting stream session with %d clients", len(self.sync_clients), @@ -89,11 +90,20 @@ class AirPlayStreamSession: # Link stream session to player stream airplay_player.stream.session = self - - await self._start_client_ffmpeg(airplay_player) - + # create the named pipes + await airplay_player.stream.audio_pipe.create() + await airplay_player.stream.commands_pipe.create() # start the stream await airplay_player.stream.start(self.start_ntp) + # start the (player-specific) ffmpeg process + # note that ffmpeg will open the named pipe for writing + await self._start_client_ffmpeg(airplay_player) + # open the command pipe for writing + await airplay_player.stream.commands_pipe.open() + # repeat sending the volume level to the player because some players seem + # to ignore it the first time + # https://github.com/music-assistant/support/issues/3330 + await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n") async with TaskManager(self.mass) as tm: for _airplay_player in self.sync_clients: @@ -121,10 +131,10 @@ class AirPlayStreamSession: assert airplay_player.stream.session == self async with self._lock: self.sync_clients.remove(airplay_player) + await airplay_player.stream.stop() if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): await ffmpeg.close() del ffmpeg - await airplay_player.stream.stop() airplay_player.stream = None # If this was the last client, stop the session if not self.sync_clients: @@ -144,6 +154,7 @@ class AirPlayStreamSession: return allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False) + allow_late_join = False # TODO: disable late join for now until we can test it properly if not allow_late_join: # Late joining is not allowed - restart the session for all players await self.stop() # we need to stop the current session to add a new client @@ -203,46 +214,41 @@ class AirPlayStreamSession: # Set new audio source and restart the stream self._audio_source = audio_source self._audio_source_task = asyncio.create_task(self._audio_streamer()) - self.last_stream_started = time.time() for sync_client in self.sync_clients: sync_client.set_state_from_stream(state=None, elapsed_time=0) - async def _audio_streamer(self) -> None: + async def _audio_streamer(self) -> None: # noqa: PLR0915 """Stream audio to all players.""" generator_exhausted = False _last_metadata: str | None = None + chunk_size = self.pcm_format.pcm_sample_size try: # each chunk is exactly one second of audio data based on the pcm format. async for chunk in self._audio_source: + if len(chunk) != chunk_size: + self.prov.logger.warning( + "Audio source yielded chunk of unexpected size %d (expected %d), " + "this may lead to desync issues", + len(chunk), + chunk_size, + ) async with self._lock: sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] if not sync_clients: - self.prov.logger.error( - "!!! AUDIO STREAMER EXITING: No running clients left! " - "Total sync_clients: %d, Details: %s", - len(self.sync_clients), - [ - (x.player_id, x.stream.running if x.stream else None) - for x in self.sync_clients - ], + self.prov.logger.debug( + "Audio streamer exiting: No running clients left in session" ) return - # Write to all players with a timeout (10 seconds) - # Timeout must account for player's internal latency buffer (1-4 seconds) - # The player may legitimately not accept data while draining its buffer - write_start = time.time() + # Write chunk to all players write_tasks = [ - asyncio.wait_for(self._write_chunk_to_player(x, chunk), timeout=10.0) - for x in sync_clients - if x.stream + self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream ] results = await asyncio.gather(*write_tasks, return_exceptions=True) - write_elapsed = time.time() - write_start # Check for write errors or timeouts - players_to_remove = [] + players_to_remove: list[AirPlayPlayer] = [] for i, result in enumerate(results): if i >= len(sync_clients): continue @@ -250,21 +256,17 @@ class AirPlayStreamSession: if isinstance(result, asyncio.TimeoutError): self.prov.logger.error( - "!!! TIMEOUT writing chunk %d to player %s - " - "REMOVING from sync group! Total write time=%.3fs", + "TIMEOUT writing chunk %d to player %s - REMOVING from sync group!", self.chunks_streamed, player.player_id, - write_elapsed, ) players_to_remove.append(player) elif isinstance(result, Exception): self.prov.logger.error( - "!!! Error writing chunk %d to player %s: %s - " - "REMOVING from sync group! Total write time=%.3fs", + "Error writing chunk %d to player %s: %s - REMOVING from sync group!", self.chunks_streamed, player.player_id, result, - write_elapsed, ) players_to_remove.append(player) @@ -338,20 +340,20 @@ class AirPlayStreamSession: chunk_number = self.chunks_streamed + 1 player_id = airplay_player.player_id - # if the player has an associated FFMpeg instance, use that first + # don't write a chunk if we're paused + while airplay_player.playback_state == PlaybackState.PAUSED: + await asyncio.sleep(0.1) + + # we write the chunk to the player's ffmpeg process which + # applies any player-specific filters (e.g. volume, dsp, etc) + # and outputs in the correct format for the player stream + # to the named pipe associated with the player's stream if ffmpeg := self._player_ffmpeg.get(player_id): await ffmpeg.write(chunk) - chunk_to_send = await ffmpeg.read(len(chunk)) - else: - chunk_to_send = chunk - assert airplay_player.stream stream_write_start = time.time() - await airplay_player.stream.write_chunk(chunk_to_send) stream_write_elapsed = time.time() - stream_write_start - total_elapsed = time.time() - write_start - # Log only truly abnormal writes (>5s indicates a real stall) # Can take up to ~4s if player's latency buffer is being drained if total_elapsed > 5.0: @@ -369,8 +371,6 @@ class AirPlayStreamSession: if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): await ffmpeg.write_eof() await ffmpeg.close() - assert airplay_player.stream - await airplay_player.stream.write_eof() async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None: """Send metadata to all players.""" @@ -391,19 +391,23 @@ class AirPlayStreamSession: await ffmpeg.close() del ffmpeg assert airplay_player.stream # for type checker - # Create optional FFMpeg instance per player if needed + # Create the FFMpeg instance per player which accepts our PCM audio + # applies any player-specific filters (e.g. volume, dsp, etc) + # and outputs in the correct format for the player stream + # to the named pipe associated with the player's stream filter_params = get_player_filter_params( self.mass, airplay_player.player_id, self.pcm_format, airplay_player.stream.pcm_format, ) - if filter_params or self.pcm_format != airplay_player.stream.pcm_format: - ffmpeg = FFMpeg( - audio_input="-", - input_format=self.pcm_format, - output_format=airplay_player.stream.pcm_format, - filter_params=filter_params, - ) - await ffmpeg.start() - self._player_ffmpeg[airplay_player.player_id] = ffmpeg + ffmpeg = FFMpeg( + audio_input="-", + input_format=self.pcm_format, + output_format=airplay_player.stream.pcm_format, + filter_params=filter_params, + audio_output=airplay_player.stream.audio_pipe.path, + extra_input_args=["-y"], + ) + await ffmpeg.start() + self._player_ffmpeg[airplay_player.player_id] = ffmpeg