from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING, Any
-import defusedxml.ElementTree as ET # noqa: N817
from soco import SoCoException
from soco.core import (
MUSIC_SRC_AIRPLAY,
def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
"""Handle callback for topology change event."""
- if xml := event.variables.get("zone_group_state"):
- zgs = ET.fromstring(xml)
- vanished_devices = zgs.find("VanishedDevices")
- if vanished_devices is not None:
- for vanished_device in vanished_devices:
- if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
- self.logger.debug(
- "Ignoring %s marked %s as vanished with reason: %s",
- self.zone_name,
- vanished_device.get("ZoneName"),
- reason,
- )
- continue
- self.mass.create_task(self._vanished(reason))
-
if "zone_player_uui_ds_in_group" not in event.variables:
return
asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)
- async def _vanished(self, reason: str) -> None:
- """Handle removal of speaker when marked as vanished."""
- if not self.available:
- return
- self.logger.debug("%s has vanished (%s), marking unavailable", self.zone_name, reason)
- await self.offline()
-
async def _rebooted(self) -> None:
"""Handle a detected speaker reboot."""
self.logger.debug("%s rebooted, reconnecting", self.zone_name)