async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Replace the audio source of the stream."""
- # cancel the per-player ffmpeg reader tasks
- for _raop_player in self._sync_clients:
- assert _raop_player.raop_stream # for type checker
- assert _raop_player.raop_stream.ffmpeg_reader_task # for type checker
- _raop_player.raop_stream.ffmpeg_reader_task.cancel()
- with suppress(asyncio.CancelledError):
- await _raop_player.raop_stream.ffmpeg_reader_task
# cancel the current audio source task
assert self._audio_source_task # for type checker
self._audio_source_task.cancel()
# set new audio source and restart the stream
self._audio_source = audio_source
self._audio_source_task = asyncio.create_task(self._audio_streamer())
- for _raop_player in self._sync_clients:
- assert _raop_player.raop_stream # for type checker
- _raop_player.raop_stream.ffmpeg_reader_task = self.mass.create_task(
- _raop_player.raop_stream.ffmpeg_reader()
- )
+ # restart the (player-specific) ffmpeg stream for all players
+ # this is the easiest way to ensure the new audio source is used
+ # as quickly as possible, without waiting for the buffers to be drained
+ # it also allows to change the player settings such as DSP on the fly
+ for sync_client in self._sync_clients:
+ if not sync_client.raop_stream:
+ continue # guard
+ sync_client.raop_stream.start_ffmpeg_stream()
async def _audio_streamer(self) -> None:
"""Stream audio to all players."""
self._stderr_reader_task: asyncio.Task[None] | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
- self.ffmpeg_reader_task: asyncio.Task[None] | None = None
+ self._ffmpeg_reader_task: asyncio.Task[None] | None = None
self._started = asyncio.Event()
self._stopped = False
self._total_bytes_sent = 0
)
# ffmpeg handles the player specific stream + filters and pipes
# audio to the cliraop process
- self.ffmpeg_reader_task = self.mass.create_task(self.ffmpeg_reader())
+ self.start_ffmpeg_stream()
# cliraop is the binary that handles the actual raop streaming to the player
- # this is a slightly modified bversion of philippe44's libraop
+ # this is a slightly modified version of philippe44's libraop
# https://github.com/music-assistant/libraop
# we use this intermediate binary to do the actual streaming because attempts to do
# so using pure python (e.g. pyatv) were not successful due to the realtime nature
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
await self._cliraop_proc.start()
- # read first 10 lines of stderr to get the initial status
- for _ in range(10):
+ # read first 20 lines of stderr to get the initial status
+ for _ in range(20):
line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
self.airplay_player.logger.debug(line)
if "connected to " in line:
self._started.set()
break
if "Cannot connect to AirPlay device" in line:
- self.ffmpeg_reader_task.cancel()
+ if self._ffmpeg_reader_task:
+ self._ffmpeg_reader_task.cancel()
raise PlayerCommandFailed("Cannot connect to AirPlay device")
# repeat sending the volume level to the player because some players seem
# to ignore it the first time
async def stop(self) -> None:
"""Stop playback and cleanup."""
- if self._stopped or not self._cliraop_proc:
- return
- if self._cliraop_proc.proc and not self._cliraop_proc.closed:
- await self.send_cli_command("ACTION=STOP")
+ await self.send_cli_command("ACTION=STOP")
self._stopped = True
- with suppress(asyncio.TimeoutError):
- await self._cliraop_proc.wait_with_timeout(2)
if self._stderr_reader_task and not self._stderr_reader_task.done():
self._stderr_reader_task.cancel()
- if self.ffmpeg_reader_task and not self.ffmpeg_reader_task.done():
- self.ffmpeg_reader_task.cancel()
- if self._cliraop_proc.proc and not self._cliraop_proc.closed:
+ if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done():
+ self._ffmpeg_reader_task.cancel()
+ if self._cliraop_proc and not self._cliraop_proc.closed:
await self._cliraop_proc.close(True)
+ if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
+ await self._ffmpeg_proc.close(True)
async def write_chunk(self, chunk: bytes) -> None:
"""Write a (pcm) audio chunk."""
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
- if self._stopped:
- raise RuntimeError("Stream is already stopped")
+ if self._stopped or not self._cliraop_proc or self._cliraop_proc.closed:
+ return
await self._started.wait()
if not command.endswith("\n"):
self.airplay_player.last_command_sent = time.time()
await asyncio.to_thread(send_data)
- async def ffmpeg_reader(self) -> None:
+ def start_ffmpeg_stream(self) -> None:
+ """Start (or replace) the player-specific ffmpeg stream to feed cliraop."""
+ # cancel existing ffmpeg reader task
+ if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done():
+ self._ffmpeg_reader_task.cancel()
+ if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
+ self.mass.create_task(self._ffmpeg_proc.close(True))
+ # start new ffmpeg reader task
+ self._ffmpeg_reader_task = self.mass.create_task(self._ffmpeg_reader())
+
+ async def _ffmpeg_reader(self) -> None:
"""Read audio from the audio source and pipe it to the CLIRaop process."""
self._ffmpeg_proc = FFMpeg(
audio_input="-",
self._stream_bytes_sent = 0
mass_player = self.mass.players.get(self.airplay_player.player_id)
assert mass_player # for type checker
- try:
- await self._ffmpeg_proc.start()
- chunksize = get_chunksize(AIRPLAY_PCM_FORMAT)
- # wait for cliraop to be ready
- await asyncio.wait_for(self._started.wait(), 20)
- async for chunk in self._ffmpeg_proc.iter_chunked(chunksize):
- if self._stopped:
- break
- if not self._cliraop_proc or self._cliraop_proc.closed:
- break
- await self._cliraop_proc.write(chunk)
- self._stream_bytes_sent += len(chunk)
- self._total_bytes_sent += len(chunk)
- del chunk
- # we base elapsed time on the amount of bytes sent
- # so we can account for reusing the same session for multiple streams
- mass_player.elapsed_time = self._stream_bytes_sent / chunksize
- mass_player.elapsed_time_last_updated = time.time()
- if self._cliraop_proc and not self._cliraop_proc.closed:
- await self._cliraop_proc.write_eof()
- finally:
- await self._ffmpeg_proc.close()
+ await self._ffmpeg_proc.start()
+ chunksize = get_chunksize(AIRPLAY_PCM_FORMAT)
+ # wait for cliraop to be ready
+ await asyncio.wait_for(self._started.wait(), 20)
+ async for chunk in self._ffmpeg_proc.iter_chunked(chunksize):
+ if self._stopped:
+ break
+ if not self._cliraop_proc or self._cliraop_proc.closed:
+ break
+ await self._cliraop_proc.write(chunk)
+ self._stream_bytes_sent += len(chunk)
+ self._total_bytes_sent += len(chunk)
+ del chunk
+ # we base elapsed time on the amount of bytes sent
+ # so we can account for reusing the same session for multiple streams
+ mass_player.elapsed_time = self._stream_bytes_sent / chunksize
+ mass_player.elapsed_time_last_updated = time.time()
+ # if we reach this point, the process exited, most likely because the stream ended
+ if self._cliraop_proc and not self._cliraop_proc.closed:
+ await self._cliraop_proc.write_eof()
async def _stderr_reader(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
if "end of stream reached" in line:
logger.debug("End of stream reached")
break
-
logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
# ensure we're cleaned up afterwards (this also logs the returncode)
- if not self._stopped:
- await self.stop()
+ await self.stop()
async def _send_metadata(self, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""