import urllib.parse
from collections.abc import AsyncGenerator
from dataclasses import dataclass
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING, Final, cast
from aiofiles.os import wrap
from aiohttp import web
get_stream_details,
resample_pcm_audio,
)
-from music_assistant.helpers.buffered_generator import use_audio_buffer
+from music_assistant.helpers.buffered_generator import buffered_audio, use_audio_buffer
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
from music_assistant.helpers.smart_fades import (
from music_assistant.models.core_controller import CoreController
from music_assistant.models.music_provider import MusicProvider
from music_assistant.models.plugin import PluginProvider, PluginSource
+from music_assistant.providers.universal_group.constants import UGP_PREFIX
+from music_assistant.providers.universal_group.player import UniversalGroupPlayer
if TYPE_CHECKING:
from music_assistant_models.config_entries import CoreConfig
+ from music_assistant_models.player import PlayerMedia
from music_assistant_models.player_queue import PlayerQueue
from music_assistant_models.queue_item import QueueItem
from music_assistant_models.streamdetails import StreamDetails
if queue_item.media_type == MediaType.RADIO:
# keep very short buffer for radio streams
# to keep them (more or less) realtime and prevent time outs
- read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "3"]
+ read_rate_input_args = ["-readrate", "1.00", "-readrate_initial_burst", "1"]
elif "Network_Module" in user_agent or "transferMode.dlna.org" in request.headers:
# and ofcourse we have an exception of the exception. Where most players actually NEED
# the readrate filter to avoid disconnecting, some other players (DLNA/MusicCast)
# like https hosts and it also offers the pre-announce 'bell'
return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
+    def get_stream(
+        self, media: PlayerMedia, pcm_format: AudioFormat
+    ) -> AsyncGenerator[bytes, None]:
+        """
+        Get a stream of the given media as raw PCM audio.
+
+        This is used as helper for player providers that can consume the raw PCM
+        audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
+
+        Returns an (unstarted) async generator yielding raw PCM chunks in the
+        requested ``pcm_format``.
+        """
+        # select audio source
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            assert media.custom_data
+            audio_source = self.get_announcement_stream(
+                media.custom_data["announcement_url"],
+                output_format=pcm_format,
+                pre_announce=media.custom_data["pre_announce"],
+                pre_announce_url=media.custom_data["pre_announce_url"],
+            )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            assert media.custom_data
+            audio_source = self.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["source_id"],
+                output_format=pcm_format,
+                # need to pass player_id from the PlayerMedia object
+                # because this could have been a group
+                player_id=media.custom_data["player_id"],
+                chunk_size=get_chunksize(pcm_format, 1),  # ensure 1 second chunks
+            )
+        elif media.source_id and media.source_id.startswith(UGP_PREFIX):
+            # special case: UGP stream
+            ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
+            ugp_stream = ugp_player.stream
+            assert ugp_stream is not None  # for type checker
+            if ugp_stream.base_pcm_format == pcm_format:
+                # no conversion needed
+                audio_source = ugp_stream.subscribe_raw()
+            else:
+                # resample/convert the group stream to the requested format
+                audio_source = ugp_stream.get_stream(output_format=pcm_format)
+        elif media.source_id and media.queue_item_id and media.media_type == MediaType.FLOW_STREAM:
+            # regular queue (flow) stream request
+            queue = self.mass.player_queues.get(media.source_id)
+            assert queue
+            start_queue_item = self.mass.player_queues.get_item(
+                media.source_id, media.queue_item_id
+            )
+            assert start_queue_item
+            audio_source = self.mass.streams.get_queue_flow_stream(
+                queue=queue,
+                start_queue_item=start_queue_item,
+                pcm_format=pcm_format,
+            )
+        elif media.source_id and media.queue_item_id:
+            # single item stream (e.g. radio)
+            queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
+            assert queue_item
+            # buffer a few seconds of audio ahead so short network hiccups
+            # in the source do not stall the consumer
+            audio_source = buffered_audio(
+                self.get_queue_item_stream(
+                    queue_item=queue_item,
+                    pcm_format=pcm_format,
+                ),
+                pcm_format=pcm_format,
+                buffer_size=10,
+                min_buffer_before_yield=2,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
+                output_format=pcm_format,
+            )
+        return audio_source
+
@use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
async def get_queue_flow_stream(
self,
input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
output_format=output_format,
filter_params=filter_params,
+ chunk_size=get_chunksize(output_format, 1),
):
yield chunk
input_format=audio_format,
output_format=output_format,
filter_params=filter_params,
+ chunk_size=get_chunksize(output_format, 1),
):
yield chunk
output_format: AudioFormat,
player_id: str,
player_filter_params: list[str] | None = None,
+ chunk_size: int | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get the special plugin source stream."""
plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
output_format=output_format,
filter_params=player_filter_params,
extra_input_args=["-y", "-re"],
+ chunk_size=chunk_size,
):
if plugin_source.in_use_by != player_id:
self.logger.info(
assert isinstance(music_prov, MusicProvider)
self.mass.create_task(music_prov.on_streamed(streamdetails))
- @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
+ @use_audio_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_item_stream_with_smartfade(
self,
queue_item: QueueItem,
--- /dev/null
+"""Simple async-friendly named pipe writer using threads."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import time
+from contextlib import suppress
+from types import TracebackType
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from logging import Logger
+
+
+class AsyncNamedPipeWriter:
+    """Simple async writer for named pipes using thread pool for blocking I/O.
+
+    All blocking pipe operations (mkfifo/open/write/close/remove) are executed
+    via ``asyncio.to_thread`` so they never block the event loop.
+    """
+
+    def __init__(self, pipe_path: str, logger: Logger | None = None) -> None:
+        """Initialize named pipe writer.
+
+        Args:
+            pipe_path: Path to the named pipe
+            logger: Optional logger for debug/error messages
+        """
+        self._pipe_path = pipe_path
+        self.logger = logger
+        # file descriptor of the opened pipe (None while closed)
+        self._fd: int | None = None
+
+    @property
+    def path(self) -> str:
+        """Return the named pipe path."""
+        return self._pipe_path
+
+    @property
+    def is_open(self) -> bool:
+        """Return True if the pipe is currently open."""
+        return self._fd is not None
+
+    async def create(self) -> None:
+        """Create the named pipe (if it does not exist)."""
+
+        def _create() -> None:
+            # owner-only permissions; silently ignore an already existing fifo
+            with suppress(FileExistsError):
+                os.mkfifo(self._pipe_path, 0o600)
+
+        await asyncio.to_thread(_create)
+
+    async def open(self) -> None:
+        """Open the named pipe for writing (blocking operation runs in thread)."""
+        if self._fd is not None:
+            return  # Already open
+
+        def _open() -> int:
+            # Open pipe in BLOCKING write mode (simple approach)
+            # - open() blocks until the reader (cliraop) connects
+            # - write() blocks if pipe buffer is full
+            # Both operations run in thread pool via asyncio.to_thread(),
+            # so they won't block the event loop
+            return os.open(self._pipe_path, os.O_WRONLY)
+
+        self._fd = await asyncio.to_thread(_open)
+
+        if self.logger:
+            self.logger.debug("Pipe %s opened for writing", self._pipe_path)
+
+    async def write(self, data: bytes, log_slow_writes: bool = True) -> None:
+        """Write data to the named pipe (blocking operation runs in thread).
+
+        Args:
+            data: Data to write to the pipe
+            log_slow_writes: Whether to log slow writes (>5s)
+
+        Raises:
+            RuntimeError: If pipe is not open
+        """
+        if self._fd is None:
+            raise RuntimeError(f"Pipe {self._pipe_path} is not open")
+
+        # use a monotonic clock to measure the write duration so the
+        # slow-write detection is immune to wall-clock adjustments (NTP etc.)
+        start_time = time.monotonic()
+
+        def _write() -> None:
+            assert self._fd is not None
+            # os.write may perform a partial write when the pipe buffer
+            # fills up, so loop until all data has been written
+            bytes_written = 0
+            while bytes_written < len(data):
+                n = os.write(self._fd, data[bytes_written:])
+                bytes_written += n
+
+        # Run blocking write in thread pool
+        await asyncio.to_thread(_write)
+
+        if log_slow_writes and self.logger:
+            elapsed = time.monotonic() - start_time
+            # Only log if it took more than 5 seconds (real stall)
+            if elapsed > 5.0:
+                self.logger.error(
+                    "!!! STALLED PIPE WRITE: Took %.3fs to write %d bytes to %s",
+                    elapsed,
+                    len(data),
+                    self._pipe_path,
+                )
+
+    async def close(self) -> None:
+        """Close the named pipe."""
+        if self._fd is None:
+            return
+
+        # clear the fd first so concurrent writers see the pipe as closed
+        fd = self._fd
+        self._fd = None
+
+        def _close() -> None:
+            with suppress(Exception):
+                os.close(fd)
+
+        await asyncio.to_thread(_close)
+
+    async def remove(self) -> None:
+        """Remove the named pipe."""
+        await self.close()
+
+        def _remove() -> None:
+            with suppress(Exception):
+                os.remove(self._pipe_path)
+
+        await asyncio.to_thread(_remove)
+
+    async def __aenter__(self) -> AsyncNamedPipeWriter:
+        """Context manager entry."""
+        await self.open()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        """Context manager exit."""
+        await self.close()
+
+    def __str__(self) -> str:
+        """Return string representation."""
+        return self._pipe_path
+++ /dev/null
-"""Helper for async-friendly named pipe operations."""
-
-from __future__ import annotations
-
-import asyncio
-import fcntl
-import os
-from contextlib import suppress
-from types import TracebackType
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
- from logging import Logger
-
-
-class AsyncNamedPipeWriter:
- """Async writer for named pipes (FIFOs) with non-blocking I/O.
-
- Handles opening named pipes in non-blocking mode and writing data
- asynchronously using the event loop without consuming thread pool resources.
- Automatically uses optimal write chunk size based on actual pipe buffer size.
- """
-
- # Default pipe buffer sizes (platform-specific)
- DEFAULT_PIPE_SIZE_MACOS = 16384 # macOS default: 16KB
- DEFAULT_PIPE_SIZE_LINUX = 65536 # Linux default: 64KB
-
- # Target pipe buffer size (512KB for ~1+ seconds of 44.1kHz stereo audio buffering)
- TARGET_PIPE_SIZE = 524288
-
- def __init__(self, pipe_path: str, logger: Logger | None = None) -> None:
- """Initialize async named pipe writer.
-
- Args:
- pipe_path: Path to the named pipe
- logger: Optional logger for debug/error messages
- """
- self.pipe_path = pipe_path
- self.logger = logger
- self._fd: int | None = None
- self._pipe_buffer_size: int = self.DEFAULT_PIPE_SIZE_MACOS
- self._write_chunk_size: int = 8192 # Conservative default (8KB)
-
- @property
- def is_open(self) -> bool:
- """Return True if the pipe is currently open."""
- return self._fd is not None
-
- async def open(self, increase_buffer: bool = True) -> None:
- """Open the named pipe in non-blocking mode for writing.
-
- Args:
- increase_buffer: Whether to attempt increasing the pipe buffer size
- """
- if self._fd is not None:
- return # Already open
-
- def _open() -> tuple[int, int]:
- # Open pipe in non-blocking binary write mode
- fd = os.open(self.pipe_path, os.O_WRONLY | os.O_NONBLOCK)
-
- actual_size = self._pipe_buffer_size # Default
-
- if increase_buffer:
- try:
- # macOS/Linux: query current pipe buffer size
- if hasattr(fcntl, "F_GETPIPE_SZ"):
- current_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ)
- actual_size = current_size
- if self.logger:
- self.logger.debug(
- "Pipe %s buffer size: %d bytes", self.pipe_path, current_size
- )
-
- # Linux only: try to set larger size if F_SETPIPE_SZ exists
- if hasattr(fcntl, "F_SETPIPE_SZ"):
- try:
- fcntl.fcntl(fd, fcntl.F_SETPIPE_SZ, self.TARGET_PIPE_SIZE)
- # Verify the new size
- actual_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ)
- if self.logger:
- self.logger.info(
- "Pipe %s buffer increased to %d bytes",
- self.pipe_path,
- actual_size,
- )
- except OSError as e:
- if self.logger:
- self.logger.debug("Could not increase pipe buffer size: %s", e)
- except (OSError, AttributeError) as e:
- if self.logger:
- self.logger.debug("Cannot query/adjust pipe buffer size: %s", e)
-
- return fd, actual_size
-
- self._fd, self._pipe_buffer_size = await asyncio.to_thread(_open)
-
- # Set write chunk size based on actual pipe buffer size
- # Use 1/4 of pipe buffer to avoid filling it completely in one write
- # This allows better flow control and prevents blocking
- self._write_chunk_size = max(8192, self._pipe_buffer_size // 4)
-
- if self.logger:
- self.logger.debug(
- "Pipe %s opened: buffer=%d bytes, write_chunk_size=%d bytes",
- self.pipe_path,
- self._pipe_buffer_size,
- self._write_chunk_size,
- )
-
- async def write(self, data: bytes, timeout_per_wait: float = 0.1) -> None:
- """Write data to the named pipe asynchronously.
-
- Writes data in chunks sized according to the pipe's buffer capacity.
- Uses non-blocking I/O with event loop (no thread pool consumption).
-
- Args:
- data: Data to write to the pipe
- timeout_per_wait: Timeout for each wait iteration (default: 100ms)
-
- Raises:
- RuntimeError: If pipe is not open
- TimeoutError: If pipe write is blocked for too long (>400 waits)
- """
- if self._fd is None:
- raise RuntimeError(f"Pipe {self.pipe_path} is not open")
-
- bytes_written = 0
- wait_count = 0
-
- while bytes_written < len(data):
- # Write up to write_chunk_size bytes at a time
- to_write = min(self._write_chunk_size, len(data) - bytes_written)
- try:
- # Try non-blocking write
- n = os.write(self._fd, data[bytes_written : bytes_written + to_write])
- bytes_written += n
- wait_count = 0 # Reset wait counter on successful write
- except BlockingIOError:
- # Pipe buffer is full, wait until writable
- wait_count += 1
- if wait_count > 400: # Too many waits (~40+ seconds at 100ms each)
- raise TimeoutError(
- f"Pipe write blocked after {wait_count} waits on {self.pipe_path}"
- )
-
- loop = asyncio.get_event_loop()
- future: asyncio.Future[None] = loop.create_future()
- assert self._fd is not None # Already checked at method entry
- fd = self._fd # Capture fd for closure
-
- def on_writable(
- _loop: asyncio.AbstractEventLoop = loop,
- _future: asyncio.Future[None] = future,
- _fd: int = fd,
- ) -> None:
- _loop.remove_writer(_fd)
- if not _future.done():
- _future.set_result(None)
-
- loop.add_writer(fd, on_writable)
- try:
- await asyncio.wait_for(future, timeout=timeout_per_wait)
- except TimeoutError:
- loop.remove_writer(fd)
- # Continue loop - will hit wait_count limit if truly stuck
-
- async def close(self) -> None:
- """Close the named pipe."""
- if self._fd is None:
- return
-
- with suppress(Exception):
- await asyncio.to_thread(os.close, self._fd)
- self._fd = None
-
- async def __aenter__(self) -> AsyncNamedPipeWriter:
- """Context manager entry."""
- await self.open()
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> None:
- """Context manager exit."""
- await self.close()
from typing import TYPE_CHECKING, cast
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
-from music_assistant_models.enums import (
- ConfigEntryType,
- ContentType,
- MediaType,
- PlaybackState,
- PlayerFeature,
- PlayerType,
-)
-from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
from propcache import under_cached_property as cached_property
from music_assistant.constants import (
CONF_ENTRY_SYNC_ADJUST,
create_sample_rates_config_entry,
)
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.providers.universal_group.constants import UGP_PREFIX
from .constants import (
AIRPLAY_DISCOVERY_TYPE,
- AIRPLAY_FLOW_PCM_FORMAT,
AIRPLAY_PCM_FORMAT,
CACHE_CATEGORY_PREV_VOLUME,
CONF_ACTION_FINISH_PAIRING,
if TYPE_CHECKING:
from zeroconf.asyncio import AsyncServiceInfo
- from music_assistant.providers.universal_group import UniversalGroupPlayer
-
from .protocols.airplay2 import AirPlay2Stream
from .protocols.raop import RaopStream
from .provider import AirPlayProvider
return False
return super().available
+    @property
+    def corrected_elapsed_time(self) -> float:
+        """Return the corrected elapsed time accounting for stream session restarts."""
+        if not self.stream or not self.stream.session:
+            # no active stream session: fall back to base implementation
+            return super().corrected_elapsed_time or 0.0
+        # NOTE(review): assumes session.last_stream_started is a unix (wall clock)
+        # timestamp captured when the stream (re)started -- verify in session code
+        return time.time() - self.stream.session.last_stream_started
+
async def get_config_entries(
self,
action: str | None = None,
self._attr_current_media = media
# select audio source
- if media.media_type == MediaType.ANNOUNCEMENT:
- # special case: stream announcement
- assert media.custom_data
- pcm_format = AIRPLAY_PCM_FORMAT
- audio_source = self.mass.streams.get_announcement_stream(
- media.custom_data["announcement_url"],
- output_format=AIRPLAY_PCM_FORMAT,
- pre_announce=media.custom_data["pre_announce"],
- pre_announce_url=media.custom_data["pre_announce_url"],
- )
- elif media.media_type == MediaType.PLUGIN_SOURCE:
- # special case: plugin source stream
- pcm_format = AIRPLAY_PCM_FORMAT
- assert media.custom_data
- audio_source = self.mass.streams.get_plugin_source_stream(
- plugin_source_id=media.custom_data["source_id"],
- output_format=AIRPLAY_PCM_FORMAT,
- # need to pass player_id from the PlayerMedia object
- # because this could have been a group
- player_id=media.custom_data["player_id"],
- )
- elif media.source_id and media.source_id.startswith(UGP_PREFIX):
- # special case: UGP stream
- ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
- ugp_stream = ugp_player.stream
- assert ugp_stream is not None # for type checker
- pcm_format = ugp_stream.base_pcm_format
- audio_source = ugp_stream.subscribe_raw()
- elif media.source_id and media.queue_item_id:
- # regular queue (flow) stream request
- pcm_format = AIRPLAY_FLOW_PCM_FORMAT
- queue = self.mass.player_queues.get(media.source_id)
- assert queue
- start_queue_item = self.mass.player_queues.get_item(
- media.source_id, media.queue_item_id
- )
- assert start_queue_item
- audio_source = self.mass.streams.get_queue_flow_stream(
- queue=queue,
- start_queue_item=start_queue_item,
- pcm_format=pcm_format,
- )
- else:
- # assume url or some other direct path
- # NOTE: this will fail if its an uri not playable by ffmpeg
- pcm_format = AIRPLAY_PCM_FORMAT
- audio_source = get_ffmpeg_stream(
- audio_input=media.uri,
- input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
- output_format=AIRPLAY_PCM_FORMAT,
- )
+ audio_source = self.mass.streams.get_stream(media, AIRPLAY_PCM_FORMAT)
# if an existing stream session is running, we could replace it with the new stream
if self.stream and self.stream.running:
# setup StreamSession for player (and its sync childs if any)
sync_clients = self._get_sync_clients()
provider = cast("AirPlayProvider", self.provider)
- stream_session = AirPlayStreamSession(provider, sync_clients, pcm_format, audio_source)
+ stream_session = AirPlayStreamSession(
+ provider, sync_clients, AIRPLAY_PCM_FORMAT, audio_source
+ )
await stream_session.start()
async def volume_set(self, volume_level: int) -> None:
# nothing to do
return
- stream_session = self.stream.session if self.stream else None
+ stream_session = (
+ self.stream.session
+ if self.stream and self.stream.running and self.stream.session
+ else None
+ )
# handle removals first
if player_ids_to_remove:
if self.player_id in player_ids_to_remove:
# dissolve the entire sync group
- if self.stream and self.stream.running and self.stream.session:
+ if stream_session:
# stop the stream session if it is running
- await self.stream.session.stop()
+ await stream_session.stop()
self._attr_group_members = []
self.update_state()
return
from __future__ import annotations
import asyncio
-import os
import time
from abc import ABC, abstractmethod
-from contextlib import suppress
from random import randint
from typing import TYPE_CHECKING
from music_assistant_models.media_items import AudioFormat
from music_assistant.constants import VERBOSE_LOG_LEVEL
-from music_assistant.helpers.namedpipe import AsyncNamedPipeWriter
+from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter
if TYPE_CHECKING:
from music_assistant_models.player import PlayerMedia
with abstract methods for protocol-specific behavior.
"""
+ _cli_proc: AsyncProcess | None # reference to the (protocol-specific) CLI process
session: AirPlayStreamSession | None = None # reference to the active stream session (if any)
# the pcm audio format used for streaming to this protocol
self.prov = player.provider
self.mass = player.provider.mass
self.player = player
+ self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}")
# Generate unique ID to prevent race conditions with named pipes
self.active_remote_id: str = str(randint(1000, 8000))
self.prevent_playback: bool = False
self._cli_proc: AsyncProcess | None = None
+ self.audio_pipe = AsyncNamedPipeWriter(
+ f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio", # noqa: S108
+ self.logger,
+ )
+ self.commands_pipe = AsyncNamedPipeWriter(
+ f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd", # noqa: S108
+ self.logger,
+ )
# State tracking
self._started = asyncio.Event()
self._stopped = False
self._total_bytes_sent = 0
self._stream_bytes_sent = 0
- self.audio_named_pipe = (
- f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio" # noqa: S108
- )
- self.commands_named_pipe = (
- f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd" # noqa: S108
- )
- # Async named pipe writers (kept open for session duration)
- self._audio_pipe: AsyncNamedPipeWriter | None = None
- self._commands_pipe: AsyncNamedPipeWriter | None = None
@property
def running(self) -> bool:
and not self._cli_proc.closed
)
- @abstractmethod
- async def get_ntp(self) -> int:
- """Get current NTP timestamp from the CLI binary."""
- # this can probably be removed now that we already get the ntp
- # in python (within the stream session start)
-
@abstractmethod
async def start(self, start_ntp: int, skip: int = 0) -> None:
"""Initialize streaming process for the player.
skip: Number of seconds to skip (for late joiners)
"""
- async def start_pairing(self) -> None:
- """Start pairing process for this protocol (if supported)."""
- raise NotImplementedError("Pairing not implemented for this protocol")
-
- async def finish_pairing(self, pin: str) -> str:
- """Finish pairing process with given PIN (if supported)."""
- raise NotImplementedError("Pairing not implemented for this protocol")
-
- async def _create_pipes(self) -> None:
- """Create named pipes (FIFOs) before starting CLI process.
-
- This must be called before starting the CLI binary so the FIFOs exist
- when the CLI tries to open them for reading.
- """
- await asyncio.to_thread(self._create_named_pipe, self.audio_named_pipe)
- await asyncio.to_thread(self._create_named_pipe, self.commands_named_pipe)
- self.player.logger.debug("Named pipes created for streaming session")
-
- def _create_named_pipe(self, pipe_path: str) -> None:
- """Create a named pipe (FIFO) if it doesn't exist."""
- if not os.path.exists(pipe_path):
- os.mkfifo(pipe_path)
-
- async def _open_pipes(self) -> None:
- """Open both named pipes in non-blocking mode for async I/O.
-
- This must be called AFTER the CLI process has started and opened the pipes
- for reading. Otherwise opening with O_WRONLY | O_NONBLOCK will fail with ENXIO.
- """
- # Open audio pipe with buffer size optimization
- self._audio_pipe = AsyncNamedPipeWriter(self.audio_named_pipe, logger=self.player.logger)
- await self._audio_pipe.open(increase_buffer=True)
-
- # Open command pipe (no need to increase buffer for small commands)
- self._commands_pipe = AsyncNamedPipeWriter(
- self.commands_named_pipe, logger=self.player.logger
- )
- await self._commands_pipe.open(increase_buffer=False)
-
- self.player.logger.debug("Named pipes opened in non-blocking mode for streaming session")
-
async def stop(self) -> None:
"""Stop playback and cleanup."""
# Send stop command before setting _stopped flag
await self.send_cli_command("ACTION=STOP")
-
self._stopped = True
- # Close named pipes (sends EOF to C side, triggering graceful shutdown)
- if self._audio_pipe is not None:
- await self._audio_pipe.close()
- self._audio_pipe = None
-
- if self._commands_pipe is not None:
- await self._commands_pipe.close()
- self._commands_pipe = None
-
# Close the CLI process (wait for it to terminate)
if self._cli_proc and not self._cli_proc.closed:
await self._cli_proc.close(True)
self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
- # Remove named pipes from filesystem
- with suppress(Exception):
- await asyncio.to_thread(os.remove, self.audio_named_pipe)
- with suppress(Exception):
- await asyncio.to_thread(os.remove, self.commands_named_pipe)
-
- async def write_chunk(self, chunk: bytes) -> None:
- """
- Write a (pcm) audio chunk to the stream.
+ # Cleanup named pipes
+ await self.audio_pipe.remove()
+ await self.commands_pipe.remove()
- Writes one second worth of audio data based on the pcm format.
- Uses non-blocking I/O with asyncio event loop (no thread pool consumption).
- """
- if self._audio_pipe is None or not self._audio_pipe.is_open:
- return
+ async def start_pairing(self) -> None:
+ """Start pairing process for this protocol (if supported)."""
+ raise NotImplementedError("Pairing not implemented for this protocol")
- pipe_write_start = time.time()
-
- try:
- await self._audio_pipe.write(chunk)
- except TimeoutError as e:
- # Re-raise with player context
- raise TimeoutError(f"Player {self.player.player_id}: {e}") from e
-
- pipe_write_elapsed = time.time() - pipe_write_start
-
- # Log only truly abnormal pipe writes (>5s indicates a real stall)
- # Normal writes take ~1s due to pipe rate-limiting to playback speed
- # Can take up to ~4s if player's latency buffer is full
- if pipe_write_elapsed > 5.0:
- self.player.logger.error(
- "!!! STALLED PIPE WRITE: Player %s took %.3fs to write %d bytes to pipe",
- self.player.player_id,
- pipe_write_elapsed,
- len(chunk),
- )
-
- async def write_eof(self) -> None:
- """Write EOF to signal end of stream."""
- # default implementation simply closes the named pipe
- # can be overridden with protocol specific implementation if needed
- if self._audio_pipe is not None:
- await self._audio_pipe.close()
- self._audio_pipe = None
+ async def finish_pairing(self, pin: str) -> str:
+ """Finish pairing process with given PIN (if supported)."""
+ raise NotImplementedError("Pairing not implemented for this protocol")
async def send_cli_command(self, command: str) -> None:
- """Send an interactive command to the running CLI binary using non-blocking I/O."""
+ """Send an interactive command to the running CLI binary."""
if self._stopped or not self._cli_proc or self._cli_proc.closed:
return
- if self._commands_pipe is None or not self._commands_pipe.is_open:
+ if not self.commands_pipe:
return
- await self._started.wait()
+ if not self.commands_pipe.is_open:
+ await self.commands_pipe.open()
- if not command.endswith("\n"):
- command += "\n"
+ await self._started.wait()
self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
self.player.last_command_sent = time.time()
-
- # Write command to pipe
- data = command.encode("utf-8")
-
- with suppress(BrokenPipeError):
- try:
- # Use shorter timeout for commands (1 second per wait iteration)
- await self._commands_pipe.write(data, timeout_per_wait=1.0)
- except TimeoutError:
- self.player.logger.warning("Command pipe write timeout for %s", command.strip())
+ await self.commands_pipe.write(command.encode("utf-8"))
async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
"""Send metadata to player."""
from music_assistant_models.errors import PlayerCommandFailed
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.helpers.process import AsyncProcess
from music_assistant.providers.airplay.constants import (
CONF_ALAC_ENCODE,
CONF_AP_CREDENTIALS,
and not self._cli_proc.closed
)
- async def get_ntp(self) -> int:
- """Get current NTP timestamp from the CLI binary."""
- # this can probably be removed now that we already get the ntp
- # in python (within the stream session start)
- cli_binary = await get_cli_binary(self.player.protocol)
- # TODO: we can potentially also just generate this ourselves?
- self.prov.logger.debug("Getting NTP timestamp from %s CLI binary", self.player.protocol)
- _, stdout = await check_output(cli_binary, "-ntp")
- self.prov.logger.debug(f"Output from ntp check: {stdout.decode().strip()}")
- return int(stdout.strip())
-
async def start(self, start_ntp: int, skip: int = 0) -> None:
"""Initialize CLIRaop process for a player."""
assert self.player.discovery_info is not None # for type checker
cli_binary = await get_cli_binary(self.player.protocol)
-
extra_args: list[str] = []
player_id = self.player.player_id
extra_args += ["-if", self.mass.streams.bind_ip]
# we use this intermediate binary to do the actual streaming because attempts to do
# so using pure python (e.g. pyatv) were not successful due to the realtime nature
- # Create named pipes before starting CLI process
- await self._create_pipes()
-
cliraop_args = [
cli_binary,
"-ntpstart",
"-activeremote",
self.active_remote_id,
"-cmdpipe",
- self.commands_named_pipe,
+ self.commands_pipe.path,
"-udn",
self.player.discovery_info.name,
self.player.address,
- self.audio_named_pipe,
+ self.audio_pipe.path,
]
self.player.logger.debug(
"Starting cliraop process for player %s with args: %s",
self.player.player_id,
cliraop_args,
)
- self._cli_proc = AsyncProcess(cliraop_args, stdin=False, stderr=True, name="cliraop")
+ self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
await self._cli_proc.start()
+
# read up to first 50 lines of stderr to get the initial status
for _ in range(50):
line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
if "connected to " in line:
self.player.logger.info("AirPlay device connected. Starting playback.")
self._started.set()
- # Open pipes now that cliraop is ready
- await self._open_pipes()
break
if "Cannot connect to AirPlay device" in line:
raise PlayerCommandFailed("Cannot connect to AirPlay device")
- # repeat sending the volume level to the player because some players seem
- # to ignore it the first time
- # https://github.com/music-assistant/support/issues/3330
- await self.send_cli_command(f"VOLUME={self.player.volume_level}\n")
+
# start reading the stderr of the cliraop process from another task
self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
assert self.player.discovery_info is not None # for type checker
cli_binary = await get_cli_binary(self.player.protocol)
- # Create named pipes before starting CLI process
- await self._create_pipes()
-
cliraop_args = [
cli_binary,
"-pair",
"-udn",
self.player.discovery_info.name,
self.player.address,
- self.audio_named_pipe,
]
self.player.logger.debug(
"Starting PAIRING with cliraop process for player %s with args: %s",
async for line in self._cli_proc.iter_stderr():
if "elapsed milliseconds:" in line:
# this is received more or less every second while playing
- # millis = int(line.split("elapsed milliseconds: ")[1])
- # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
- # self.player.elapsed_time_last_updated = time.time()
- logger.log(VERBOSE_LOG_LEVEL, line)
- continue
+ millis = int(line.split("elapsed milliseconds: ")[1])
+ # note that this represents the total elapsed time of the streaming session
+ elapsed_time = millis / 1000
+ player.set_state_from_stream(elapsed_time=elapsed_time)
if "set pause" in line or "Pause at" in line:
player.set_state_from_stream(state=PlaybackState.PAUSED)
if "Restarted at" in line or "restarting w/ pause" in line:
from contextlib import suppress
from typing import TYPE_CHECKING
+from music_assistant_models.enums import PlaybackState
+
from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.ffmpeg import FFMpeg
from music_assistant.helpers.util import TaskManager, close_async_generator
self.wait_start = wait_start_seconds # in seconds
self.start_time = cur_time + wait_start_seconds
self.start_ntp = unix_time_to_ntp(self.start_time)
-
self.prov.logger.info(
"Starting stream session with %d clients",
len(self.sync_clients),
# Link stream session to player stream
airplay_player.stream.session = self
-
- await self._start_client_ffmpeg(airplay_player)
-
+ # create the named pipes
+ await airplay_player.stream.audio_pipe.create()
+ await airplay_player.stream.commands_pipe.create()
# start the stream
await airplay_player.stream.start(self.start_ntp)
+ # start the (player-specific) ffmpeg process
+ # note that ffmpeg will open the named pipe for writing
+ await self._start_client_ffmpeg(airplay_player)
+ # open the command pipe for writing
+ await airplay_player.stream.commands_pipe.open()
+ # repeat sending the volume level to the player because some players seem
+ # to ignore it the first time
+ # https://github.com/music-assistant/support/issues/3330
+ await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n")
async with TaskManager(self.mass) as tm:
for _airplay_player in self.sync_clients:
assert airplay_player.stream.session == self
async with self._lock:
self.sync_clients.remove(airplay_player)
+ await airplay_player.stream.stop()
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
await ffmpeg.close()
del ffmpeg
- await airplay_player.stream.stop()
airplay_player.stream = None
# If this was the last client, stop the session
if not self.sync_clients:
return
allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False)
+ allow_late_join = False # TODO: disable late join for now until we can test it properly
if not allow_late_join:
# Late joining is not allowed - restart the session for all players
await self.stop() # we need to stop the current session to add a new client
# Set new audio source and restart the stream
self._audio_source = audio_source
self._audio_source_task = asyncio.create_task(self._audio_streamer())
-
self.last_stream_started = time.time()
for sync_client in self.sync_clients:
sync_client.set_state_from_stream(state=None, elapsed_time=0)
- async def _audio_streamer(self) -> None:
+ async def _audio_streamer(self) -> None: # noqa: PLR0915
"""Stream audio to all players."""
generator_exhausted = False
_last_metadata: str | None = None
+ chunk_size = self.pcm_format.pcm_sample_size
try:
# each chunk is exactly one second of audio data based on the pcm format.
async for chunk in self._audio_source:
+ if len(chunk) != chunk_size:
+ self.prov.logger.warning(
+ "Audio source yielded chunk of unexpected size %d (expected %d), "
+ "this may lead to desync issues",
+ len(chunk),
+ chunk_size,
+ )
async with self._lock:
sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
if not sync_clients:
- self.prov.logger.error(
- "!!! AUDIO STREAMER EXITING: No running clients left! "
- "Total sync_clients: %d, Details: %s",
- len(self.sync_clients),
- [
- (x.player_id, x.stream.running if x.stream else None)
- for x in self.sync_clients
- ],
+ self.prov.logger.debug(
+ "Audio streamer exiting: No running clients left in session"
)
return
- # Write to all players with a timeout (10 seconds)
- # Timeout must account for player's internal latency buffer (1-4 seconds)
- # The player may legitimately not accept data while draining its buffer
- write_start = time.time()
+ # Write chunk to all players
write_tasks = [
- asyncio.wait_for(self._write_chunk_to_player(x, chunk), timeout=10.0)
- for x in sync_clients
- if x.stream
+ self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
]
results = await asyncio.gather(*write_tasks, return_exceptions=True)
- write_elapsed = time.time() - write_start
# Check for write errors or timeouts
- players_to_remove = []
+ players_to_remove: list[AirPlayPlayer] = []
for i, result in enumerate(results):
if i >= len(sync_clients):
continue
if isinstance(result, asyncio.TimeoutError):
self.prov.logger.error(
- "!!! TIMEOUT writing chunk %d to player %s - "
- "REMOVING from sync group! Total write time=%.3fs",
+ "TIMEOUT writing chunk %d to player %s - REMOVING from sync group!",
self.chunks_streamed,
player.player_id,
- write_elapsed,
)
players_to_remove.append(player)
elif isinstance(result, Exception):
self.prov.logger.error(
- "!!! Error writing chunk %d to player %s: %s - "
- "REMOVING from sync group! Total write time=%.3fs",
+ "Error writing chunk %d to player %s: %s - REMOVING from sync group!",
self.chunks_streamed,
player.player_id,
result,
- write_elapsed,
)
players_to_remove.append(player)
chunk_number = self.chunks_streamed + 1
player_id = airplay_player.player_id
- # if the player has an associated FFMpeg instance, use that first
+ # don't write a chunk if we're paused
+ while airplay_player.playback_state == PlaybackState.PAUSED:
+ await asyncio.sleep(0.1)
+
+ # we write the chunk to the player's ffmpeg process, which
+ # applies any player-specific filters (e.g. volume, dsp, etc)
+ # and writes the output, converted to the player's required format,
+ # to the named pipe associated with the player's stream
if ffmpeg := self._player_ffmpeg.get(player_id):
await ffmpeg.write(chunk)
- chunk_to_send = await ffmpeg.read(len(chunk))
- else:
- chunk_to_send = chunk
- assert airplay_player.stream
stream_write_start = time.time()
- await airplay_player.stream.write_chunk(chunk_to_send)
stream_write_elapsed = time.time() - stream_write_start
-
total_elapsed = time.time() - write_start
-
# Log only truly abnormal writes (>5s indicates a real stall)
# Can take up to ~4s if player's latency buffer is being drained
if total_elapsed > 5.0:
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
await ffmpeg.write_eof()
await ffmpeg.close()
- assert airplay_player.stream
- await airplay_player.stream.write_eof()
async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
"""Send metadata to all players."""
await ffmpeg.close()
del ffmpeg
assert airplay_player.stream # for type checker
- # Create optional FFMpeg instance per player if needed
+ # Create the per-player FFMpeg instance, which accepts our PCM audio,
+ # applies any player-specific filters (e.g. volume, dsp, etc)
+ # and writes the output, converted to the player's required format,
+ # to the named pipe associated with the player's stream
filter_params = get_player_filter_params(
self.mass,
airplay_player.player_id,
self.pcm_format,
airplay_player.stream.pcm_format,
)
- if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
- ffmpeg = FFMpeg(
- audio_input="-",
- input_format=self.pcm_format,
- output_format=airplay_player.stream.pcm_format,
- filter_params=filter_params,
- )
- await ffmpeg.start()
- self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+ ffmpeg = FFMpeg(
+ audio_input="-",
+ input_format=self.pcm_format,
+ output_format=airplay_player.stream.pcm_format,
+ filter_params=filter_params,
+ audio_output=airplay_player.stream.audio_pipe.path,
+ extra_input_args=["-y"],
+ )
+ await ffmpeg.start()
+ self._player_ffmpeg[airplay_player.player_id] = ffmpeg