Streaming fixes for AirPlay provider
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 1 Nov 2025 13:43:08 +0000 (14:43 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 1 Nov 2025 13:43:08 +0000 (14:43 +0100)
music_assistant/controllers/streams.py
music_assistant/helpers/named_pipe.py [new file with mode: 0644]
music_assistant/helpers/namedpipe.py [deleted file]
music_assistant/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/providers/airplay/bin/cliraop-macos-arm64
music_assistant/providers/airplay/player.py
music_assistant/providers/airplay/protocols/_protocol.py
music_assistant/providers/airplay/protocols/raop.py
music_assistant/providers/airplay/stream_session.py

index 67ecff09ba19316fa509eae344a6f7fa85495208..fc3d81db3254783278c7fb141587b2cf9e566a58 100644 (file)
@@ -15,7 +15,7 @@ import os
 import urllib.parse
 from collections.abc import AsyncGenerator
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING, Final, cast
 
 from aiofiles.os import wrap
 from aiohttp import web
@@ -66,7 +66,7 @@ from music_assistant.helpers.audio import (
     get_stream_details,
     resample_pcm_audio,
 )
-from music_assistant.helpers.buffered_generator import use_audio_buffer
+from music_assistant.helpers.buffered_generator import buffered_audio, use_audio_buffer
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
 from music_assistant.helpers.smart_fades import (
@@ -79,9 +79,12 @@ from music_assistant.helpers.webserver import Webserver
 from music_assistant.models.core_controller import CoreController
 from music_assistant.models.music_provider import MusicProvider
 from music_assistant.models.plugin import PluginProvider, PluginSource
+from music_assistant.providers.universal_group.constants import UGP_PREFIX
+from music_assistant.providers.universal_group.player import UniversalGroupPlayer
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig
+    from music_assistant_models.player import PlayerMedia
     from music_assistant_models.player_queue import PlayerQueue
     from music_assistant_models.queue_item import QueueItem
     from music_assistant_models.streamdetails import StreamDetails
@@ -490,7 +493,7 @@ class StreamsController(CoreController):
         if queue_item.media_type == MediaType.RADIO:
             # keep very short buffer for radio streams
             # to keep them (more or less) realtime and prevent time outs
-            read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "3"]
+            read_rate_input_args = ["-readrate", "1.00", "-readrate_initial_burst", "1"]
         elif "Network_Module" in user_agent or "transferMode.dlna.org" in request.headers:
             # and ofcourse we have an exception of the exception. Where most players actually NEED
             # the readrate filter to avoid disconnecting, some other players (DLNA/MusicCast)
@@ -828,6 +831,82 @@ class StreamsController(CoreController):
         # like https hosts and it also offers the pre-announce 'bell'
         return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
 
+    def get_stream(
+        self, media: PlayerMedia, pcm_format: AudioFormat
+    ) -> AsyncGenerator[bytes, None]:
+        """
+        Get a stream of the given media as raw PCM audio.
+
+        This is used as helper for player providers that can consume the raw PCM
+        audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
+        """
+        # select audio source
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            assert media.custom_data
+            audio_source = self.get_announcement_stream(
+                media.custom_data["announcement_url"],
+                output_format=pcm_format,
+                pre_announce=media.custom_data["pre_announce"],
+                pre_announce_url=media.custom_data["pre_announce_url"],
+            )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            assert media.custom_data
+            audio_source = self.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["source_id"],
+                output_format=pcm_format,
+                # need to pass player_id from the PlayerMedia object
+                # because this could have been a group
+                player_id=media.custom_data["player_id"],
+                chunk_size=get_chunksize(pcm_format, 1),  # ensure 1 second chunks
+            )
+        elif media.source_id and media.source_id.startswith(UGP_PREFIX):
+            # special case: UGP stream
+            ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
+            ugp_stream = ugp_player.stream
+            assert ugp_stream is not None  # for type checker
+            if ugp_stream.base_pcm_format == pcm_format:
+                # no conversion needed
+                audio_source = ugp_stream.subscribe_raw()
+            else:
+                audio_source = ugp_stream.get_stream(output_format=pcm_format)
+        elif media.source_id and media.queue_item_id and media.media_type == MediaType.FLOW_STREAM:
+            # regular queue (flow) stream request
+            queue = self.mass.player_queues.get(media.source_id)
+            assert queue
+            start_queue_item = self.mass.player_queues.get_item(
+                media.source_id, media.queue_item_id
+            )
+            assert start_queue_item
+            audio_source = self.mass.streams.get_queue_flow_stream(
+                queue=queue,
+                start_queue_item=start_queue_item,
+                pcm_format=pcm_format,
+            )
+        elif media.source_id and media.queue_item_id:
+            # single item stream (e.g. radio)
+            queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
+            assert queue_item
+            audio_source = buffered_audio(
+                self.get_queue_item_stream(
+                    queue_item=queue_item,
+                    pcm_format=pcm_format,
+                ),
+                pcm_format=pcm_format,
+                buffer_size=10,
+                min_buffer_before_yield=2,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
+                output_format=pcm_format,
+            )
+        return audio_source
+
     @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
     async def get_queue_flow_stream(
         self,
@@ -1077,6 +1156,7 @@ class StreamsController(CoreController):
                 input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
                 output_format=output_format,
                 filter_params=filter_params,
+                chunk_size=get_chunksize(output_format, 1),
             ):
                 yield chunk
 
@@ -1088,6 +1168,7 @@ class StreamsController(CoreController):
             input_format=audio_format,
             output_format=output_format,
             filter_params=filter_params,
+            chunk_size=get_chunksize(output_format, 1),
         ):
             yield chunk
 
@@ -1097,6 +1178,7 @@ class StreamsController(CoreController):
         output_format: AudioFormat,
         player_id: str,
         player_filter_params: list[str] | None = None,
+        chunk_size: int | None = None,
     ) -> AsyncGenerator[bytes, None]:
         """Get the special plugin source stream."""
         plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
@@ -1124,6 +1206,7 @@ class StreamsController(CoreController):
                 output_format=output_format,
                 filter_params=player_filter_params,
                 extra_input_args=["-y", "-re"],
+                chunk_size=chunk_size,
             ):
                 if plugin_source.in_use_by != player_id:
                     self.logger.info(
@@ -1274,7 +1357,7 @@ class StreamsController(CoreController):
                     assert isinstance(music_prov, MusicProvider)
                 self.mass.create_task(music_prov.on_streamed(streamdetails))
 
-    @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
+    @use_audio_buffer(buffer_size=30, min_buffer_before_yield=2)
     async def get_queue_item_stream_with_smartfade(
         self,
         queue_item: QueueItem,
diff --git a/music_assistant/helpers/named_pipe.py b/music_assistant/helpers/named_pipe.py
new file mode 100644 (file)
index 0000000..be0b394
--- /dev/null
@@ -0,0 +1,144 @@
+"""Simple async-friendly named pipe writer using threads."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import time
+from contextlib import suppress
+from types import TracebackType
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from logging import Logger
+
+
+class AsyncNamedPipeWriter:
+    """Simple async writer for named pipes using thread pool for blocking I/O."""
+
+    def __init__(self, pipe_path: str, logger: Logger | None = None) -> None:
+        """Initialize named pipe writer.
+
+        Args:
+            pipe_path: Path to the named pipe
+            logger: Optional logger for debug/error messages
+        """
+        self._pipe_path = pipe_path
+        self.logger = logger
+        self._fd: int | None = None
+
+    @property
+    def path(self) -> str:
+        """Return the named pipe path."""
+        return self._pipe_path
+
+    @property
+    def is_open(self) -> bool:
+        """Return True if the pipe is currently open."""
+        return self._fd is not None
+
+    async def create(self) -> None:
+        """Create the named pipe (if it does not exist)."""
+
+        def _create() -> None:
+            with suppress(FileExistsError):
+                os.mkfifo(self._pipe_path, 0o600)
+
+        await asyncio.to_thread(_create)
+
+    async def open(self) -> None:
+        """Open the named pipe for writing (blocking operation runs in thread)."""
+        if self._fd is not None:
+            return  # Already open
+
+        def _open() -> int:
+            # Open pipe in BLOCKING write mode (simple approach)
+            # - open() blocks until the reader (cliraop) connects
+            # - write() blocks if pipe buffer is full
+            # Both operations run in thread pool via asyncio.to_thread(),
+            # so they won't block the event loop
+            return os.open(self._pipe_path, os.O_WRONLY)
+
+        self._fd = await asyncio.to_thread(_open)
+
+        if self.logger:
+            self.logger.debug("Pipe %s opened for writing", self._pipe_path)
+
+    async def write(self, data: bytes, log_slow_writes: bool = True) -> None:
+        """Write data to the named pipe (blocking operation runs in thread).
+
+        Args:
+            data: Data to write to the pipe
+            log_slow_writes: Whether to log slow writes (>5s)
+
+        Raises:
+            RuntimeError: If pipe is not open
+        """
+        if self._fd is None:
+            raise RuntimeError(f"Pipe {self._pipe_path} is not open")
+
+        start_time = time.time()
+
+        def _write() -> None:
+            assert self._fd is not None
+            # Write all data (may block if pipe buffer is full)
+            bytes_written = 0
+            while bytes_written < len(data):
+                n = os.write(self._fd, data[bytes_written:])
+                bytes_written += n
+
+        # Run blocking write in thread pool
+        await asyncio.to_thread(_write)
+
+        if log_slow_writes:
+            elapsed = time.time() - start_time
+            # Only log if it took more than 5 seconds (real stall)
+            if elapsed > 5.0 and self.logger:
+                self.logger.error(
+                    "!!! STALLED PIPE WRITE: Took %.3fs to write %d bytes to %s",
+                    elapsed,
+                    len(data),
+                    self._pipe_path,
+                )
+
+    async def close(self) -> None:
+        """Close the named pipe."""
+        if self._fd is None:
+            return
+
+        fd = self._fd
+        self._fd = None
+
+        def _close() -> None:
+            with suppress(Exception):
+                os.close(fd)
+
+        await asyncio.to_thread(_close)
+
+    async def remove(self) -> None:
+        """Remove the named pipe."""
+        await self.close()
+
+        def _remove() -> None:
+            with suppress(Exception):
+                os.remove(self._pipe_path)
+
+        await asyncio.to_thread(_remove)
+
+    async def __aenter__(self) -> AsyncNamedPipeWriter:
+        """Context manager entry."""
+        await self.open()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        """Context manager exit."""
+        await self.close()
+
+    def __str__(self) -> str:
+        """Return string representation."""
+        return self._pipe_path
diff --git a/music_assistant/helpers/namedpipe.py b/music_assistant/helpers/namedpipe.py
deleted file mode 100644 (file)
index 58ee4aa..0000000
+++ /dev/null
@@ -1,189 +0,0 @@
-"""Helper for async-friendly named pipe operations."""
-
-from __future__ import annotations
-
-import asyncio
-import fcntl
-import os
-from contextlib import suppress
-from types import TracebackType
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
-    from logging import Logger
-
-
-class AsyncNamedPipeWriter:
-    """Async writer for named pipes (FIFOs) with non-blocking I/O.
-
-    Handles opening named pipes in non-blocking mode and writing data
-    asynchronously using the event loop without consuming thread pool resources.
-    Automatically uses optimal write chunk size based on actual pipe buffer size.
-    """
-
-    # Default pipe buffer sizes (platform-specific)
-    DEFAULT_PIPE_SIZE_MACOS = 16384  # macOS default: 16KB
-    DEFAULT_PIPE_SIZE_LINUX = 65536  # Linux default: 64KB
-
-    # Target pipe buffer size (512KB for ~1+ seconds of 44.1kHz stereo audio buffering)
-    TARGET_PIPE_SIZE = 524288
-
-    def __init__(self, pipe_path: str, logger: Logger | None = None) -> None:
-        """Initialize async named pipe writer.
-
-        Args:
-            pipe_path: Path to the named pipe
-            logger: Optional logger for debug/error messages
-        """
-        self.pipe_path = pipe_path
-        self.logger = logger
-        self._fd: int | None = None
-        self._pipe_buffer_size: int = self.DEFAULT_PIPE_SIZE_MACOS
-        self._write_chunk_size: int = 8192  # Conservative default (8KB)
-
-    @property
-    def is_open(self) -> bool:
-        """Return True if the pipe is currently open."""
-        return self._fd is not None
-
-    async def open(self, increase_buffer: bool = True) -> None:
-        """Open the named pipe in non-blocking mode for writing.
-
-        Args:
-            increase_buffer: Whether to attempt increasing the pipe buffer size
-        """
-        if self._fd is not None:
-            return  # Already open
-
-        def _open() -> tuple[int, int]:
-            # Open pipe in non-blocking binary write mode
-            fd = os.open(self.pipe_path, os.O_WRONLY | os.O_NONBLOCK)
-
-            actual_size = self._pipe_buffer_size  # Default
-
-            if increase_buffer:
-                try:
-                    # macOS/Linux: query current pipe buffer size
-                    if hasattr(fcntl, "F_GETPIPE_SZ"):
-                        current_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ)
-                        actual_size = current_size
-                        if self.logger:
-                            self.logger.debug(
-                                "Pipe %s buffer size: %d bytes", self.pipe_path, current_size
-                            )
-
-                        # Linux only: try to set larger size if F_SETPIPE_SZ exists
-                        if hasattr(fcntl, "F_SETPIPE_SZ"):
-                            try:
-                                fcntl.fcntl(fd, fcntl.F_SETPIPE_SZ, self.TARGET_PIPE_SIZE)
-                                # Verify the new size
-                                actual_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ)
-                                if self.logger:
-                                    self.logger.info(
-                                        "Pipe %s buffer increased to %d bytes",
-                                        self.pipe_path,
-                                        actual_size,
-                                    )
-                            except OSError as e:
-                                if self.logger:
-                                    self.logger.debug("Could not increase pipe buffer size: %s", e)
-                except (OSError, AttributeError) as e:
-                    if self.logger:
-                        self.logger.debug("Cannot query/adjust pipe buffer size: %s", e)
-
-            return fd, actual_size
-
-        self._fd, self._pipe_buffer_size = await asyncio.to_thread(_open)
-
-        # Set write chunk size based on actual pipe buffer size
-        # Use 1/4 of pipe buffer to avoid filling it completely in one write
-        # This allows better flow control and prevents blocking
-        self._write_chunk_size = max(8192, self._pipe_buffer_size // 4)
-
-        if self.logger:
-            self.logger.debug(
-                "Pipe %s opened: buffer=%d bytes, write_chunk_size=%d bytes",
-                self.pipe_path,
-                self._pipe_buffer_size,
-                self._write_chunk_size,
-            )
-
-    async def write(self, data: bytes, timeout_per_wait: float = 0.1) -> None:
-        """Write data to the named pipe asynchronously.
-
-        Writes data in chunks sized according to the pipe's buffer capacity.
-        Uses non-blocking I/O with event loop (no thread pool consumption).
-
-        Args:
-            data: Data to write to the pipe
-            timeout_per_wait: Timeout for each wait iteration (default: 100ms)
-
-        Raises:
-            RuntimeError: If pipe is not open
-            TimeoutError: If pipe write is blocked for too long (>400 waits)
-        """
-        if self._fd is None:
-            raise RuntimeError(f"Pipe {self.pipe_path} is not open")
-
-        bytes_written = 0
-        wait_count = 0
-
-        while bytes_written < len(data):
-            # Write up to write_chunk_size bytes at a time
-            to_write = min(self._write_chunk_size, len(data) - bytes_written)
-            try:
-                # Try non-blocking write
-                n = os.write(self._fd, data[bytes_written : bytes_written + to_write])
-                bytes_written += n
-                wait_count = 0  # Reset wait counter on successful write
-            except BlockingIOError:
-                # Pipe buffer is full, wait until writable
-                wait_count += 1
-                if wait_count > 400:  # Too many waits (~40+ seconds at 100ms each)
-                    raise TimeoutError(
-                        f"Pipe write blocked after {wait_count} waits on {self.pipe_path}"
-                    )
-
-                loop = asyncio.get_event_loop()
-                future: asyncio.Future[None] = loop.create_future()
-                assert self._fd is not None  # Already checked at method entry
-                fd = self._fd  # Capture fd for closure
-
-                def on_writable(
-                    _loop: asyncio.AbstractEventLoop = loop,
-                    _future: asyncio.Future[None] = future,
-                    _fd: int = fd,
-                ) -> None:
-                    _loop.remove_writer(_fd)
-                    if not _future.done():
-                        _future.set_result(None)
-
-                loop.add_writer(fd, on_writable)
-                try:
-                    await asyncio.wait_for(future, timeout=timeout_per_wait)
-                except TimeoutError:
-                    loop.remove_writer(fd)
-                    # Continue loop - will hit wait_count limit if truly stuck
-
-    async def close(self) -> None:
-        """Close the named pipe."""
-        if self._fd is None:
-            return
-
-        with suppress(Exception):
-            await asyncio.to_thread(os.close, self._fd)
-        self._fd = None
-
-    async def __aenter__(self) -> AsyncNamedPipeWriter:
-        """Context manager entry."""
-        await self.open()
-        return self
-
-    async def __aexit__(
-        self,
-        exc_type: type[BaseException] | None,
-        exc_val: BaseException | None,
-        exc_tb: TracebackType | None,
-    ) -> None:
-        """Context manager exit."""
-        await self.close()
index 8bdd418af586a67566b16c7eb69b66d0905587f8..17f5611b93861101b3251342c1a40a3db613f036 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 differ
index 866981806e9303afb024fc4a093e5f3901e3973a..7afe5bf22311332f05599ae6964863d794901a06 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 differ
index 43d836f56293144ef7c16238e07abdbb7ab83821..4279b317cc47f182286282d9eb05d8fc352b396f 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ
index 60882ce2c400cfd9e1c47b0ce6568abd5757fe4f..924a8f52275f069c1dbf4ee946702c5a63483d81 100644 (file)
@@ -7,15 +7,7 @@ import time
 from typing import TYPE_CHECKING, cast
 
 from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
-from music_assistant_models.enums import (
-    ConfigEntryType,
-    ContentType,
-    MediaType,
-    PlaybackState,
-    PlayerFeature,
-    PlayerType,
-)
-from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
 from propcache import under_cached_property as cached_property
 
 from music_assistant.constants import (
@@ -27,13 +19,10 @@ from music_assistant.constants import (
     CONF_ENTRY_SYNC_ADJUST,
     create_sample_rates_config_entry,
 )
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.providers.universal_group.constants import UGP_PREFIX
 
 from .constants import (
     AIRPLAY_DISCOVERY_TYPE,
-    AIRPLAY_FLOW_PCM_FORMAT,
     AIRPLAY_PCM_FORMAT,
     CACHE_CATEGORY_PREV_VOLUME,
     CONF_ACTION_FINISH_PAIRING,
@@ -56,8 +45,6 @@ from .stream_session import AirPlayStreamSession
 if TYPE_CHECKING:
     from zeroconf.asyncio import AsyncServiceInfo
 
-    from music_assistant.providers.universal_group import UniversalGroupPlayer
-
     from .protocols.airplay2 import AirPlay2Stream
     from .protocols.raop import RaopStream
     from .provider import AirPlayProvider
@@ -132,6 +119,13 @@ class AirPlayPlayer(Player):
                 return False
         return super().available
 
+    @property
+    def corrected_elapsed_time(self) -> float:
+        """Return the corrected elapsed time accounting for stream session restarts."""
+        if not self.stream or not self.stream.session:
+            return super().corrected_elapsed_time or 0.0
+        return time.time() - self.stream.session.last_stream_started
+
     async def get_config_entries(
         self,
         action: str | None = None,
@@ -439,57 +433,7 @@ class AirPlayPlayer(Player):
         self._attr_current_media = media
 
         # select audio source
-        if media.media_type == MediaType.ANNOUNCEMENT:
-            # special case: stream announcement
-            assert media.custom_data
-            pcm_format = AIRPLAY_PCM_FORMAT
-            audio_source = self.mass.streams.get_announcement_stream(
-                media.custom_data["announcement_url"],
-                output_format=AIRPLAY_PCM_FORMAT,
-                pre_announce=media.custom_data["pre_announce"],
-                pre_announce_url=media.custom_data["pre_announce_url"],
-            )
-        elif media.media_type == MediaType.PLUGIN_SOURCE:
-            # special case: plugin source stream
-            pcm_format = AIRPLAY_PCM_FORMAT
-            assert media.custom_data
-            audio_source = self.mass.streams.get_plugin_source_stream(
-                plugin_source_id=media.custom_data["source_id"],
-                output_format=AIRPLAY_PCM_FORMAT,
-                # need to pass player_id from the PlayerMedia object
-                # because this could have been a group
-                player_id=media.custom_data["player_id"],
-            )
-        elif media.source_id and media.source_id.startswith(UGP_PREFIX):
-            # special case: UGP stream
-            ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
-            ugp_stream = ugp_player.stream
-            assert ugp_stream is not None  # for type checker
-            pcm_format = ugp_stream.base_pcm_format
-            audio_source = ugp_stream.subscribe_raw()
-        elif media.source_id and media.queue_item_id:
-            # regular queue (flow) stream request
-            pcm_format = AIRPLAY_FLOW_PCM_FORMAT
-            queue = self.mass.player_queues.get(media.source_id)
-            assert queue
-            start_queue_item = self.mass.player_queues.get_item(
-                media.source_id, media.queue_item_id
-            )
-            assert start_queue_item
-            audio_source = self.mass.streams.get_queue_flow_stream(
-                queue=queue,
-                start_queue_item=start_queue_item,
-                pcm_format=pcm_format,
-            )
-        else:
-            # assume url or some other direct path
-            # NOTE: this will fail if its an uri not playable by ffmpeg
-            pcm_format = AIRPLAY_PCM_FORMAT
-            audio_source = get_ffmpeg_stream(
-                audio_input=media.uri,
-                input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
-                output_format=AIRPLAY_PCM_FORMAT,
-            )
+        audio_source = self.mass.streams.get_stream(media, AIRPLAY_PCM_FORMAT)
 
         # if an existing stream session is running, we could replace it with the new stream
         if self.stream and self.stream.running:
@@ -504,7 +448,9 @@ class AirPlayPlayer(Player):
         # setup StreamSession for player (and its sync childs if any)
         sync_clients = self._get_sync_clients()
         provider = cast("AirPlayProvider", self.provider)
-        stream_session = AirPlayStreamSession(provider, sync_clients, pcm_format, audio_source)
+        stream_session = AirPlayStreamSession(
+            provider, sync_clients, AIRPLAY_PCM_FORMAT, audio_source
+        )
         await stream_session.start()
 
     async def volume_set(self, volume_level: int) -> None:
@@ -534,14 +480,18 @@ class AirPlayPlayer(Player):
             # nothing to do
             return
 
-        stream_session = self.stream.session if self.stream else None
+        stream_session = (
+            self.stream.session
+            if self.stream and self.stream.running and self.stream.session
+            else None
+        )
         # handle removals first
         if player_ids_to_remove:
             if self.player_id in player_ids_to_remove:
                 # dissolve the entire sync group
-                if self.stream and self.stream.running and self.stream.session:
+                if stream_session:
                     # stop the stream session if it is running
-                    await self.stream.session.stop()
+                    await stream_session.stop()
                 self._attr_group_members = []
                 self.update_state()
                 return
index 5ba420ef67ff4b3f3ce775516aa7629979e80681..456363519a73b17c5e84932bc1fa2ed7010ac197 100644 (file)
@@ -3,10 +3,8 @@
 from __future__ import annotations
 
 import asyncio
-import os
 import time
 from abc import ABC, abstractmethod
-from contextlib import suppress
 from random import randint
 from typing import TYPE_CHECKING
 
@@ -14,7 +12,7 @@ from music_assistant_models.enums import ContentType, PlaybackState
 from music_assistant_models.media_items import AudioFormat
 
 from music_assistant.constants import VERBOSE_LOG_LEVEL
-from music_assistant.helpers.namedpipe import AsyncNamedPipeWriter
+from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter
 
 if TYPE_CHECKING:
     from music_assistant_models.player import PlayerMedia
@@ -31,6 +29,7 @@ class AirPlayProtocol(ABC):
     with abstract methods for protocol-specific behavior.
     """
 
+    _cli_proc: AsyncProcess | None  # reference to the (protocol-specific) CLI process
     session: AirPlayStreamSession | None = None  # reference to the active stream session (if any)
 
     # the pcm audio format used for streaming to this protocol
@@ -52,24 +51,24 @@ class AirPlayProtocol(ABC):
         self.prov = player.provider
         self.mass = player.provider.mass
         self.player = player
+        self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}")
         # Generate unique ID to prevent race conditions with named pipes
         self.active_remote_id: str = str(randint(1000, 8000))
         self.prevent_playback: bool = False
         self._cli_proc: AsyncProcess | None = None
+        self.audio_pipe = AsyncNamedPipeWriter(
+            f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio",  # noqa: S108
+            self.logger,
+        )
+        self.commands_pipe = AsyncNamedPipeWriter(
+            f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd",  # noqa: S108
+            self.logger,
+        )
         # State tracking
         self._started = asyncio.Event()
         self._stopped = False
         self._total_bytes_sent = 0
         self._stream_bytes_sent = 0
-        self.audio_named_pipe = (
-            f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio"  # noqa: S108
-        )
-        self.commands_named_pipe = (
-            f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd"  # noqa: S108
-        )
-        # Async named pipe writers (kept open for session duration)
-        self._audio_pipe: AsyncNamedPipeWriter | None = None
-        self._commands_pipe: AsyncNamedPipeWriter | None = None
 
     @property
     def running(self) -> bool:
@@ -81,12 +80,6 @@ class AirPlayProtocol(ABC):
             and not self._cli_proc.closed
         )
 
