# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.prevent_playback: bool = False
- self.running = True
# audio_source_task will only exist for the main player in a sync group
self.audio_source_task: asyncio.Task | None = None
self._log_reader_task: asyncio.Task | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
+ self._started = asyncio.Event()
+ self._stopped = False
async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
"""Initialize CLIRaop process for a player."""
)
await self._cliraop_proc.start()
await asyncio.to_thread(os.close, read)
+ self._started.set()
self._log_reader_task = asyncio.create_task(self._log_watcher())
async def stop(self):
"""Stop playback and cleanup."""
- self.running = False
- if self.audio_source_task and not self.audio_source_task.done():
- self.audio_source_task.cancel()
+ if self._stopped:
+ return
if not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
+ self._stopped = True # set after send_cli command!
+ if self.audio_source_task and not self.audio_source_task.done():
+ self.audio_source_task.cancel()
try:
await asyncio.wait_for(self._cliraop_proc.wait(), 5)
except TimeoutError:
async def write_chunk(self, chunk: bytes) -> None:
"""Write a (pcm) audio chunk."""
+ if self._stopped:
+ return
+ await self._started.wait()
await self._ffmpeg_proc.write(chunk)
async def write_eof(self) -> None:
"""Write EOF."""
+ if self._stopped:
+ return
+ await self._started.wait()
await self._ffmpeg_proc.write_eof()
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
- if not self._cliraop_proc or self._cliraop_proc.closed:
+ if self._stopped:
return
+ await self._started.wait()
if not command.endswith("\n"):
command += "\n"
# always stop existing stream first
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
- if airplay_player.active_stream and airplay_player.active_stream.running:
+ if airplay_player.active_stream and airplay_player.active_stream:
tg.create_task(airplay_player.active_stream.stop())
# select audio source
if media.media_type == MediaType.ANNOUNCEMENT: