from random import randint, randrange
from typing import TYPE_CHECKING, cast
-from pyatv import connect, exceptions, interface, scan
+from pyatv import connect, interface, scan
from pyatv.conf import AppleTV as ATVConf
from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol
from pyatv.convert import model_str
from pyatv.interface import AppleTV as AppleTVInterface
from pyatv.interface import DeviceListener
from pyatv.protocols.raop import RaopStream
-from zeroconf import ServiceInfo
+from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
from music_assistant.common.helpers.util import get_ip_pton, select_free_port
self.discovery_info = discovery_info
self.mass = mass
self.atv: AppleTVInterface | None = None
- self.is_on = False
self.connected = False
self._connection_attempts = 0
self._connection_was_lost = False
if self.atv:
self.atv.close()
self.atv = None
- self._start_connect_loop()
async def connect(self):
"""Connect to device."""
- self.is_on = True
if self.connected:
return
- self._start_connect_loop()
- await asyncio.sleep(2)
+ try:
+ await self._connect(self.discovery_info)
+ except Exception:
+ # retry with scanning for the device
+ if conf := await self._scan():
+ await self._connect(conf)
+ raise
async def disconnect(self):
"""Disconnect from device."""
self.logger.debug("sending command %s", command)
await self.mass.create_task(send_data)
- def _start_connect_loop(self):
- """Start background connect loop to device."""
- if not self._task and self.atv is None and self.is_on:
- self._task = asyncio.create_task(self._connect_loop())
- else:
- self.logger.debug("Not starting connect loop (%s, %s)", self.atv is None, self.is_on)
-
- async def connect_once(self) -> None:
- """Try to connect once."""
- try:
- if conf := await self._scan():
- await self._connect(conf)
- except exceptions.AuthenticationError:
- await self.disconnect()
- self.logger.exception(
- "Authentication failed for %s, try reconfiguring device",
- self.discovery_info.name,
- )
- return
- except asyncio.CancelledError:
- pass
- except Exception: # pylint: disable=broad-except
- self.logger.exception("Failed to connect")
- self.atv = None
-
- async def _connect_loop(self):
- """Connect loop background task function."""
- self.logger.debug("Starting connect loop")
-
- # Try to find device and connect as long as the user has said that
- # we are allowed to connect and we are not already connected.
- while self.is_on and self.atv is None:
- await self.connect_once()
- if self.atv is not None:
- break
- self._connection_attempts += 1
- backoff = min(
- max(
- BACKOFF_TIME_LOWER_LIMIT,
- randrange(2**self._connection_attempts),
- ),
- BACKOFF_TIME_UPPER_LIMIT,
- )
-
- self.logger.debug("Reconnecting in %d seconds", backoff)
- await asyncio.sleep(backoff)
-
- self.logger.debug("Connect loop ended")
- self._task = None
-
async def _scan(self) -> ATVConf | None:
"""Try to find device by scanning for it."""
address: str = self.discovery_info.address
atvs = await scan(
self.mass.loop,
identifier=self.discovery_info.identifier,
+ aiozc=self.mass.aiozc,
hosts=[address],
)
if atvs:
self.discovery_info.name,
address,
)
- # We no longer multicast scan for the device since as soon as async_step_zeroconf runs,
- # it will update the address and reload the config entry when the device is found.
return None
async def _connect(self, conf: ATVConf) -> None:
self.logger.debug("Connecting to device %s", name)
session = self.mass.http_session
-
self.atv = await connect(conf, self.mass.loop, session=session)
self.connected = True
self.atv.power.listener = self
self._connection_attempts = 0
if self._connection_was_lost:
self.logger.info(
- 'Connection was re-established to device "%s"',
+ 'Connection was (re)established to device "%s"',
name,
)
self._connection_was_lost = False
"""Update the player attributes."""
mass_player = self.mass.players.get(self.player_id)
mass_player.volume_level = int(self.atv.audio.volume)
- mass_player.powered = self.is_on
+ mass_player.powered = self.connected or self.cliraop_proc and not self.cliraop_proc.closed
if self.cliraop_proc and not self.cliraop_proc.closed:
mass_player.state = self.optimistic_state
# NOTE: alapsed time is pushed from cliraop
_cliraop_bin: str | None = None
_stream_tasks: dict[str, asyncio.Task]
_dacp_server: asyncio.Server = None
- _dacp_info: ServiceInfo = None
+ _dacp_info: AsyncServiceInfo = None
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
)
zeroconf_type = "_dacp._tcp.local."
server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
- self._dacp_info = ServiceInfo(
+ self._dacp_info = AsyncServiceInfo(
zeroconf_type,
name=server_id,
addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
},
server=f"{socket.gethostname()}.local",
)
- await self.mass.zeroconf.async_register_service(self._dacp_info)
+ await self.mass.aiozc.async_register_service(self._dacp_info)
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
self._dacp_server.close()
# shutdown DACP zeroconf service
if self._dacp_info:
- await self.mass.zeroconf.async_unregister_service(self._dacp_info)
+ await self.mass.aiozc.async_unregister_service(self._dacp_info)
async def _handle_dacp_request( # noqa: PLR0915
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
# stop existing streams first
await self.cmd_stop(player_id)
# power on player if needed
+ await self.cmd_power(player_id, True)
# start streaming the queue (pcm) audio in a background task
queue = self.mass.player_queues.get_active_queue(player_id)
self._stream_tasks[player_id] = asyncio.create_task(
async with asyncio.TaskGroup() as tg:
for atv_player in self._get_sync_clients(player_id):
if not atv_player.atv:
- # just in case...
+ # should not be possible, but just in case...
await atv_player.connect()
tg.create_task(self._init_cliraop(atv_player, ntp))
prev_metadata_checksum: str = ""
sync_clients.append(client)
return sync_clients
- async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None:
+ async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None: # noqa: PLR0915
"""Initiatlize CLIRaop process for a player."""
stream: RaopStream | None = next(
(x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None
# if we reach this point, the process exited
if cliraop_proc._proc.returncode is not None:
cliraop_proc.closed = True
+ atv_player.optimistic_state = PlayerState.IDLE
+ atv_player.update_attributes()
logger.debug(
"CLIRaop process stopped with errorcode %s",
cliraop_proc._proc.returncode,
if self.mass.config.get_raw_player_config_value(
atv_player.player_id, CONF_ENCRYPTION, False
):
- extra_args += ["-et"]
+ extra_args += ["-u"]
if self.mass.config.get_raw_player_config_value(
atv_player.player_id, CONF_ALAC_ENCODE, False
):
import aiofiles
from aiohttp import ClientSession, TCPConnector
-from zeroconf import InterfaceChoice, NonUniqueNameException, ServiceInfo, Zeroconf
+from zeroconf import IPVersion, NonUniqueNameException, ServiceStateChange, Zeroconf
+from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
from music_assistant.common.helpers.util import get_ip_pton
from music_assistant.common.models.api import ServerInfoMessage
loop: asyncio.AbstractEventLoop
http_session: ClientSession
- zeroconf: Zeroconf
+ aiozc: AsyncZeroconf
+ aiobrowser: AsyncServiceBrowser
config: ConfigController
webserver: WebserverController
cache: CacheController
self.running_as_hass_addon = await is_hass_supervisor()
self.version = await get_package_version("music_assistant")
# create shared zeroconf instance
- self.zeroconf = Zeroconf(interfaces=InterfaceChoice.All)
+ # TODO: enumerate interfaces and enable IPv6 support
+ self.aiozc = AsyncZeroconf(ip_version=IPVersion.V4Only)
+ # self.aiobrowser = AsyncServiceBrowser(
+ # self.aiozc.zeroconf,
+ # [],
+ # handlers=[self._on_mdns_service_state_change],
+ # )
# create shared aiohttp ClientSession
self.http_session = ClientSession(
loop=self.loop,
"""Make this Music Assistant instance discoverable on the network."""
zeroconf_type = "_mass._tcp.local."
server_id = self.server_id
-
- info = ServiceInfo(
+ # register MA on mdns to be discovered
+ LOGGER.debug("Starting Zeroconf broadcast...")
+ info = AsyncServiceInfo(
zeroconf_type,
name=f"{server_id}.{zeroconf_type}",
addresses=[await get_ip_pton(self.webserver.publish_ip)],
properties=self.get_server_info().to_dict(),
server="mass.local.",
)
- LOGGER.debug("Starting Zeroconf broadcast...")
try:
existing = getattr(self, "mass_zc_service_set", None)
if existing:
- await self.zeroconf.async_update_service(info)
+ await self.aiozc.async_update_service(info)
else:
- await self.zeroconf.async_register_service(info)
+ await self.aiozc.async_register_service(info)
self.mass_zc_service_set = True
except NonUniqueNameException:
LOGGER.exception(
"Music Assistant instance with identical name present in the local network!"
)
+ def _on_mdns_service_state_change(
+ self,
+ zeroconf: Zeroconf, # pylint: disable=unused-argument
+ service_type: str,
+ name: str,
+ state_change: ServiceStateChange,
+ ) -> None:
+ """Handle MDNS service state callback."""
+
async def __aenter__(self) -> Self:
"""Return Context manager."""
await self.start()