-    @abstractmethod
-    async def get_ntp(self) -> int:
-        """Get current NTP timestamp from the CLI binary."""
-        # this can probably be removed now that we already get the ntp
-        # in python (within the stream session start)
-
     @abstractmethod
     async def start(self, start_ntp: int, skip: int = 0) -> None:
         """Initialize streaming process for the player.
@@ -96,138 +89,45 @@ class AirPlayProtocol(ABC):
             skip: Number of seconds to skip (for late joiners)
         """
 
-    async def start_pairing(self) -> None:
-        """Start pairing process for this protocol (if supported)."""
-        raise NotImplementedError("Pairing not implemented for this protocol")
-
-    async def finish_pairing(self, pin: str) -> str:
-        """Finish pairing process with given PIN (if supported)."""
-        raise NotImplementedError("Pairing not implemented for this protocol")
-
-    async def _create_pipes(self) -> None:
-        """Create named pipes (FIFOs) before starting CLI process.
-
-        This must be called before starting the CLI binary so the FIFOs exist
-        when the CLI tries to open them for reading.
-        """
-        await asyncio.to_thread(self._create_named_pipe, self.audio_named_pipe)
-        await asyncio.to_thread(self._create_named_pipe, self.commands_named_pipe)
-        self.player.logger.debug("Named pipes created for streaming session")
-
-    def _create_named_pipe(self, pipe_path: str) -> None:
-        """Create a named pipe (FIFO) if it doesn't exist."""
-        if not os.path.exists(pipe_path):
-            os.mkfifo(pipe_path)
-
-    async def _open_pipes(self) -> None:
-        """Open both named pipes in non-blocking mode for async I/O.
-
-        This must be called AFTER the CLI process has started and opened the pipes
-        for reading. Otherwise opening with O_WRONLY | O_NONBLOCK will fail with ENXIO.
-        """
-        # Open audio pipe with buffer size optimization
-        self._audio_pipe = AsyncNamedPipeWriter(self.audio_named_pipe, logger=self.player.logger)
-        await self._audio_pipe.open(increase_buffer=True)
-
-        # Open command pipe (no need to increase buffer for small commands)
-        self._commands_pipe = AsyncNamedPipeWriter(
-            self.commands_named_pipe, logger=self.player.logger
-        )
-        await self._commands_pipe.open(increase_buffer=False)
-
-        self.player.logger.debug("Named pipes opened in non-blocking mode for streaming session")
-
     async def stop(self) -> None:
         """Stop playback and cleanup."""
         # Send stop command before setting _stopped flag
         await self.send_cli_command("ACTION=STOP")
