import logging
import pathlib
import random
+import re
import socket
import time
from contextlib import suppress
from typing import TYPE_CHECKING, Final, cast
+from bidict import bidict
from snapcast.control import create_server
from snapcast.control.client import Snapclient
from zeroconf import NonUniqueNameException
_use_builtin_server: bool
_snapserver_runner: asyncio.Task | None
_snapserver_started = asyncio.Event | None
+ _ids_map = bidict # ma_id / snapclient_id
+
+ def _get_snapclient_id(self, player_id: str) -> str:
+ search_dict = self._ids_map
+ return search_dict.get(player_id)
+
+ def _get_ma_id(self, snap_client_id: str) -> str:
+ search_dict = self._ids_map.inverse
+ return search_dict.get(snap_client_id)
+
+ def _generate_and_register_id(self, snap_client_id) -> str:
+ search_dict = self._ids_map.inverse
+ if snap_client_id not in search_dict:
+ new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
+ self._ids_map[new_id] = snap_client_id
+ return new_id
+ else:
+ return self._get_ma_id(snap_client_id)
+
+ def _can_sync_with(self, snap_client: Snapclient) -> dict:
+ return tuple(
+ self._get_ma_id(x.identifier)
+ for x in self._snapserver.clients
+ if x.identifier != snap_client.identifier and x.connected
+ )
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
self._snapcast_server_control_port = self.config.get_value(CONF_SERVER_CONTROL_PORT)
self._stream_tasks = {}
+ self._ids_map = bidict({})
if self._use_builtin_server:
# start our own builtin snapserver
"""Call (by config manager) when the configuration of a player is removed."""
super().on_player_config_removed(player_id)
if self._use_builtin_server:
- self.mass.create_task(self._snapserver.delete_client(player_id))
+ self.mass.create_task(
+ self._snapserver.delete_client(self._get_snapclient_id(player_id))
+ )
def _handle_update(self) -> None:
"""Process Snapcast init Player/Group and set callback ."""
def _handle_player_init(self, snap_client: Snapclient) -> None:
"""Process Snapcast add to Player controller."""
- player_id = snap_client.identifier
+ player_id = self._generate_and_register_id(snap_client.identifier)
player = self.mass.players.get(player_id, raise_unavailable=False)
if not player:
- snap_client = cast(Snapclient, self._snapserver.client(player_id))
+ snap_client = cast(
+ Snapclient, self._snapserver.client(self._get_snapclient_id(player_id))
+ )
player = Player(
player_id=player_id,
provider=self.instance_id,
def _handle_player_update(self, snap_client: Snapclient) -> None:
"""Process Snapcast update to Player controller."""
- player_id = snap_client.identifier
+ player_id = self._get_ma_id(snap_client.identifier)
player = self.mass.players.get(player_id)
player.name = snap_client.friendly_name
player.volume_level = snap_client.volume
player.volume_muted = snap_client.muted
player.available = snap_client.connected
- player.can_sync_with = tuple(
- x.identifier
- for x in self._snapserver.clients
- if x.identifier != player_id and x.connected
- )
+ player.can_sync_with = self._can_sync_with(snap_client)
player.synced_to = self._synced_to(player_id)
player.group_childs = self._group_childs(player_id)
if player.active_group is None:
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
+ snap_client_id = self._get_snapclient_id(player_id)
await self._snapserver.client_volume(
- player_id, {"percent": volume_level, "muted": volume_level == 0}
+ snap_client_id, {"percent": volume_level, "muted": volume_level == 0}
)
async def cmd_stop(self, player_id: str) -> None:
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send MUTE command to given player."""
- await self._snapserver.client(player_id).set_muted(muted)
+ snap_client_id = self._get_snapclient_id(player_id)
+ await self._snapserver.client(snap_client_id).set_muted(muted)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Sync Snapcast player."""
group = self._get_snapgroup(target_player)
- await group.add_client(player_id)
+ await group.add_client(self._get_snapclient_id(player_id))
async def cmd_unsync(self, player_id: str) -> None:
"""Unsync Snapcast player."""
+ snap_client_id = self._get_snapclient_id(player_id)
group = self._get_snapgroup(player_id)
- await group.remove_client(player_id)
+ await group.remove_client(snap_client_id)
# assign default/empty stream to the player
await self._get_snapgroup(player_id).set_stream("default")
self._handle_update()
def _get_snapgroup(self, player_id: str) -> Snapgroup:
"""Get snapcast group for given player_id."""
- client: Snapclient = self._snapserver.client(player_id)
+ snap_client_id = self._get_snapclient_id(player_id)
+ client: Snapclient = self._snapserver.client(snap_client_id)
return client.group
def _get_snapstream(self, player_id: str) -> Snapstream | None:
def _synced_to(self, player_id: str) -> str | None:
"""Return player_id of the player this player is synced to."""
snap_group = self._get_snapgroup(player_id)
- if player_id != snap_group.clients[0]:
- return snap_group.clients[0]
+ if player_id != self._get_ma_id(snap_group.clients[0]):
+ return self._get_ma_id(snap_group.clients[0])
return None
def _group_childs(self, player_id: str) -> set[str]:
"""Return player_ids of the players synced to this player."""
snap_group = self._get_snapgroup(player_id)
- return {snap_client for snap_client in snap_group.clients if snap_client != player_id}
+ return {
+ self._get_ma_id(snap_client)
+ for snap_client in snap_group.clients
+ if snap_client != player_id
+ }
async def _create_stream(self) -> tuple[Snapstream, int]:
"""Create new stream on snapcast server."""