import socket
import time
from random import randrange
-from typing import TYPE_CHECKING
+from typing import cast
from music_assistant_models.config_entries import ConfigEntry
from music_assistant_models.enums import (
CONF_ENTRY_SYNC_ADJUST,
create_sample_rates_config_entry,
)
-from music_assistant.helpers.audio import get_ffmpeg_stream
from music_assistant.helpers.datetime import utc
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.process import check_output
from music_assistant.helpers.util import TaskManager, get_ip_pton, lock, select_free_port
from music_assistant.models.player_provider import PlayerProvider
from music_assistant.providers.airplay.raop import RaopStreamSession
+from music_assistant.providers.player_group import PlayerGroupProvider
from .const import (
AIRPLAY_FLOW_PCM_FORMAT,
)
from .player import AirPlayPlayer
-if TYPE_CHECKING:
- from music_assistant.providers.player_group import PlayerGroupProvider
-
-
PLAYER_CONFIG_ENTRIES = (
CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_CROSSFADE,
class AirplayProvider(PlayerProvider):
"""Player provider for Airplay based players."""
- cliraop_bin: str | None = None
+ cliraop_bin: str | None
_players: dict[str, AirPlayPlayer]
- _dacp_server: asyncio.Server = None
- _dacp_info: AsyncServiceInfo = None
+ _dacp_server: asyncio.Server
+ _dacp_info: AsyncServiceInfo
@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return {ProviderFeature.SYNC_PLAYERS}
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
) -> None:
"""Handle MDNS service state callback."""
+ if not info:
+ return
if "@" in name:
raw_id, display_name = name.split(".")[0].split("@", 1)
- elif "deviceid" in info.decoded_properties:
- raw_id = info.decoded_properties["deviceid"].replace(":", "")
+ elif deviceid := info.decoded_properties.get("deviceid"):
+ raw_id = deviceid.replace(":", "")
display_name = info.name.split(".")[0]
else:
return
- player_id: player_id of the player to handle the command.
"""
player = self.mass.players.get(player_id)
+ if not player:
+ return
if player.group_childs:
# pause is not supported while synced, use stop instead
self.logger.debug("Player is synced, using STOP instead of PAUSE")
) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
+ if not player:
+ return
# set the active source for the player to the media queue
# this accounts for syncgroups and linked players (e.g. sonos)
player.active_source = media.queue_id
)
elif media.queue_id and media.queue_id.startswith("ugp_"):
# special case: UGP stream
- ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+ ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
if airplay_player.raop_stream and airplay_player.raop_stream.running:
await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
mass_player = self.mass.players.get(player_id)
+ if not mass_player:
+ return
mass_player.volume_level = volume_level
mass_player.volume_muted = volume_level == 0
self.mass.players.update(player_id)
- player_id: player_id of the player to handle the command.
"""
mass_player = self.mass.players.get(player_id, raise_unavailable=True)
- if not mass_player.synced_to:
+ if not mass_player or not mass_player.synced_to:
return
ap_player = self._players[player_id]
if ap_player.raop_stream and ap_player.raop_stream.running:
await ap_player.raop_stream.session.remove_client(ap_player)
group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True)
+ assert group_leader
if player_id in group_leader.group_childs:
group_leader.group_childs.remove(player_id)
mass_player.synced_to = None
airplay_player = self._players.get(player_id)
- await airplay_player.cmd_stop()
+ if airplay_player:
+ await airplay_player.cmd_stop()
# make sure that the player manager gets an update
self.mass.players.update(mass_player.player_id, skip_forward=True)
self.mass.players.update(group_leader.player_id, skip_forward=True)
- async def _getcliraop_binary(self):
+ async def _getcliraop_binary(self) -> str:
"""Find the correct raop/airplay binary belonging to the platform."""
# ruff: noqa: SIM102
if self.cliraop_bin is not None:
self.cliraop_bin = cliraop_path
return cliraop_path
except OSError:
- return None
+ pass
+ return None
base_path = os.path.join(os.path.dirname(__file__), "bin")
system = platform.system().lower().replace("darwin", "macos")
def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
"""Get all sync clients for a player."""
mass_player = self.mass.players.get(player_id, True)
+ assert mass_player
sync_clients: list[AirPlayPlayer] = []
# we need to return the player itself too
group_child_ids = {player_id}
else:
headers_raw = request
body = ""
- headers_raw = headers_raw.split("\r\n")
+ headers_split = headers_raw.split("\r\n")
headers = {}
- for line in headers_raw[1:]:
+ for line in headers_split[1:]:
if ":" not in line:
continue
x, y = line.split(":", 1)
headers[x.strip()] = y.strip()
active_remote = headers.get("Active-Remote")
- _, path, _ = headers_raw[0].split(" ")
+ _, path, _ = headers_split[0].split(" ")
airplay_player = next(
(
x
player_id = airplay_player.player_id
mass_player = self.mass.players.get(player_id)
+ if not mass_player:
+ return
active_queue = self.mass.player_queues.get_active_queue(player_id)
if path == "/ctrl-int/1/nextitem":
self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
elif path == "/ctrl-int/1/shuffle_songs":
queue = self.mass.player_queues.get(player_id)
- self.mass.loop.call_soon(
- self.mass.player_queues.set_shuffle(
- active_queue.queue_id, not queue.shuffle_enabled
- )
+ if not queue:
+ return
+ self.mass.player_queues.set_shuffle(
+ active_queue.queue_id, not queue.shuffle_enabled
)
elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
# sometimes this request is sent by a device as confirmation of a play command
finally:
writer.close()
- async def monitor_prevent_playback(self, player_id: str):
+ async def monitor_prevent_playback(self, player_id: str) -> None:
"""Monitor the prevent playback state of an airplay player."""
count = 0
if not (airplay_player := self._players.get(player_id)):
return
+ if not airplay_player.raop_stream:
+ return
prev_active_remote_id = airplay_player.raop_stream.active_remote_id
while count < 40:
count += 1
self.input_format = input_format
self._sync_clients = sync_clients
self._audio_source = audio_source
- self._audio_source_task: asyncio.Task | None = None
+ self._audio_source_task: asyncio.Task[None] | None = None
self._stopped: bool = False
self._lock = asyncio.Lock()
return
async with self._lock:
await asyncio.gather(
- *[x.raop_stream.write_chunk(chunk) for x in self._sync_clients],
+ *[
+ x.raop_stream.write_chunk(chunk)
+ for x in self._sync_clients
+ if x.raop_stream
+ ],
return_exceptions=True,
)
# entire stream consumed: send EOF
generator_exhausted = True
async with self._lock:
await asyncio.gather(
- *[x.raop_stream.write_eof() for x in self._sync_clients],
+ *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
return_exceptions=True,
)
finally:
await close_async_generator(self._audio_source)
# get current ntp and start RaopStream per player
+ assert self.prov.cliraop_bin
_, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
start_ntp = int(stdout.strip())
wait_start = 1500 + (250 * len(self._sync_clients))
async with self._lock:
await asyncio.gather(
- *[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients],
+ *[
+ x.raop_stream.start(start_ntp, wait_start)
+ for x in self._sync_clients
+ if x.raop_stream
+ ],
return_exceptions=True,
)
self._audio_source_task = asyncio.create_task(audio_streamer())
"""Remove a sync client from the session."""
if airplay_player not in self._sync_clients:
return
+ assert airplay_player.raop_stream
assert airplay_player.raop_stream.session == self
async with self._lock:
self._sync_clients.remove(airplay_player)
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.prevent_playback: bool = False
- self._log_reader_task: asyncio.Task | None = None
+ self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None
self._cliraop_proc: AsyncProcess | None = None
self._ffmpeg_proc: AsyncProcess | None = None
self._started = asyncio.Event()
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
+ if not mass_player:
+ return
bind_ip = await self.mass.config.get_provider_config_value(
self.prov.instance_id, CONF_BIND_INTERFACE
)
self._started.set()
self._log_reader_task = self.mass.create_task(self._log_watcher())
- async def stop(self):
+ async def stop(self) -> None:
"""Stop playback and cleanup."""
if self._stopped:
return
+ if not self._cliraop_proc:
+ return
if self._cliraop_proc.proc and not self._cliraop_proc.closed:
await self.send_cli_command("ACTION=STOP")
self._stopped = True # set after send_cli command!
if self._stopped:
return
await self._started.wait()
+ assert self._ffmpeg_proc
await self._ffmpeg_proc.write(chunk)
async def write_eof(self) -> None:
if self._stopped:
return
await self._started.wait()
+ assert self._ffmpeg_proc
await self._ffmpeg_proc.write_eof()
async def send_cli_command(self, command: str) -> None:
if not command.endswith("\n"):
command += "\n"
- def send_data():
+ def send_data() -> None:
with suppress(BrokenPipeError), open(named_pipe, "w") as f:
f.write(command)
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
+ if not mass_player:
+ return
queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
logger = airplay_player.logger
lost_packets = 0
prev_metadata_checksum: str = ""
prev_progress_report: float = 0
+ if not self._cliraop_proc:
+ return
async for line in self._cliraop_proc.iter_stderr():
if "elapsed milliseconds:" in line:
# this is received more or less every second while playing