-
         self._stopped = True
 
-        # Close named pipes (sends EOF to C side, triggering graceful shutdown)
-        if self._audio_pipe is not None:
-            await self._audio_pipe.close()
-            self._audio_pipe = None
-
-        if self._commands_pipe is not None:
-            await self._commands_pipe.close()
-            self._commands_pipe = None
-
         # Close the CLI process (wait for it to terminate)
         if self._cli_proc and not self._cli_proc.closed:
             await self._cli_proc.close(True)
 
         self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
 
-        # Remove named pipes from filesystem
-        with suppress(Exception):
-            await asyncio.to_thread(os.remove, self.audio_named_pipe)
-        with suppress(Exception):
-            await asyncio.to_thread(os.remove, self.commands_named_pipe)
-
-    async def write_chunk(self, chunk: bytes) -> None:
-        """
-        Write a (pcm) audio chunk to the stream.
+        # Cleanup named pipes
+        await self.audio_pipe.remove()
+        await self.commands_pipe.remove()
 
-        Writes one second worth of audio data based on the pcm format.
-        Uses non-blocking I/O with asyncio event loop (no thread pool consumption).
-        """
-        if self._audio_pipe is None or not self._audio_pipe.is_open:
-            return
+    async def start_pairing(self) -> None:
+        """Start pairing process for this protocol (if supported)."""
+        raise NotImplementedError("Pairing not implemented for this protocol")
 
-        pipe_write_start = time.time()
-
-        try:
-            await self._audio_pipe.write(chunk)
-        except TimeoutError as e:
-            # Re-raise with player context
-            raise TimeoutError(f"Player {self.player.player_id}: {e}") from e
-
-        pipe_write_elapsed = time.time() - pipe_write_start
-
-        # Log only truly abnormal pipe writes (>5s indicates a real stall)
-        # Normal writes take ~1s due to pipe rate-limiting to playback speed
-        # Can take up to ~4s if player's latency buffer is full
-        if pipe_write_elapsed > 5.0:
-            self.player.logger.error(
-                "!!! STALLED PIPE WRITE: Player %s took %.3fs to write %d bytes to pipe",
-                self.player.player_id,
-                pipe_write_elapsed,
-                len(chunk),
-            )
-
-    async def write_eof(self) -> None:
-        """Write EOF to signal end of stream."""
-        # default implementation simply closes the named pipe
-        # can be overridden with protocol specific implementation if needed
-        if self._audio_pipe is not None:
-            await self._audio_pipe.close()
-            self._audio_pipe = None
+    async def finish_pairing(self, pin: str) -> str:
+        """Finish pairing process with given PIN (if supported)."""
+        raise NotImplementedError("Pairing not implemented for this protocol")
 
     async def send_cli_command(self, command: str) -> None:
