LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
-DEFAULT_CHUNKSIZE = 128000
+
+DEFAULT_CHUNKSIZE = 64000
# pylint: disable=invalid-name
args: list[str],
stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
stdout: bool | int | None = None,
- stderr: bool | int | None = False,
+ stderr: bool | int | None = None,
name: str | None = None,
) -> None:
"""Initialize AsyncProcess."""
self.attached_tasks: list[asyncio.Task] = []
self.logger = LOGGER.getChild(name)
self._args = args
- self._stdin = stdin
- self._stdout = stdout
- self._stderr = stderr
- self._stdin_enabled = stdin not in (None, False)
- self._stdout_enabled = stdout not in (None, False)
- self._stderr_enabled = stderr not in (None, False)
+ self._stdin = None if stdin is False else stdin
+ self._stdout = None if stdout is False else stdout
+ self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr
self._close_called = False
self._returncode: bool | None = None
else self._stdin,
stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
+ # because we're exchanging big amounts of (audio) data with pipes
+ # it makes sense to extend the pipe size and (buffer) limits a bit
+ limit=1000000,
+ pipesize=1000000,
)
self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
if isinstance(self._stdin, AsyncGenerator):
for task in self.attached_tasks:
if not task.done():
task.cancel()
- with suppress(asyncio.CancelledError):
- await task
if send_signal and self.returncode is None:
self.proc.send_signal(SIGINT)
- # allow the process a bit of time to respond to the signal before we go nuclear
- await asyncio.sleep(0.5)
+
+ # abort existing readers on stderr/stdout first before we send communicate
+ if self.proc.stdout and self.proc.stdout._waiter is not None:
+ self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
+ self.proc.stdout._waiter = None
+ if self.proc.stderr and self.proc.stderr._waiter is not None:
+ self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+ self.proc.stderr._waiter = None
# make sure the process is really cleaned up.
# especially with pipes this can cause deadlocks if not properly guarded
while True:
try:
async with asyncio.timeout(5):
- # abort existing readers on stderr/stdout first before we send communicate
- if self.proc.stdout and self.proc.stdout._waiter is not None:
- self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
- self.proc.stdout._waiter = None
- if self.proc.stderr and self.proc.stderr._waiter is not None:
- self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
- self.proc.stderr._waiter = None
# use communicate to flush all pipe buffers
await self.proc.communicate()
if self.returncode is not None:
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
- if self.returncode is not None:
- return self.returncode
- self._returncode = await self.proc.wait()
+ if self._returncode is None:
+ self._returncode = await self.proc.wait()
return self._returncode
async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
from __future__ import annotations
import asyncio
-import fcntl
import logging
import os
import platform
import socket
import time
-from collections.abc import AsyncGenerator
from contextlib import suppress
from dataclasses import dataclass
from random import randint, randrange
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
# always generate a new active remote id to prevent race conditions
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
- self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
self.running = True
+ # audio_source_task will only exist for the main player in a sync group
+ self.audio_source_task: asyncio.Task | None = None
self._log_reader_task: asyncio.Task | None = None
- self._audio_reader_task: asyncio.Task | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
- async def start(self, start_ntp: int) -> None:
+ async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
"""Initialize CLIRaop process for a player."""
- self.start_ntp = start_ntp
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
"-port",
str(self.airplay_player.discovery_info.port),
"-wait",
- str(2500 - sync_adjust),
+ str(wait_start - sync_adjust),
"-volume",
str(mass_player.volume_level),
*extra_args,
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
- # launch ffmpeg, feeding (player specific) audio chunks on stdout
- # one could argue that the intermediate ffmpeg towards cliraop is not needed
- # when there are no player specific filters or extras but in this case
- # ffmpeg serves as a small buffer towards the realtime cliraop streamer
-
- # create pipes to interconnect ffmpeg with cliraop
- def create_pipes() -> tuple[int, int]:
- read, write = os.pipe()
- if platform.system() == "Linux":
- # extend the pipe buffer a bit for smoother playback
- fcntl.fcntl(read, 1031, 1000000)
- fcntl.fcntl(write, 1031, 1000000)
- return (read, write)
-
- read, write = await asyncio.to_thread(create_pipes)
-
+ # ffmpeg handles the player specific stream + filters and pipes
+ # audio to the cliraop process
+ read, write = await asyncio.to_thread(os.pipe)
ffmpeg_args = get_ffmpeg_args(
input_format=self.input_format,
output_format=AIRPLAY_PCM_FORMAT,
filter_params=get_player_filter_params(self.mass, player_id),
+ loglevel="fatal",
)
self._ffmpeg_proc = AsyncProcess(
ffmpeg_args,
name="cliraop_ffmpeg",
)
await self._ffmpeg_proc.start()
- os.close(write)
+ await asyncio.to_thread(os.close, write)
self._cliraop_proc = AsyncProcess(
cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop"
)
await self._cliraop_proc.start()
- os.close(read)
+ await asyncio.to_thread(os.close, read)
self._log_reader_task = asyncio.create_task(self._log_watcher())
- async def stop(self, wait: bool = True):
+ async def stop(self):
"""Stop playback and cleanup."""
self.running = False
+ if self.audio_source_task and not self.audio_source_task.done():
+ self.audio_source_task.cancel()
+ if not self._cliraop_proc.closed:
+ await self.send_cli_command("ACTION=STOP")
+ await self._cliraop_proc.wait()
- async def _stop() -> None:
- # ffmpeg MUST be stopped before cliraop due to the chained pipes
- if not self._ffmpeg_proc.closed:
- await self._ffmpeg_proc.close(True)
- # allow the cliraop process to stop gracefully first
- if not self._cliraop_proc.closed:
- await self.send_cli_command("ACTION=STOP")
- with suppress(TimeoutError):
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- # send regular close anyway (which also logs the returncode)
- await self._cliraop_proc.close(True)
-
- task = self.mass.create_task(_stop())
- if wait:
- await task
+ # ffmpeg can sometimes hang due to the connected pipes
+ # we handle closing it but it can be a bit slow so do that in the background
+ if not self._ffmpeg_proc.closed:
+ self.mass.create_task(self._ffmpeg_proc.close(True))
async def write_chunk(self, chunk: bytes) -> None:
- """Write a (pcm) audio chunk to ffmpeg."""
+ """Write a (pcm) audio chunk."""
await self._ffmpeg_proc.write(chunk)
async def write_eof(self) -> None:
- """Write EOF to the ffmpeg stdin."""
+ """Write EOF."""
await self._ffmpeg_proc.write_eof()
async def send_cli_command(self, command: str) -> None:
if not self._cliraop_proc or self._cliraop_proc.closed:
return
- named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108
if not command.endswith("\n"):
command += "\n"
with suppress(BrokenPipeError), open(named_pipe, "w") as f:
f.write(command)
+ named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108
self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
await self.mass.create_task(send_data)
if "lost packet out of backlog" in line:
lost_packets += 1
if lost_packets == 100:
- logger.error("High packet loss detected, stopping playback...")
- await self.stop(False)
- elif lost_packets % 10 == 0:
+ logger.error("High packet loss detected, restarting playback...")
+ self.mass.create_task(self.mass.player_queues.resume(queue.queue_id))
+ else:
logger.warning("Packet loss detected!")
logger.log(VERBOSE_LOG_LEVEL, line)
if airplay_player.active_stream == self:
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
- # ensure we're cleaned up afterwards
- if self._ffmpeg_proc.returncode is None or self._cliraop_proc.returncode is None:
- await self.stop()
+ # ensure we're cleaned up afterwards (this also logs the returncode)
+ await self.stop()
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
- tg.create_task(airplay_player.active_stream.stop(wait=False))
+ tg.create_task(airplay_player.active_stream.stop())
# select audio source
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
input_format=AudioFormat(ContentType.try_parse(media.uri)),
output_format=AIRPLAY_PCM_FORMAT,
)
- self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format)
- async def _handle_stream_audio(
- self, player_id: str, audio_source: AsyncGenerator[bytes, None], input_format: AudioFormat
- ) -> None:
- """Handle streaming of audio to one or more airplay players."""
# Python is not suitable for realtime audio streaming so we do the actual streaming
# of (RAOP) audio using a small executable written in C based on libraop to do the actual
# timestamped playback, which reads pcm audio from stdin
# and we can send some interactive commands using a named pipe.
- # get current ntp before we start
- _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
- start_ntp = int(stdout.strip())
-
- # setup Raop process for player and its sync childs
- for airplay_player in self._get_sync_clients(player_id):
+ # setup AirplayStream for player and its sync childs
+ sync_clients = self._get_sync_clients(player_id)
+ for airplay_player in sync_clients:
airplay_player.active_stream = AirplayStream(
self, airplay_player, input_format=input_format
)
- self.mass.create_task(airplay_player.active_stream.start(start_ntp))
-
- async for chunk in audio_source:
- active_clients = 0
- async with asyncio.TaskGroup() as tg:
- for airplay_player in self._get_sync_clients(player_id):
- if not airplay_player.active_stream or not airplay_player.active_stream.running:
- # player stopped or switched to a new stream
- continue
- if airplay_player.active_stream.start_ntp != start_ntp:
- # checksum mismatch
- continue
- tg.create_task(airplay_player.active_stream.write_chunk(chunk))
- active_clients += 1
-
- if active_clients == 0:
- # no more clients
- return
- # entire stream consumed: send EOF (empty chunk)
+
+ # use a buffer here to consume small hiccups as the
+ # raop streaming is pretty much realtime and without a buffer to stdin
+ buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
+
+ async def fill_buffer() -> None:
+ async for chunk in audio_source:
+ await buffer.put(chunk)
+ await buffer.put(b"EOF")
+
+ fill_buffer_task = asyncio.create_task(fill_buffer())
+
+ async def audio_streamer() -> None:
+ try:
+ while True:
+ chunk = await buffer.get()
+ if chunk == b"EOF":
+ break
+ async with asyncio.TaskGroup() as tg:
+ for airplay_player in sync_clients:
+ tg.create_task(airplay_player.active_stream.write_chunk(chunk))
+
+ # entire stream consumed: send EOF
+ for airplay_player in sync_clients:
+ self.mass.create_task(airplay_player.active_stream.write_eof())
+ finally:
+ if not fill_buffer_task.done():
+ fill_buffer_task.cancel()
+ # make sure the stdin generator is also properly closed
+ # by propagating a cancellederror within
+ task = asyncio.create_task(audio_source.__anext__())
+ task.cancel()
+ empty_queue(buffer)
+
+ # get current ntp and start cliraop
+ _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
+ start_ntp = int(stdout.strip())
+ wait_start = 1000 + (500 * len(sync_clients))
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- tg.create_task(airplay_player.active_stream.write_eof())
+ tg.create_task(airplay_player.active_stream.start(start_ntp, wait_start))
+ self._players[player_id].active_stream.audio_source_task = asyncio.create_task(
+ audio_streamer()
+ )
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
count = 0
if not (airplay_player := self._players.get(player_id)):
return
- prev_ntp = airplay_player.active_stream.start_ntp
+ prev_active_remote_id = airplay_player.active_stream.active_remote_id
while count < 40:
count += 1
if not (airplay_player := self._players.get(player_id)):
return
if not (active_stream := airplay_player.active_stream):
return
- if active_stream.start_ntp != prev_ntp:
+ if active_stream.active_remote_id != prev_active_remote_id:
# checksum
return
if not active_stream.prevent_playback: