from typing import TYPE_CHECKING
from music_assistant_models.enums import PlayerState
+from music_assistant_models.errors import PlayerCommandFailed
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.ffmpeg import FFMpeg
from music_assistant.helpers.process import AsyncProcess, check_output
-from music_assistant.helpers.util import close_async_generator
+from music_assistant.helpers.util import TaskManager, close_async_generator
from .const import (
AIRPLAY_PCM_FORMAT,
self._sync_clients = sync_clients
self._audio_source = audio_source
self._audio_source_task: asyncio.Task[None] | None = None
- self._stopped: bool = False
self._lock = asyncio.Lock()
async def start(self) -> None:
"""Initialize RaopStreamSession."""
# initialize raop stream for all players
- for airplay_player in self._sync_clients:
- if airplay_player.raop_stream and airplay_player.raop_stream.running:
- raise RuntimeError("Player already has an active stream")
- airplay_player.raop_stream = RaopStream(self, airplay_player)
async def audio_streamer() -> None:
"""Stream audio to all players."""
generator_exhausted = False
try:
async for chunk in self._audio_source:
- if not self._sync_clients:
- return
async with self._lock:
+ sync_clients = [
+ x for x in self._sync_clients if x.raop_stream and x.raop_stream.running
+ ]
+ if not sync_clients:
+ return
await asyncio.gather(
*[
x.raop_stream.write_chunk(chunk)
- for x in self._sync_clients
+ for x in sync_clients
if x.raop_stream
],
return_exceptions=True,
generator_exhausted = True
async with self._lock:
await asyncio.gather(
- *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
+ *[
+ x.raop_stream.write_eof()
+ for x in self._sync_clients
+ if x.raop_stream and x.raop_stream.running
+ ],
return_exceptions=True,
)
except Exception as err:
str(err) or err.__class__.__name__,
exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
)
+ raise
finally:
if not generator_exhausted:
await close_async_generator(self._audio_source)
assert self.prov.cliraop_bin
_, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
start_ntp = int(stdout.strip())
- wait_start = 1500 + (250 * len(self._sync_clients))
- async with self._lock:
- await asyncio.gather(
- *[
- x.raop_stream.start(start_ntp, wait_start)
- for x in self._sync_clients
- if x.raop_stream
- ],
- return_exceptions=True,
- )
+ wait_start = 1750 + (250 * len(self._sync_clients))
+
+ async def _start_client(raop_player: AirPlayPlayer) -> None:
+ # stop existing stream if running
+ if raop_player.raop_stream and raop_player.raop_stream.running:
+ await raop_player.raop_stream.stop()
+
+ raop_player.raop_stream = RaopStream(self, raop_player)
+ await raop_player.raop_stream.start(start_ntp, wait_start)
+
+ async with TaskManager(self.mass) as tm:
+ for _raop_player in self._sync_clients:
+ tm.create_task(_start_client(_raop_player))
self._audio_source_task = asyncio.create_task(audio_streamer())
async def stop(self) -> None:
"""Stop playback and cleanup."""
- if self._stopped:
- return
- self._stopped = True
- if self._audio_source_task:
+ if self._audio_source_task and not self._audio_source_task.done():
self._audio_source_task.cancel()
with suppress(asyncio.CancelledError):
await self._audio_source_task
assert airplay_player.raop_stream.session == self
async with self._lock:
self._sync_clients.remove(airplay_player)
- await airplay_player.raop_stream.stop()
+ await airplay_player.raop_stream.stop()
airplay_player.raop_stream = None
async def add_client(self, airplay_player: AirPlayPlayer) -> None:
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.prevent_playback: bool = False
- self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None
+ self._stderr_reader_task: asyncio.Task[None] | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
self._started = asyncio.Event()
@property
def running(self) -> bool:
"""Return boolean if this stream is running."""
- return not self._stopped and self._started.is_set()
+ return (
+ not self._stopped
+ and self._started.is_set()
+ and self._cliraop_proc is not None
+ and not self._cliraop_proc.closed
+ )
async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
"""Initialize CLIRaop process for a player."""
read_ahead = await self.mass.config.get_player_config_value(
player_id, CONF_READ_AHEAD_BUFFER
)
-
# create os pipes to pipe ffmpeg to cliraop
read, write = await asyncio.to_thread(os.pipe)
-
# ffmpeg handles the player specific stream + filters and pipes
# audio to the cliraop process
self._ffmpeg_proc = FFMpeg(
await asyncio.to_thread(os.close, write)
# cliraop is the binary that handles the actual raop streaming to the player
+ # this is a slightly modified bversion of philippe44's libraop
+ # https://github.com/music-assistant/libraop
+ # we use this intermediate binary to do the actual streaming because attempts to do
+ # so using pure python (e.g. pyatv) were not successful due to the realtime nature
+ # TODO: Either enhance libraop with airplay 2 support or find a better alternative
cliraop_args = [
self.prov.cliraop_bin,
"-ntpstart",
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
await self._cliraop_proc.start()
await asyncio.to_thread(os.close, read)
- self._started.set()
- self._log_reader_task = self.mass.create_task(self._log_watcher())
+ # read first 10 lines of stderr to get the initial status
+ for _ in range(10):
+ line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
+ self.airplay_player.logger.debug(line)
+ if "connected to " in line:
+ self._started.set()
+ break
+ if "Cannot connect to AirPlay device" in line:
+ raise PlayerCommandFailed("Cannot connect to AirPlay device")
+ # start reading the stderr of the cliraop process from another task
+ self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
async def stop(self) -> None:
"""Stop playback and cleanup."""
- if self._stopped:
- return
- if not self._cliraop_proc:
+ if self._stopped or not self._cliraop_proc:
return
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
- self._stopped = True # set after send_cli command!
+ self._stopped = True
+ with suppress(asyncio.TimeoutError):
+ await self._cliraop_proc.wait_with_timeout(5)
+ if self._stderr_reader_task and not self._stderr_reader_task.done():
+ self._stderr_reader_task.cancel()
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self._cliraop_proc.close(True)
if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
await self._ffmpeg_proc.close(True)
- if self._log_reader_task:
- self._log_reader_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._log_reader_task
self._cliraop_proc = None
self._ffmpeg_proc = None
async def write_chunk(self, chunk: bytes) -> None:
"""Write a (pcm) audio chunk."""
if self._stopped:
- return
+ raise RuntimeError("Stream is already stopped")
await self._started.wait()
assert self._ffmpeg_proc
await self._ffmpeg_proc.write(chunk)
async def write_eof(self) -> None:
"""Write EOF."""
if self._stopped:
- return
+ raise RuntimeError("Stream is already stopped")
await self._started.wait()
assert self._ffmpeg_proc
await self._ffmpeg_proc.write_eof()
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
if self._stopped:
- return
+ raise RuntimeError("Stream is already stopped")
await self._started.wait()
if not command.endswith("\n"):
self.airplay_player.last_command_sent = time.time()
await asyncio.to_thread(send_data)
- async def _log_watcher(self) -> None:
+ async def _stderr_reader(self) -> None: # noqa: PLR0915
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
# ensure we're cleaned up afterwards (this also logs the returncode)
- await self.stop()
+ if not self._stopped:
+ await self.stop()
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""