-        """Send an interactive command to the running CLI binary using non-blocking I/O."""
+        """Send an interactive command to the running CLI binary."""
         if self._stopped or not self._cli_proc or self._cli_proc.closed:
             return
-        if self._commands_pipe is None or not self._commands_pipe.is_open:
+        if not self.commands_pipe:
             return
 
-        await self._started.wait()
+        if not self.commands_pipe.is_open:
+            await self.commands_pipe.open()
 
-        if not command.endswith("\n"):
-            command += "\n"
+        await self._started.wait()
 
         self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
         self.player.last_command_sent = time.time()
-
-        # Write command to pipe
-        data = command.encode("utf-8")
-
-        with suppress(BrokenPipeError):
-            try:
-                # Use shorter timeout for commands (1 second per wait iteration)
-                await self._commands_pipe.write(data, timeout_per_wait=1.0)
-            except TimeoutError:
-                self.player.logger.warning("Command pipe write timeout for %s", command.strip())
+        await self.commands_pipe.write(command.encode("utf-8"))
 
     async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
         """Send metadata to player."""
index 1df585820bb6005f8ccfe43e01a6c9a1dda74191..968d5869a75b823a50f4dde6dbcf94e43c7bd291 100644 (file)
@@ -10,7 +10,7 @@ from music_assistant_models.enums import PlaybackState
 from music_assistant_models.errors import PlayerCommandFailed
 
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.helpers.process import AsyncProcess
 from music_assistant.providers.airplay.constants import (
     CONF_ALAC_ENCODE,
     CONF_AP_CREDENTIALS,
@@ -49,22 +49,10 @@ class RaopStream(AirPlayProtocol):
             and not self._cli_proc.closed
         )
 
-    async def get_ntp(self) -> int:
-        """Get current NTP timestamp from the CLI binary."""
-        # this can probably be removed now that we already get the ntp
-        # in python (within the stream session start)
-        cli_binary = await get_cli_binary(self.player.protocol)
-        # TODO: we can potentially also just generate this ourselves?
-        self.prov.logger.debug("Getting NTP timestamp from %s CLI binary", self.player.protocol)
-        _, stdout = await check_output(cli_binary, "-ntp")
-        self.prov.logger.debug(f"Output from ntp check: {stdout.decode().strip()}")
-        return int(stdout.strip())
-
     async def start(self, start_ntp: int, skip: int = 0) -> None:
         """Initialize CLIRaop process for a player."""
         assert self.player.discovery_info is not None  # for type checker
         cli_binary = await get_cli_binary(self.player.protocol)
-
         extra_args: list[str] = []
         player_id = self.player.player_id
         extra_args += ["-if", self.mass.streams.bind_ip]
@@ -101,9 +89,6 @@ class RaopStream(AirPlayProtocol):
         # we use this intermediate binary to do the actual streaming because attempts to do
         # so using pure python (e.g. pyatv) were not successful due to the realtime nature
 
