from random import randint, randrange
from typing import TYPE_CHECKING
-from zeroconf import ServiceStateChange
+from zeroconf import IPVersion, ServiceStateChange
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
DOMAIN = "airplay"
-CONF_LATENCY = "latency"
-DEFAULT_LATENCY = 2000
CONF_ENCRYPTION = "encryption"
CONF_ALAC_ENCODE = "alac_encode"
CONF_VOLUME_START = "volume_start"
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
- ConfigEntry(
- key=CONF_LATENCY,
- type=ConfigEntryType.INTEGER,
- range=(500, 4000),
- default_value=DEFAULT_LATENCY,
- label="Latency",
- description="Sets the number of milliseconds of audio buffer in the player. "
- "This is important to absorb network throughput jitter. \n"
- "Increase this value if you notice network dropouts at the cost of a slower "
- "response to commands.",
- advanced=True,
- ),
ConfigEntry(
key=CONF_ENCRYPTION,
type=ConfigEntryType.BOOLEAN,
return (manufacturer, model)
+def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
+ """Get primary IP address from zeroconf discovery info."""
+ return next(
+ (x for x in discovery_info.parsed_addresses(IPVersion.V4Only) if x != "127.0.0.1"), None
+ )
+
+
class AirplayStreamJob:
"""Object that holds the details of a stream job."""
# with the named pipe used to send commands
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
+ self._audio_buffer = asyncio.Queue(2)
self._log_reader_task: asyncio.Task | None = None
+ self._audio_reader_task: asyncio.Task | None = None
self._cliraop_proc: asyncio.subprocess.Process | None = None
+ self._stop_requested = False
@property
def running(self) -> bool:
"""Return bool if we're running."""
- return self._cliraop_proc and self._cliraop_proc.returncode is None
+ return (
+ not self._stop_requested
+ and self._cliraop_proc
+ and self._cliraop_proc.returncode is None
+ )
async def init_cliraop(self, start_ntp: int) -> None:
"""Initialize CLIRaop process for a player."""
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
- latency = self.mass.config.get_raw_player_config_value(
- player_id, CONF_LATENCY, DEFAULT_LATENCY
- )
- extra_args += ["-l", str(latency)]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
extra_args += ["-e"]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
"-p",
str(self.airplay_player.discovery_info.port),
"-w",
- str(2500 - sync_adjust),
+ str(2000 - sync_adjust),
"-v",
str(mass_player.volume_level),
*extra_args,
close_fds=True,
)
self._log_reader_task = asyncio.create_task(self._log_watcher())
+ self._audio_reader_task = asyncio.create_task(self._audio_reader())
async def stop(self):
"""Stop playback and cleanup."""
if not self.running:
return
- # prefer interactive command to our streamer
await self.send_cli_command("ACTION=STOP")
- # use communicate to clear stdin/stdout and wait for exit
- try:
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- except TimeoutError:
- self.airplay_player.logger.error( # noqa: TRY400
- "RAOP process did not stop on time, attempting forced close."
- )
- self._cliraop_proc.kill()
- await asyncio.wait_for(self._cliraop_proc.wait(), 5)
- finally:
- # stop background task
- if self._log_reader_task and not self._log_reader_task.done():
- self._log_reader_task.cancel()
+ self._stop_requested = True
+ # stop background tasks
+ if self._log_reader_task and not self._log_reader_task.done():
+ self._log_reader_task.cancel()
+ if self._audio_reader_task and not self._audio_reader_task.done():
+ self._audio_reader_task.cancel()
+
+ empty_queue(self._audio_buffer)
+ await asyncio.wait_for(self._cliraop_proc.communicate(), 30)
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
if not line:
continue
if "elapsed milliseconds:" in line:
+ # do not log this line, its too verbose
millis = int(line.split("elapsed milliseconds: ")[1])
mass_player.elapsed_time = millis / 1000
mass_player.elapsed_time_last_updated = time.time()
- continue # do not log this line, its too verbose
+ continue
if "set pause" in line or "Pause at" in line:
+ logger.info("raop streaming paused")
mass_player.state = PlayerState.PAUSED
self.mass.players.update(airplay_player.player_id)
- elif "Restarted at" in line or "restarting w/ pause" in line:
+ continue
+ if "Restarted at" in line or "restarting w/ pause" in line:
+ logger.info("raop streaming restarted after pause")
mass_player.state = PlayerState.PLAYING
self.mass.players.update(airplay_player.player_id)
- elif "Stopped at" in line:
+ continue
+ if "Stopped at" in line:
+ logger.info("raop streaming stopped")
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
- elif "restarting w/o pause" in line:
+ continue
+ if "restarting w/o pause" in line:
# streaming has started
+ logger.info("raop streaming started")
mass_player.state = PlayerState.PLAYING
mass_player.elapsed_time = 0
mass_player.elapsed_time_last_updated = time.time()
self.mass.players.update(airplay_player.player_id)
+ continue
+ if "lost packet out of backlog" in line:
+ logger.warning(line)
+ continue
+ # debug log everything else
logger.debug(line)
# if we reach this point, the process exited
- airplay_player.logger.debug("Log watcher task finished...")
- mass_player.state = PlayerState.IDLE
- self.mass.players.update(airplay_player.player_id)
logger.debug(
"CLIRaop process stopped with errorcode %s",
self._cliraop_proc.returncode,
)
+ if (
+ airplay_player.active_stream
+ and airplay_player.active_stream.active_remote_id == self.active_remote_id
+ ):
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(airplay_player.player_id)
+
+ async def _audio_reader(self) -> None:
+ """Read audio chunks from buffer and send them to the cliraop process."""
+ logger = self.airplay_player.logger
+ logger.debug("Audio reader started")
+ while self.running:
+ chunk = await self._audio_buffer.get()
+ if chunk == b"EOF":
+ # EOF chunk
+ break
+ self._cliraop_proc.stdin.write(chunk)
+ with suppress(BrokenPipeError):
+ await self._cliraop_proc.stdin.drain()
+ # send EOF
+ if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing():
+ self._cliraop_proc.stdin.write_eof()
+ with suppress(BrokenPipeError):
+ await self._cliraop_proc.stdin.drain()
+ logger.debug("Audio reader finished")
async def write_chunk(self, data: bytes) -> None:
- """Write a chunk of (pcm) data to the stdin of CLIRaop."""
- if not self.running or not self._cliraop_proc.stdin.can_write_eof():
- return
- self._cliraop_proc.stdin.write(data)
- if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+ """Write a chunk of (pcm) data to the audio buffer."""
+ if not self.running:
return
- with suppress(BrokenPipeError):
- await self._cliraop_proc.stdin.drain()
+ await self._audio_buffer.put(data)
- async def write_eof(self, data: bytes) -> None:
- """Write a chunk of (pcm) data to the stdin of CLIRaop."""
- if not self.running or not self._cliraop_proc.stdin.can_write_eof():
- return
- self._cliraop_proc.stdin.write_eof()
- if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+ async def write_eof(self) -> None:
+ """Write end-of-file chunk to the audo buffer."""
+ if not self.running:
return
- with suppress(BrokenPipeError):
- await self._cliraop_proc.stdin.drain()
+ await self._audio_buffer.put(b"EOF")
@dataclass
player_id: str
discovery_info: AsyncServiceInfo
+ address: str
logger: logging.Logger
active_stream: AirplayStreamJob | None = None
# handle update for existing device
if airplay_player := self._players.get(player_id):
if mass_player := self.mass.players.get(player_id):
- cur_address = info.parsed_addresses()[0]
- prev_address = airplay_player.discovery_info.parsed_addresses()[0]
- if cur_address != prev_address:
+ cur_address = get_primary_ip_address(info)
+ if cur_address and cur_address != airplay_player.address:
+ airplay_player.address = cur_address
airplay_player.logger.info(
- "Address updated from %s to %s", prev_address, cur_address
+ "Address updated from %s to %s", airplay_player.address, cur_address
)
mass_player.device_info = DeviceInfo(
model=mass_player.device_info.model,
- player_id: player_id of the player to handle the command.
"""
+ if existing_stream := self._stream_tasks.get(player_id):
+ existing_stream.cancel()
async def stop_player(airplay_player: AirPlayPlayer) -> None:
if airplay_player.active_stream:
self._resync_handle.cancel()
self._resync_handle = None
# always stop existing stream first
- await self.cmd_stop(player_id)
+ if existing_stream := self._stream_tasks.get(player_id):
+ existing_stream.cancel()
+ for airplay_player in self._get_sync_clients(player_id):
+ if airplay_player.active_stream and airplay_player.active_stream.running:
+ self.mass.create_task(airplay_player.active_stream.stop())
# start streaming the queue (pcm) audio in a background task
queue = self.mass.player_queues.get_active_queue(player_id)
self._stream_tasks[player_id] = asyncio.create_task(
self._resync_handle.cancel()
self._resync_handle = None
# always stop existing stream first
- await self.cmd_stop(player_id)
+ if existing_stream := self._stream_tasks.get(player_id):
+ existing_stream.cancel()
+ async with asyncio.TaskGroup() as tg:
+ for airplay_player in self._get_sync_clients(player_id):
+ if airplay_player.active_stream and airplay_player.active_stream.running:
+ tg.create_task(airplay_player.active_stream.stop())
if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
# TODO: resample on the fly here ?
raise RuntimeError("Unsupported PCM format")
async for pcm_chunk in audio_iterator:
# send audio chunk to player(s)
available_clients = 0
- async with asyncio.TaskGroup() as tg:
- for airplay_player in self._get_sync_clients(player_id):
- if (
- not airplay_player.active_stream
- or not airplay_player.active_stream.running
- or airplay_player.active_stream.start_ntp != start_ntp
- ):
- # catch when this stream is no longer active on the player
- continue
- available_clients += 1
- tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk))
- # send the progress report every 5 seconds
- now = time.time()
- if now - prev_progress_report >= 5:
- prev_progress_report = now
- tg.create_task(
- airplay_player.active_stream.send_cli_command(
- f"PROGRESS={int(queue.elapsed_time)}\n"
- )
- )
+ for airplay_player in self._get_sync_clients(player_id):
+ if (
+ not airplay_player.active_stream
+ or not airplay_player.active_stream.running
+ or airplay_player.active_stream.start_ntp != start_ntp
+ ):
+ # catch when this stream is no longer active on the player
+ continue
+ available_clients += 1
+ await airplay_player.active_stream.write_chunk(pcm_chunk)
if not available_clients:
# this streamjob is no longer active
return
# send metadata to player(s) if needed
# NOTE: this must all be done in separate tasks to not disturb audio
+ now = time.time()
if queue and queue.current_item and queue.current_item.streamdetails:
metadata_checksum = (
queue.current_item.streamdetails.stream_title
)
if prev_metadata_checksum != metadata_checksum:
prev_metadata_checksum = metadata_checksum
+ prev_progress_report = now
self.mass.create_task(self._send_metadata(player_id, queue))
+ # send the progress report every 5 seconds
+ elif now - prev_progress_report >= 5:
+ prev_progress_report = now
+ self.mass.create_task(self._send_progress(player_id, queue))
# end of stream reached - write eof
+ self.logger.debug(
+ "Finished RAOP stream for Queue %s to %s",
+ queue.display_name,
+ "/".join(synced_player_ids),
+ )
for airplay_player in self._get_sync_clients(player_id):
if (
not airplay_player.active_stream
group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
group_leader.group_childs.remove(player_id)
player.synced_to = None
+ # guard if this was the last sync child of the group player
+ if group_leader.group_childs == {group_leader.player_id}:
+ group_leader.group_childs.remove(group_leader.player_id)
await self.cmd_stop(player_id)
self.mass.players.update(player_id)
self, player_id: str, display_name: str, info: AsyncServiceInfo
) -> None:
"""Handle setup of a new player that is discovered using mdns."""
- address = info.parsed_addresses()[0]
- # some guards if our info is valid/complete
- if address == "127.0.0.1":
+ address = get_primary_ip_address(info)
+ if address is None:
return
+ # some guards if our info is valid/complete
if "md" not in info.decoded_properties:
return
if "et" not in info.decoded_properties:
return
self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
self._players[player_id] = AirPlayPlayer(
- player_id, discovery_info=info, logger=self.logger.getChild(player_id)
+ player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id)
)
manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
if "apple tv" in model.lower():
player_id = airplay_player.player_id
mass_player = self.mass.players.get(player_id)
+ active_queue = self.mass.player_queues.get_active_queue(player_id)
if path == "/ctrl-int/1/nextitem":
- self.mass.create_task(self.mass.player_queues.next(player_id))
+ self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
elif path == "/ctrl-int/1/previtem":
- self.mass.create_task(self.mass.player_queues.previous(player_id))
+ self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id))
elif path == "/ctrl-int/1/play":
- self.mass.create_task(self.mass.player_queues.play(player_id))
+ self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id))
elif path == "/ctrl-int/1/playpause":
- self.mass.create_task(self.mass.player_queues.play_pause(player_id))
+ self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id))
elif path == "/ctrl-int/1/stop":
- self.mass.create_task(self.cmd_stop(player_id))
+ self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id))
elif path == "/ctrl-int/1/volumeup":
self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
elif path == "/ctrl-int/1/volumedown":
elif path == "/ctrl-int/1/shuffle_songs":
queue = self.mass.player_queues.get(player_id)
self.mass.create_task(
- self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
+ self.mass.player_queues.set_shuffle(
+ active_queue.queue_id, not queue.shuffle_enabled
+ )
)
elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
- self.mass.create_task(self.mass.player_queues.pause(player_id))
+ self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id))
elif "dmcp.device-volume=" in path:
raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
volume = convert_airplay_volume(raop_volume)
album = _album.name
cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
- cmd += f"DURATION={duration}\nACTION=SENDMETA\n"
+ cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
for airplay_player in self._get_sync_clients(player_id):
- if not airplay_player.active_stream:
+ if not airplay_player.active_stream or not airplay_player.active_stream.running:
continue
await airplay_player.active_stream.send_cli_command(cmd)
queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
)
for airplay_player in self._get_sync_clients(player_id):
- if not airplay_player.active_stream:
+ if not airplay_player.active_stream or not airplay_player.active_stream.running:
continue
await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n")
+
+ async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None:
+ """Send progress report to player (and connected sync childs)."""
+ if not queue or not queue.current_item:
+ return
+ progress = int(queue.corrected_elapsed_time)
+ for airplay_player in self._get_sync_clients(player_id):
+ if not airplay_player.active_stream or not airplay_player.active_stream.running:
+ continue
+ await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n")