# prefer interactive command to our streamer
await self.send_cli_command("ACTION=STOP")
# use communicate to clear stdin/stdout and wait for exit
- await self._cliraop_proc.wait()
- # stop background task
- if self._log_reader_task and not self._log_reader_task.done():
- self._log_reader_task.cancel()
+ try:
+ await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+ except TimeoutError:
+ self.airplay_player.logger.error( # noqa: TRY400
+ "RAOP process did not stop on time, attempting forced close."
+ )
+ self._cliraop_proc.kill()
+ await asyncio.wait_for(self._cliraop_proc.wait(), 5)
+ finally:
+ # stop background task
+ if self._log_reader_task and not self._log_reader_task.done():
+ self._log_reader_task.cancel()
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
line = line.decode().strip() # noqa: PLW2901
if not line:
continue
- logger.debug(line)
- if "set pause" in line:
+ if "elapsed milliseconds:" in line:
+ millis = int(line.split("elapsed milliseconds: ")[1])
+ mass_player.elapsed_time = millis / 1000
+ mass_player.elapsed_time_last_updated = time.time()
+ continue # do not log this line, its too verbose
+ if "set pause" in line or "Pause at" in line:
mass_player.state = PlayerState.PAUSED
self.mass.players.update(airplay_player.player_id)
- elif "Restarted at" in line:
+ elif "Restarted at" in line or "restarting w/ pause" in line:
mass_player.state = PlayerState.PLAYING
self.mass.players.update(airplay_player.player_id)
- elif "after start), played" in line:
- millis = int(line.split("played ")[1].split(" ")[0])
- mass_player.elapsed_time = millis / 1000
- mass_player.elapsed_time_last_updated = time.time()
+ elif "Stopped at" in line:
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(airplay_player.player_id)
elif "restarting w/o pause" in line:
# streaming has started
mass_player.state = PlayerState.PLAYING
mass_player.elapsed_time = 0
mass_player.elapsed_time_last_updated = time.time()
self.mass.players.update(airplay_player.player_id)
+ logger.debug(line)
# if we reach this point, the process exited
airplay_player.logger.debug("Log watcher task finished...")
server=f"{socket.gethostname()}.local",
)
await self.mass.aiozc.async_register_service(self._dacp_info)
+ self._resync_handle: asyncio.TimerHandle | None = None
async def on_mdns_service_state_change(
self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- entries = await super().get_player_config_entries(player_id)
- return entries + PLAYER_CONFIG_ENTRIES
+ base_entries = await super().get_player_config_entries(player_id)
+ if player_id not in self._players:
+ # most probably a syncgroup
+ return base_entries
+ return base_entries + PLAYER_CONFIG_ENTRIES
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player.
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
+ # fix race condition where resync and play media are called at more or less the same time
+ if self._resync_handle:
+ self._resync_handle.cancel()
+ self._resync_handle = None
# always stop existing stream first
await self.cmd_stop(player_id)
# start streaming the queue (pcm) audio in a background task
This is a special feature from the Universal Group provider.
"""
+ # fix race condition where resync and play media are called at more or less the same time
+ if self._resync_handle:
+ self._resync_handle.cancel()
+ self._resync_handle = None
# always stop existing stream first
await self.cmd_stop(player_id)
if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
if player.synced_to:
# should not happen, but just in case
raise RuntimeError("Player is synced")
+ synced_player_ids = [x.player_id for x in self._get_sync_clients(player_id)]
+ self.logger.info(
+ "Starting RAOP stream for Queue %s to %s",
+ queue.display_name,
+ "/".join(synced_player_ids),
+ )
# Python is not suitable for realtime audio streaming.
# So, I've decided to go the fancy route here. I've created a small binary
airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
await airplay_player.active_stream.init_cliraop(start_ntp)
prev_metadata_checksum: str = ""
+ prev_progress_report: float = 0
async for pcm_chunk in audio_iterator:
# send audio chunk to player(s)
available_clients = 0
continue
available_clients += 1
tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk))
- # always send the progress
- tg.create_task(
- airplay_player.active_stream.send_cli_command(
- f"PROGRESS={int(queue.elapsed_time)}\n"
+ # send the progress report every 5 seconds
+ now = time.time()
+ if now - prev_progress_report >= 5:
+ prev_progress_report = now
+ tg.create_task(
+ airplay_player.active_stream.send_cli_command(
+ f"PROGRESS={int(queue.elapsed_time)}\n"
+ )
)
- )
if not available_clients:
# this streamjob is no longer active
return
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup master or group player.
"""
- player = self.mass.players.get(player_id, raise_unavailable=True)
- group_leader = self.mass.players.get(target_player, raise_unavailable=True)
- if group_leader.synced_to:
+ child_player = self.mass.players.get(player_id)
+ assert child_player # guard
+ parent_player = self.mass.players.get(target_player)
+ assert parent_player # guard
+ if parent_player.synced_to:
raise RuntimeError("Player is already synced")
- player.synced_to = target_player
- group_leader.group_childs.add(player_id)
- self.mass.players.update(target_player)
- if group_leader.powered:
- await self.mass.players.cmd_power(player_id, True)
- active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id)
+ if child_player.synced_to and child_player.synced_to != target_player:
+ raise RuntimeError("Player is already synced to another player")
+ # always make sure that the parent player is part of the sync group
+ parent_player.group_childs.add(parent_player.player_id)
+ parent_player.group_childs.add(child_player.player_id)
+ child_player.synced_to = parent_player.player_id
+ # mark players as powered
+ parent_player.powered = True
+ child_player.powered = True
+ # check if we should (re)start or join a stream session
+ active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
if active_queue.state == PlayerState.PLAYING:
- self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id))
+ # playback needs to be restarted to form a new multi client stream session
+ def resync() -> None:
+ self._resync_handle = None
+ self.mass.create_task(
+ self.mass.player_queues.resume(active_queue.queue_id, fade_in=False)
+ )
+
+ # this could potentially be called by multiple players at the exact same time
+ # so we debounce the resync a bit here with a timer
+ if self._resync_handle:
+ self._resync_handle.cancel()
+ self._resync_handle = self.mass.loop.call_later(0.5, resync)
+ else:
+ # make sure that the player manager gets an update
+ self.mass.players.update(child_player.player_id, skip_forward=True)
+ self.mass.players.update(parent_player.player_id, skip_forward=True)
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
This is a special feature from the Universal Group provider.
"""
+ # fix race condition where resync and play media are called at more or less the same time
+ if self._resync_handle:
+ self._resync_handle.cancel()
+ self._resync_handle = None
# forward command to player and any connected sync members
sync_clients = self._get_sync_clients(player_id)
async with asyncio.TaskGroup() as tg:
assert child_player # guard
parent_player = self.mass.players.get(target_player)
assert parent_player # guard
+ if parent_player.synced_to:
+ raise RuntimeError("Player is already synced")
+ if child_player.synced_to and child_player.synced_to != target_player:
+ raise RuntimeError("Player is already synced to another player")
# always make sure that the parent player is part of the sync group
parent_player.group_childs.add(parent_player.player_id)
parent_player.group_childs.add(child_player.player_id)
child_player.synced_to = parent_player.player_id
# check if we should (re)start or join a stream session
active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
- if parent_player.state == PlayerState.PLAYING:
+ if active_queue.state == PlayerState.PLAYING:
# playback needs to be restarted to form a new multi client stream session
def resync() -> None:
self._resync_handle = None
self._resync_handle = self.mass.loop.call_later(0.5, resync)
else:
# make sure that the player manager gets an update
- self.mass.players.update(child_player.player_id)
- self.mass.players.update(parent_player.player_id)
+ self.mass.players.update(child_player.player_id, skip_forward=True)
+ self.mass.players.update(parent_player.player_id, skip_forward=True)
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player."""