-        # Create named pipes before starting CLI process
-        await self._create_pipes()
-
         cliraop_args = [
             cli_binary,
             "-ntpstart",
@@ -120,19 +105,20 @@ class RaopStream(AirPlayProtocol):
             "-activeremote",
             self.active_remote_id,
             "-cmdpipe",
-            self.commands_named_pipe,
+            self.commands_pipe.path,
             "-udn",
             self.player.discovery_info.name,
             self.player.address,
-            self.audio_named_pipe,
+            self.audio_pipe.path,
         ]
         self.player.logger.debug(
             "Starting cliraop process for player %s with args: %s",
             self.player.player_id,
             cliraop_args,
         )
-        self._cli_proc = AsyncProcess(cliraop_args, stdin=False, stderr=True, name="cliraop")
+        self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
         await self._cli_proc.start()
+
         # read up to first 50 lines of stderr to get the initial status
         for _ in range(50):
             line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
@@ -140,15 +126,10 @@ class RaopStream(AirPlayProtocol):
             if "connected to " in line:
                 self.player.logger.info("AirPlay device connected. Starting playback.")
                 self._started.set()
-                # Open pipes now that cliraop is ready
-                await self._open_pipes()
                 break
             if "Cannot connect to AirPlay device" in line:
                 raise PlayerCommandFailed("Cannot connect to AirPlay device")
-        # repeat sending the volume level to the player because some players seem
-        # to ignore it the first time
-        # https://github.com/music-assistant/support/issues/3330
-        await self.send_cli_command(f"VOLUME={self.player.volume_level}\n")
+
         # start reading the stderr of the cliraop process from another task
         self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
 
@@ -157,9 +138,6 @@ class RaopStream(AirPlayProtocol):
         assert self.player.discovery_info is not None  # for type checker
         cli_binary = await get_cli_binary(self.player.protocol)
 
-        # Create named pipes before starting CLI process
-        await self._create_pipes()
-
         cliraop_args = [
             cli_binary,
             "-pair",
@@ -170,7 +148,6 @@ class RaopStream(AirPlayProtocol):
             "-udn",
             self.player.discovery_info.name,
             self.player.address,
-            self.audio_named_pipe,
         ]
         self.player.logger.debug(
             "Starting PAIRING with cliraop process for player %s with args: %s",
@@ -217,11 +194,10 @@ class RaopStream(AirPlayProtocol):
         async for line in self._cli_proc.iter_stderr():
             if "elapsed milliseconds:" in line:
                 # this is received more or less every second while playing
-                # millis = int(line.split("elapsed milliseconds: ")[1])
-                # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
-                # self.player.elapsed_time_last_updated = time.time()
-                logger.log(VERBOSE_LOG_LEVEL, line)
-                continue
+                millis = int(line.split("elapsed milliseconds: ")[1])
+                # note that this represents the total elapsed time of the streaming session
+                elapsed_time = millis / 1000
+                player.set_state_from_stream(elapsed_time=elapsed_time)
             if "set pause" in line or "Pause at" in line:
                 player.set_state_from_stream(state=PlaybackState.PAUSED)
             if "Restarted at" in line or "restarting w/ pause" in line:
index 433f794e6282aa35436b6e32739c4c7548c0ffa7..1ddcb74e720a98850bdd1b301cde217b5ba68196 100644 (file)
@@ -9,6 +9,8 @@ from collections.abc import AsyncGenerator
 from contextlib import suppress
 from typing import TYPE_CHECKING
 
+from music_assistant_models.enums import PlaybackState
+
 from music_assistant.helpers.audio import get_player_filter_params
 from music_assistant.helpers.ffmpeg import FFMpeg
 from music_assistant.helpers.util import TaskManager, close_async_generator
@@ -69,7 +71,6 @@ class AirPlayStreamSession:
         self.wait_start = wait_start_seconds  # in seconds
         self.start_time = cur_time + wait_start_seconds
         self.start_ntp = unix_time_to_ntp(self.start_time)
-
         self.prov.logger.info(
             "Starting stream session with %d clients",
             len(self.sync_clients),
@@ -89,11 +90,20 @@ class AirPlayStreamSession:
 
             # Link stream session to player stream
             airplay_player.stream.session = self
-
-            await self._start_client_ffmpeg(airplay_player)
-
+            # create the named pipes
+            await airplay_player.stream.audio_pipe.create()
+            await airplay_player.stream.commands_pipe.create()
             # start the stream
             await airplay_player.stream.start(self.start_ntp)
+            # start the (player-specific) ffmpeg process
+            # note that ffmpeg will open the named pipe for writing
+            await self._start_client_ffmpeg(airplay_player)
+            # open the command pipe for writing
+            await airplay_player.stream.commands_pipe.open()
+            # repeat sending the volume level to the player because some players seem
+            # to ignore it the first time
+            # https://github.com/music-assistant/support/issues/3330
+            await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n")
 
         async with TaskManager(self.mass) as tm:
             for _airplay_player in self.sync_clients:
@@ -121,10 +131,10 @@ class AirPlayStreamSession:
         assert airplay_player.stream.session == self
         async with self._lock:
             self.sync_clients.remove(airplay_player)
+        await airplay_player.stream.stop()
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
             await ffmpeg.close()
             del ffmpeg
-        await airplay_player.stream.stop()
         airplay_player.stream = None
         # If this was the last client, stop the session
         if not self.sync_clients:
@@ -144,6 +154,7 @@ class AirPlayStreamSession:
             return
 
         allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False)
+        allow_late_join = False  # TODO: disable late join for now until we can test it properly
         if not allow_late_join:
             # Late joining is not allowed - restart the session for all players
             await self.stop()  # we need to stop the current session to add a new client
@@ -203,46 +214,41 @@ class AirPlayStreamSession:
         # Set new audio source and restart the stream
         self._audio_source = audio_source
         self._audio_source_task = asyncio.create_task(self._audio_streamer())
-
         self.last_stream_started = time.time()
         for sync_client in self.sync_clients:
             sync_client.set_state_from_stream(state=None, elapsed_time=0)
 
-    async def _audio_streamer(self) -> None:
+    async def _audio_streamer(self) -> None:  # noqa: PLR0915
         """Stream audio to all players."""
         generator_exhausted = False
         _last_metadata: str | None = None
+        chunk_size = self.pcm_format.pcm_sample_size
         try:
             # each chunk is exactly one second of audio data based on the pcm format.
             async for chunk in self._audio_source:
+                if len(chunk) != chunk_size:
+                    self.prov.logger.warning(
+                        "Audio source yielded chunk of unexpected size %d (expected %d), "
+                        "this may lead to desync issues",
+                        len(chunk),
+                        chunk_size,
+                    )
                 async with self._lock:
                     sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
                     if not sync_clients:
-                        self.prov.logger.error(
-                            "!!! AUDIO STREAMER EXITING: No running clients left! "
-                            "Total sync_clients: %d, Details: %s",
-                            len(self.sync_clients),
-                            [
-                                (x.player_id, x.stream.running if x.stream else None)
-                                for x in self.sync_clients
-                            ],
+                        self.prov.logger.debug(
+                            "Audio streamer exiting: No running clients left in session"
                         )
                         return
 
-                    # Write to all players with a timeout (10 seconds)
-                    # Timeout must account for player's internal latency buffer (1-4 seconds)
-                    # The player may legitimately not accept data while draining its buffer
-                    write_start = time.time()
+                    # Write chunk to all players
                     write_tasks = [
-                        asyncio.wait_for(self._write_chunk_to_player(x, chunk), timeout=10.0)
-                        for x in sync_clients
-                        if x.stream
+                        self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
                     ]
                     results = await asyncio.gather(*write_tasks, return_exceptions=True)
-                    write_elapsed = time.time() - write_start
 
                     # Check for write errors or timeouts
-                    players_to_remove = []
+                    players_to_remove: list[AirPlayPlayer] = []
                     for i, result in enumerate(results):
                         if i >= len(sync_clients):
                             continue
@@ -250,21 +256,17 @@ class AirPlayStreamSession:
 
                         if isinstance(result, asyncio.TimeoutError):
                             self.prov.logger.error(
-                                "!!! TIMEOUT writing chunk %d to player %s - "
-                                "REMOVING from sync group! Total write time=%.3fs",
+                                "TIMEOUT writing chunk %d to player %s - REMOVING from sync group!",
                                 self.chunks_streamed,
                                 player.player_id,
-                                write_elapsed,
                             )
                             players_to_remove.append(player)
                         elif isinstance(result, Exception):
                             self.prov.logger.error(
-                                "!!! Error writing chunk %d to player %s: %s - "
-                                "REMOVING from sync group! Total write time=%.3fs",
+                                "Error writing chunk %d to player %s: %s - REMOVING from sync group!",
                                 self.chunks_streamed,
                                 player.player_id,
                                 result,
-                                write_elapsed,
                             )
                             players_to_remove.append(player)
 
@@ -338,20 +340,20 @@ class AirPlayStreamSession:
         chunk_number = self.chunks_streamed + 1
         player_id = airplay_player.player_id
 
-        # if the player has an associated FFMpeg instance, use that first
+        # don't write a chunk if we're paused
+        while airplay_player.playback_state == PlaybackState.PAUSED:
+            await asyncio.sleep(0.1)
+
+        # we write the chunk to the player's ffmpeg process which
+        # applies any player-specific filters (e.g. volume, dsp, etc)
+        # and outputs in the correct format for the player stream
+        # to the named pipe associated with the player's stream
         if ffmpeg := self._player_ffmpeg.get(player_id):
             await ffmpeg.write(chunk)
-            chunk_to_send = await ffmpeg.read(len(chunk))
-        else:
-            chunk_to_send = chunk
 
-        assert airplay_player.stream
         stream_write_start = time.time()
-        await airplay_player.stream.write_chunk(chunk_to_send)
         stream_write_elapsed = time.time() - stream_write_start
-
         total_elapsed = time.time() - write_start
-
         # Log only truly abnormal writes (>5s indicates a real stall)
         # Can take up to ~4s if player's latency buffer is being drained
         if total_elapsed > 5.0:
@@ -369,8 +371,6 @@ class AirPlayStreamSession:
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
             await ffmpeg.write_eof()
             await ffmpeg.close()
-        assert airplay_player.stream
-        await airplay_player.stream.write_eof()
 
     async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
         """Send metadata to all players."""
@@ -391,19 +391,23 @@ class AirPlayStreamSession:
             await ffmpeg.close()
             del ffmpeg
         assert airplay_player.stream  # for type checker
-        # Create optional FFMpeg instance per player if needed
+        # Create the FFMpeg instance per player which accepts our PCM audio
+        # applies any player-specific filters (e.g. volume, dsp, etc)
+        # and outputs in the correct format for the player stream
+        # to the named pipe associated with the player's stream
         filter_params = get_player_filter_params(
             self.mass,
             airplay_player.player_id,
             self.pcm_format,
             airplay_player.stream.pcm_format,
         )
-        if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
-            ffmpeg = FFMpeg(
-                audio_input="-",
-                input_format=self.pcm_format,
-                output_format=airplay_player.stream.pcm_format,
-                filter_params=filter_params,
-            )
-            await ffmpeg.start()
-            self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+        ffmpeg = FFMpeg(
+            audio_input="-",
+            input_format=self.pcm_format,
+            output_format=airplay_player.stream.pcm_format,
+            filter_params=filter_params,
+            audio_output=airplay_player.stream.audio_pipe.path,
+            extra_input_args=["-y"],
+        )
+        await ffmpeg.start()
+        self._player_ffmpeg[airplay_player.player_id] = ffmpeg