# active_source: return player_id of the active queue for this player
# if the player is grouped and a group is active, this will be set to the group's player_id
# otherwise it will be set to the own player_id
- active_source: str = ""
+ active_source: str | None = None
# current_item_id: return item_id/uri of the current active/loaded item on the player
# this may be a MA queue_item_id, url, uri or some provider specific string
"""
Set (raw) single config(entry) value for a provider.
- Note that this only returns the stored value without any validation or default.
+ Note that this only stores the (raw) value without any validation or default.
+ """
+ self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value)
+
+ def set_raw_player_config_value(self, player_id: str, key: str, value: ConfigValueType) -> None:
+ """
+ Set (raw) single config(entry) value for a player.
+
+ Note that this only stores the (raw) value without any validation or default.
"""
- return self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value)
+ self.set(f"{CONF_PLAYERS}/{player_id}/values/{key}", value)
def save(self, immediate: bool = False) -> None:
"""Schedule save of data to disk."""
)
from music_assistant.common.models.errors import (
AlreadyRegisteredError,
- PlayerCommandFailed,
PlayerUnavailableError,
ProviderUnavailableError,
UnsupportedFeaturedException,
# forward to player provider
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_power(player_id, powered)
+ else:
+ # allow the stop command to process and prevent race conditions
+ await asyncio.sleep(0.2)
# always optimistically set the power state to update the UI
# as fast as possible and prevent race conditions
player.powered = powered
raise UnsupportedFeaturedException(
f"Player {parent_player.name} does not support (un)sync commands"
)
- if player_id not in parent_player.can_sync_with:
- raise PlayerCommandFailed(f"Player {player_id} can not be synced to {target_player}.")
if child_player.synced_to:
if child_player.synced_to == parent_player.player_id:
# nothing to do: already synced to this parent
"""Return the active_source id for given player."""
# if player is synced, return group leader's active source
if player.synced_to and (parent_player := self.get(player.synced_to)):
- return self._get_active_source(parent_player)
+ return parent_player.player_id
if active_player_group := self._get_active_player_group(player):
return active_player_group.player_id
# defaults to the player's own player id if not active source set
return player
def get_sync_leader(self, group_player: Player) -> Player | None:
- """Get the sync leader player for a syncgroup or synced player."""
+ """Get the active sync leader player for a syncgroup or synced player."""
if group_player.synced_to:
# should not happen but just in case...
return group_player.synced_to
for child_player in self.iter_group_members(
group_player, only_powered=True, only_playing=False
):
- if not child_player.group_childs:
+ if child_player.synced_to and child_player.synced_to in group_player.group_childs:
+ return self.get(child_player.synced_to)
+ elif child_player.synced_to:
+ # player is already synced to a member outside this group ?!
continue
- return child_player
+ elif child_player.group_childs:
+ return child_player
return None
async def _sync_syncgroup(self, player_id: str) -> None:
device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()),
supported_features=first_player.supported_features,
group_childs=set(members),
+ active_source=group_player_id,
)
self.mass.players.register_or_update(player)
return player
self.expected_players: set[str] = set()
self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {}
self.bytes_streamed: int = 0
- self.client_seconds_skipped: dict[str, int] = {}
self._all_clients_connected = asyncio.Event()
self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}")
self._finished: bool = False
- self._first_chunk: bytes = b""
+ self.workaround_players_seen: set[str] = set()
+ # start running the audio task in the background
+ self._audio_task = asyncio.create_task(self._stream_job_runner())
@property
def finished(self) -> bool:
"""Return if this Job is running."""
return not self.finished and not self.pending
- def start(self) -> None:
- """Start running this streamjob."""
- # start running the audio task in the background
- self._audio_task = asyncio.create_task(self._stream_job_runner())
-
def stop(self) -> None:
"""Stop running this job."""
self._finished = True
async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]:
"""Subscribe consumer and iterate incoming chunks on the queue."""
+ if (
+ player_id in self.stream_controller.workaround_players
+ and player_id not in self.workaround_players_seen
+ ):
+ self.workaround_players_seen.add(player_id)
+ yield b""
+ return
+
try:
self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
- if self._first_chunk:
- yield self._first_chunk
-
if self._all_clients_connected.is_set():
# client subscribes while we're already started
- self.logger.debug(
+ self.logger.warning(
"Client %s is joining while the stream is already started", player_id
)
- # calculate how many seconds the client missed so far
- self.client_seconds_skipped[player_id] = (
- self.bytes_streamed / self.pcm_format.pcm_sample_size
- )
else:
self.logger.debug("Subscribed client %s", player_id)
await self._put_chunk(chunk)
- # keep first chunk to workaround (dlna) players that do multiple get requests
- if chunk_num == 1:
- self._first_chunk = chunk
- await asyncio.sleep(0.1)
-
# mark EOF with empty chunk
await self._put_chunk(b"")
"some player specific local control callbacks."
)
self.manifest.icon = "cast-audio"
+ self.workaround_players: set[str] = set()
@property
def base_url(self) -> str:
"to the same stream, playback may be disturbed!",
child_player_id,
)
+ self.workaround_players.add(child_player_id)
# all checks passed, start streaming!
self.logger.debug(
auto_play=False,
)
)
- stream_job.start()
else:
# regular, single player playback
client = self._socket_clients[player_id]
-"""Sample Player provider for Music Assistant."""
+"""
+Sonos Player provider for Music Assistant.
+
+Note that large parts of this code are copied over from the Home Assistant
+integratioon for Sonos.
+"""
from __future__ import annotations
import asyncio
import logging
import time
+from collections import OrderedDict
from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
-import soco
-from soco import config
-from soco.events_base import Event as SonosEvent
-from soco.events_base import SubscriptionBase
+import soco.config as soco_config
+from requests.exceptions import Timeout
+from soco import SoCoException, events_asyncio, zonegroupstate
+from soco.core import SoCo
+from soco.discovery import discover
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
ConfigEntryType,
ContentType,
PlayerFeature,
- PlayerState,
PlayerType,
ProviderFeature,
)
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
+from .player import SonosPlayer
+
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
-LOGGER = logging.getLogger(__name__)
PLAYER_FEATURES = (
PlayerFeature.SYNC,
)
CONF_NETWORK_SCAN = "network_scan"
+SUBSCRIPTION_TIMEOUT = 1200
+ZGS_SUBSCRIPTION_TIMEOUT = 2
-# set event listener port to something other than 1400
-# to allow coextistence with HA on the same host
-config.EVENT_LISTENER_PORT = 1700
HIRES_MODELS = (
"Sonos Roam",
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Initialize provider(instance) with given configuration."""
+ # set event listener port to something other than 1400
+ # to allow coextistence with HA on the same host
+ soco_config.EVENT_LISTENER_PORT = 1700
+ soco_config.EVENTS_MODULE = events_asyncio
+ soco_config.REQUEST_TIMEOUT = 9.5
+ soco_config.ZGT_EVENT_FALLBACK = False
+ zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT
+ # silence the soco logger a bit
+ logging.getLogger("soco").setLevel(logging.INFO)
+ logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
prov = SonosPlayerProvider(mass, manifest, config)
await prov.handle_setup()
return prov
)
-class SonosPlayer:
- """Wrapper around Sonos/SoCo with some additional attributes."""
-
- def __init__(self, sonos_prov: SonosPlayerProvider, soco_device: soco.SoCo) -> None:
- """Initialize SonosPlayer instance."""
- self.sonos_prov = sonos_prov
- self.player_id = soco_device.uid
- self.soco_device = soco_device
- self.is_stereo_pair: bool = False
- self.elapsed_time: int = 0
- self.playback_started: float | None = None
- self.need_elapsed_time_workaround: bool = False
- self.subscriptions: list[SubscriptionBase] = []
- self.transport_info: dict = {}
- self.track_info: dict = {}
- self.speaker_info: dict = {}
- self.rendering_control_info: dict = {}
- self.speaker_info_updated: float = 0.0
- self.transport_info_updated: float = 0.0
- self.track_info_updated: float = 0.0
- self.rendering_control_info_updated: float = 0.0
-
- def update_info(
- self,
- update_transport_info: bool = False,
- update_track_info: bool = False,
- update_speaker_info: bool = False,
- update_rendering_control_info: bool = False,
- ):
- """Poll all info from player (must be run in executor thread)."""
- # transport info
- if update_transport_info:
- transport_info = self.soco_device.get_current_transport_info()
- if transport_info.get("current_transport_state") != "TRANSITIONING":
- self.transport_info = transport_info
- self.transport_info_updated = time.time()
- # track info
- if update_track_info:
- self.track_info = self.soco_device.get_current_track_info()
- # sonos reports bullshit elapsed time while playing radio (or flow mode),
- # trying to be "smart" and resetting the counter when new ICY metadata is detected
- # we try to detect this and work around it
- self.need_elapsed_time_workaround = self.track_info["duration"] == "0:00:00"
- if not self.need_elapsed_time_workaround:
- self.elapsed_time = _timespan_secs(self.track_info["position"]) or 0
- self.track_info_updated = time.time()
-
- # speaker info
- if update_speaker_info:
- self.speaker_info = self.soco_device.get_speaker_info()
- self.speaker_info_updated = time.time()
- # rendering control info
- if update_rendering_control_info:
- self.rendering_control_info["volume"] = self.soco_device.volume
- self.rendering_control_info["mute"] = self.soco_device.mute
- self.rendering_control_info_updated = time.time()
-
- def update_attributes(self):
- """Update attributes of the MA Player from soco.SoCo state."""
- mass_player = self.sonos_prov.mass.players.get(self.player_id)
- if not mass_player:
- return
- now = time.time()
- # generic attributes (speaker_info)
- mass_player.available = True
- mass_player.name = self.speaker_info["zone_name"]
- mass_player.volume_level = int(self.rendering_control_info["volume"])
- mass_player.volume_muted = self.rendering_control_info["mute"]
-
- # transport info (playback state)
- current_transport_state = self.transport_info["current_transport_state"]
- mass_player.state = current_state = _convert_state(current_transport_state)
-
- if self.playback_started is not None and current_state == PlayerState.IDLE:
- self.playback_started = None
- elif self.playback_started is None and current_state == PlayerState.PLAYING:
- self.playback_started = now
- mass_player.powered = True
-
- # media info (track info)
- mass_player.current_item_id = self.track_info["uri"]
- if mass_player.player_id in mass_player.current_item_id:
- mass_player.active_source = mass_player.player_id
- elif "spotify" in mass_player.current_item_id:
- mass_player.active_source = "spotify"
- else:
- mass_player.active_source = self.soco_device.music_source_from_uri(
- self.track_info["uri"]
- )
- if not self.need_elapsed_time_workaround:
- mass_player.elapsed_time = self.elapsed_time
- mass_player.elapsed_time_last_updated = self.track_info_updated
-
- # zone topology (syncing/grouping) details
- if (
- self.soco_device.group
- and self.soco_device.group.coordinator
- and self.soco_device.group.coordinator.uid == self.player_id
- ):
- # this player is the sync leader
- mass_player.synced_to = None
- group_members = {x.uid for x in self.soco_device.group.members if x.is_visible}
- if not group_members:
- # not sure about this ?!
- mass_player.type = PlayerType.PLAYER
- elif group_members == {self.player_id}:
- mass_player.group_childs = set()
- else:
- mass_player.group_childs = group_members
- elif self.soco_device.group and self.soco_device.group.coordinator:
- # player is synced to
- mass_player.group_childs = set()
- mass_player.synced_to = self.soco_device.group.coordinator.uid
- else:
- # unsure
- mass_player.group_childs = set()
-
- async def check_poll(self) -> None:
- """Check if any of the endpoints needs to be polled for info."""
- cur_time = time.time()
- update_transport_info = (cur_time - self.transport_info_updated) > 30
- update_track_info = self.transport_info.get("current_transport_state") == "PLAYING" or (
- (cur_time - self.track_info_updated) > 300
- )
- update_speaker_info = (cur_time - self.speaker_info_updated) > 300
- update_rendering_control_info = (cur_time - self.rendering_control_info_updated) > 30
-
- if not (
- update_transport_info
- or update_track_info
- or update_speaker_info
- or update_rendering_control_info
- ):
- return
-
- await asyncio.to_thread(
- self.update_info,
- update_transport_info,
- update_track_info,
- update_speaker_info,
- update_rendering_control_info,
- )
-
- async def connect(self) -> None:
- """Handle (re)connect of the Sonos player."""
- # poll all endpoints once and update attributes
- self.speaker_info = await asyncio.to_thread(self.soco_device.get_speaker_info, True)
- self.speaker_info_updated = time.time()
- await self.check_poll()
- self.update_attributes()
-
- # handle subscriptions to events
- def subscribe(service, _callback):
- queue = ProcessSonosEventQueue(_callback)
- sub = service.subscribe(auto_renew=True, event_queue=queue)
- self.subscriptions.append(sub)
-
- subscribe(self.soco_device.avTransport, self._handle_av_transport_event)
- subscribe(self.soco_device.renderingControl, self._handle_rendering_control_event)
- subscribe(self.soco_device.zoneGroupTopology, self._handle_zone_group_topology_event)
-
- def disconnect(self) -> None:
- """Handle disconnect."""
- mass_player = self.sonos_prov.mass.players.get(self.player_id)
- mass_player.available = False
- LOGGER.debug("Unsubscribing from events for %s", mass_player.display_name)
- for subscription in self.subscriptions:
- subscription.unsubscribe()
- self.subscriptions = []
-
- async def reconnect(self, soco_device: soco.SoCo) -> None:
- """Handle reconnect."""
- if self.subscriptions:
- # handle reconnect
- self.disconnect()
- self.soco_device = soco_device
- await self.connect()
-
- def _handle_av_transport_event(self, event: SonosEvent):
- """Handle a soco.SoCo AVTransport event."""
- LOGGER.debug("Received AVTransport event for Player %s", self.soco_device.player_name)
-
- if "transport_state" in event.variables:
- new_state = event.variables["transport_state"]
- if new_state == "TRANSITIONING":
- return
- self.transport_info["current_transport_state"] = new_state
-
- if "current_track_uri" in event.variables:
- self.transport_info["uri"] = event.variables["current_track_uri"]
-
- self.transport_info_updated = time.time()
- asyncio.run_coroutine_threadsafe(
- self.sonos_prov.update_player(self), self.sonos_prov.mass.loop
- )
-
- def _handle_rendering_control_event(self, event: SonosEvent):
- """Handle a soco.SoCo RenderingControl event."""
- LOGGER.debug(
- "Received RenderingControl event for Player %s",
- self.soco_device.player_name,
- )
- if "volume" in event.variables:
- self.rendering_control_info["volume"] = event.variables["volume"]["Master"]
- if "mute" in event.variables:
- self.rendering_control_info["mute"] = event.variables["mute"]["Master"] == "1"
- self.rendering_control_info_updated = time.time()
- asyncio.run_coroutine_threadsafe(
- self.sonos_prov.update_player(self), self.sonos_prov.mass.loop
- )
+@dataclass
+class UnjoinData:
+ """Class to track data necessary for unjoin coalescing."""
- def _handle_zone_group_topology_event(self, event: SonosEvent): # noqa: ARG002
- """Handle a soco.SoCo ZoneGroupTopology event."""
- LOGGER.debug(
- "Received ZoneGroupTopology event for Player %s - members: %s",
- self.soco_device.player_name,
- "/".join([x.player_name for x in self.soco_device.group.members]),
- )
- asyncio.run_coroutine_threadsafe(
- self.sonos_prov.update_player(self), self.sonos_prov.mass.loop
- )
+ players: list[SonosPlayer]
+ event: asyncio.Event = field(default_factory=asyncio.Event)
class SonosPlayerProvider(PlayerProvider):
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
- self.sonosplayers = {}
+ self.sonosplayers: OrderedDict[str, SonosPlayer] = OrderedDict()
+ self.topology_condition = asyncio.Condition()
+ self.discovery_known: set[str] = set()
+ self.boot_counts: dict[str, int] = {}
+ self.mdns_names: dict[str, str] = {}
+ self.unjoin_data: dict[str, UnjoinData] = {}
self._discovery_running = False
- # silence the soco logger a bit
- logging.getLogger("soco").setLevel(logging.INFO)
- logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
+ self.hosts_in_error: dict[str, bool] = {}
+ self.discovery_lock = asyncio.Lock()
+ self.creation_lock = asyncio.Lock()
+ self._known_invisible: set[SoCo] = set()
+
self.mass.create_task(self._run_discovery())
async def unload(self) -> None:
# await any in-progress discovery
while self._discovery_running:
await asyncio.sleep(0.5)
- # cleanup players
- if self.sonosplayers:
- for player_id in list(self.sonosplayers):
- player = self.sonosplayers.pop(player_id)
- player.disconnect()
+ await asyncio.gather(*(player.offline() for player in self.sonosplayers.values()))
+ if events_asyncio.event_listener:
+ await events_asyncio.event_listener.async_stop()
self.sonosplayers = None
async def get_player_config_entries(
default_value=0,
range=(-10, 10),
description="Set the Bass level for the Sonos player",
- value=sonos_player.soco_device.bass,
+ value=sonos_player.soco.bass,
advanced=True,
),
ConfigEntry(
default_value=0,
range=(-10, 10),
description="Set the Treble level for the Sonos player",
- value=sonos_player.soco_device.treble,
+ value=sonos_player.soco.treble,
advanced=True,
),
ConfigEntry(
label="Loudness compensation",
default_value=True,
description="Enable loudness compensation on the Sonos player",
- value=sonos_player.soco_device.loudness,
+ value=sonos_player.soco.loudness,
advanced=True,
),
)
return
if "values/sonos_bass" in changed_keys:
self.mass.create_task(
- sonos_player.soco_device.renderingControl.SetBass,
+ sonos_player.soco.renderingControl.SetBass,
[("InstanceID", 0), ("DesiredBass", config.get_value("sonos_bass"))],
)
if "values/sonos_treble" in changed_keys:
self.mass.create_task(
- sonos_player.soco_device.renderingControl.SetTreble,
+ sonos_player.soco.renderingControl.SetTreble,
[("InstanceID", 0), ("DesiredTreble", config.get_value("sonos_treble"))],
)
if "values/sonos_loudness" in changed_keys:
loudness_value = "1" if config.get_value("sonos_loudness") else "0"
self.mass.create_task(
- sonos_player.soco_device.renderingControl.SetLoudness,
+ sonos_player.soco.renderingControl.SetLoudness,
[
("InstanceID", 0),
("Channel", "Master"),
],
)
+ def is_device_invisible(self, ip_address: str) -> bool:
+ """Check if device at provided IP is known to be invisible."""
+ return any(x for x in self._known_invisible if x.ip_address == ip_address)
+
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
sonos_player = self.sonosplayers[player_id]
- if not sonos_player.soco_device.is_coordinator:
+ if sonos_player.sync_coordinator:
self.logger.debug(
"Ignore STOP command for %s: Player is synced to another player.",
player_id,
)
return
- await asyncio.to_thread(sonos_player.soco_device.stop)
- await asyncio.to_thread(sonos_player.soco_device.clear_queue)
- sonos_player.playback_started = None
+ await asyncio.to_thread(sonos_player.soco.stop)
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
sonos_player = self.sonosplayers[player_id]
- if not sonos_player.soco_device.is_coordinator:
+ if sonos_player.sync_coordinator:
self.logger.debug(
"Ignore PLAY command for %s: Player is synced to another player.",
player_id,
)
return
- await asyncio.to_thread(sonos_player.soco_device.play)
+ await asyncio.to_thread(sonos_player.soco.play)
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
sonos_player = self.sonosplayers[player_id]
- if not sonos_player.soco_device.is_coordinator:
+ if sonos_player.sync_coordinator:
self.logger.debug(
"Ignore PLAY command for %s: Player is synced to another player.",
player_id,
)
return
- if sonos_player.need_elapsed_time_workaround:
- # no pause allowed when radio/flow mode is active
+ if "Pause" not in sonos_player.soco.available_actions:
+ # pause not possible
await self.cmd_stop(player_id)
return
- await asyncio.to_thread(sonos_player.soco_device.pause)
+ await asyncio.to_thread(sonos_player.soco.pause)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
def set_volume_level(player_id: str, volume_level: int) -> None:
sonos_player = self.sonosplayers[player_id]
- sonos_player.soco_device.volume = volume_level
+ sonos_player.soco.volume = volume_level
await asyncio.to_thread(set_volume_level, player_id, volume_level)
def set_volume_mute(player_id: str, muted: bool) -> None:
sonos_player = self.sonosplayers[player_id]
- sonos_player.soco_device.mute = muted
+ sonos_player.soco.mute = muted
await asyncio.to_thread(set_volume_mute, player_id, muted)
- target_player: player_id of the syncgroup master or group player.
"""
sonos_player = self.sonosplayers[player_id]
- sonos_master_player = self.sonosplayers[target_player].soco_device
- retries = 0
- while True:
- try:
- await asyncio.to_thread(sonos_player.soco_device.join, sonos_master_player)
- break
- except soco.exceptions.SoCoUPnPException as err:
- if retries >= 3:
- raise err
- retries += 1
- await asyncio.sleep(1)
- # optimistically update player state
- # await self.update_player(sonos_player)
+ sonos_master_player = self.sonosplayers[target_player]
+ await sonos_master_player.join([sonos_player])
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
- player_id: player_id of the player to handle the command.
"""
sonos_player = self.sonosplayers[player_id]
- await asyncio.to_thread(sonos_player.soco_device.unjoin)
- # optimistically update player state
- # await self.update_player(sonos_player)
+ await sonos_player.unjoin()
async def play_media(
self,
output_codec=ContentType.FLAC,
seek_position=seek_position,
fade_in=fade_in,
- flow_mode=False,
)
sonos_player = self.sonosplayers[player_id]
mass_player = self.mass.players.get(player_id)
- if not sonos_player.soco_device.is_coordinator:
+ if sonos_player.sync_coordinator:
# this should be already handled by the player manager, but just in case...
raise PlayerCommandFailed(
f"Player {mass_player.display_name} can not "
"accept play_media command, it is synced to another player."
)
metadata = create_didl_metadata(self.mass, url, queue_item)
- await asyncio.to_thread(sonos_player.soco_device.play_uri, url, meta=metadata)
- # optimistically set this timestamp to help figure out elapsed time later
- now = time.time()
- sonos_player.playback_started = now
- mass_player.elapsed_time = 0
- mass_player.elapsed_time_last_updated = now
+ self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata)
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
"""Handle PLAY STREAM on given player.
This is a special feature from the Universal Group provider.
"""
- url = stream_job.resolve_stream_url(player_id, ContentType.MP3)
+ url = stream_job.resolve_stream_url(player_id, ContentType.FLAC)
sonos_player = self.sonosplayers[player_id]
mass_player = self.mass.players.get(player_id)
- if not sonos_player.soco_device.is_coordinator:
+ if sonos_player.sync_coordinator:
# this should be already handled by the player manager, but just in case...
raise PlayerCommandFailed(
f"Player {mass_player.display_name} can not "
"accept play_stream command, it is synced to another player."
)
metadata = create_didl_metadata(self.mass, url, None)
- await asyncio.to_thread(sonos_player.soco_device.play_uri, url, meta=metadata)
- # add a special 'command' item to the sonos queue
- # this allows for on-player next buttons/commands to still work
- await self._enqueue_item(
- sonos_player, self.mass.streams.get_command_url(player_id, "next"), None
- )
+ # sonos players do not like our multi client stream
+ # add to the workaround players list
+ self.mass.streams.workaround_players.add(player_id)
+ await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata)
# optimistically set this timestamp to help figure out elapsed time later
- now = time.time()
- sonos_player.playback_started = now
mass_player.elapsed_time = 0
- mass_player.elapsed_time_last_updated = now
+ mass_player.elapsed_time_last_updated = time.time()
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""
)
# set crossfade according to player setting
crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
- if sonos_player.soco_device.cross_fade != crossfade:
+ if sonos_player.soco.cross_fade != crossfade:
def set_crossfade():
with suppress(Exception):
- sonos_player.soco_device.cross_fade = crossfade
+ sonos_player.soco.cross_fade = crossfade
await asyncio.to_thread(set_crossfade)
# based on when we last received info from the device
await sonos_player.check_poll()
# always update the attributes
- await self.update_player(sonos_player, signal_update=False)
+ sonos_player.update_player(signal_update=False)
except ConnectionResetError as err:
raise PlayerUnavailableError from err
"""Discover Sonos players on the network."""
if self._discovery_running:
return
- try:
- self._discovery_running = True
- self.logger.debug("Sonos discovery started...")
- discovered_devices: set[soco.SoCo] = await asyncio.to_thread(
- soco.discover, allow_network_scan=self.config.get_value(CONF_NETWORK_SCAN)
- )
- if discovered_devices is None:
- discovered_devices = set()
- # process new players
- for device in discovered_devices:
- if (existing := self.mass.players.get(device.uid)) and existing.available:
- continue
- try:
- await self._device_discovered(device)
- except Exception as err:
- self.logger.exception(str(err), exc_info=err)
+ allow_network_scan = self.config.get_value(CONF_NETWORK_SCAN)
- finally:
- self._discovery_running = False
+ def do_discover():
+ """Run discovery and add players in executor thread."""
+ self._discovery_running = True
+ try:
+ self.logger.debug("Sonos discovery started...")
+ discovered_devices: set[SoCo] = discover(allow_network_scan=allow_network_scan)
+ if discovered_devices is None:
+ discovered_devices = set()
+ # process new players
+ for soco in discovered_devices:
+ try:
+ self._add_player(soco)
+ except (OSError, SoCoException, Timeout) as err:
+ self.logger.warning(
+ "Failed to add SonosPlayer using %s: %s", soco, err, exc_info=err
+ )
+ finally:
+ self._discovery_running = False
+
+ await self.mass.create_task(do_discover)
def reschedule():
self._discovery_reschedule_timer = None
# reschedule self once finished
self._discovery_reschedule_timer = self.mass.loop.call_later(300, reschedule)
- async def _device_discovered(self, soco_device: soco.SoCo) -> None:
- """Handle discovered Sonos player."""
- player_id = soco_device.uid
+ def _add_player(self, soco: SoCo) -> None:
+ """Add discovered Sonos player."""
+ player_id = soco.uid
+ if player_id in self.sonosplayers:
+ return # already added
+ if not soco.is_visible:
+ return
enabled = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}/enabled", True)
if not enabled:
self.logger.debug("Ignoring disabled player: %s", player_id)
return
- if soco_device not in soco_device.visible_zones:
+ if soco not in soco.visible_zones:
return
- if not (sonos_player := self.sonosplayers.get(player_id)):
- self.sonosplayers[player_id] = sonos_player = SonosPlayer(
- self,
- soco_device,
- )
-
- if not (mass_player := self.mass.players.get(player_id)):
- mass_player = Player(
- player_id=soco_device.uid,
+ speaker_info = soco.get_speaker_info(True, timeout=7)
+ if soco.uid not in self.boot_counts:
+ self.boot_counts[soco.uid] = soco.boot_seqnum
+ self.logger.debug("Adding new player: %s", speaker_info)
+ support_hires = speaker_info["model_name"] in HIRES_MODELS
+ self.sonosplayers[player_id] = sonos_player = SonosPlayer(
+ self,
+ soco=soco,
+ mass_player=Player(
+ player_id=soco.uid,
provider=self.domain,
type=PlayerType.PLAYER,
- name=soco_device.player_name,
+ name=soco.player_name,
available=True,
powered=False,
supported_features=PLAYER_FEATURES,
- device_info=DeviceInfo(),
- max_sample_rate=44100,
- supports_24bit=False,
- )
-
- await sonos_player.reconnect(soco_device)
-
- if sonos_player.speaker_info["model_name"] in HIRES_MODELS:
- mass_player.max_sample_rate = 48000
- mass_player.supports_24bit = True
-
- mass_player.device_info = DeviceInfo(
- model=sonos_player.speaker_info["model_name"],
- address=sonos_player.soco_device.ip_address,
- manufacturer="SONOS",
+ device_info=DeviceInfo(
+ model=speaker_info["model_name"],
+ address=soco.ip_address,
+ manufacturer="SONOS",
+ ),
+ max_sample_rate=48000 if support_hires else 44100,
+ supports_24bit=support_hires,
+ ),
)
-
- self.mass.players.register_or_update(mass_player)
+ sonos_player.setup()
+ self.mass.loop.call_soon_threadsafe(self.mass.players.register, sonos_player.mass_player)
async def _enqueue_item(
self,
"""Enqueue a queue item to the Sonos player Queue."""
metadata = create_didl_metadata(self.mass, url, queue_item)
await asyncio.to_thread(
- sonos_player.soco_device.avTransport.SetNextAVTransportURI,
+ sonos_player.soco.avTransport.SetNextAVTransportURI,
[("InstanceID", 0), ("NextURI", url), ("NextURIMetaData", metadata)],
timeout=60,
)
self.logger.debug(
"Enqued next track (%s) to player %s",
queue_item.name if queue_item else url,
- sonos_player.soco_device.player_name,
- )
-
- async def update_player(self, sonos_player: SonosPlayer, signal_update: bool = True) -> None:
- """Update Sonos Player."""
- mass_player = self.mass.players.get(sonos_player.player_id)
- prev_url = mass_player.current_item_id
- prev_state = mass_player.state
- sonos_player.update_attributes()
- mass_player.can_sync_with = tuple(
- x for x in self.sonosplayers if x != sonos_player.player_id
+ sonos_player.soco.player_name,
)
- current_url = mass_player.current_item_id
- current_state = mass_player.state
-
- if (prev_url != current_url) or (prev_state != current_state):
- # fetch track details on state or url change
- await asyncio.to_thread(
- sonos_player.update_info,
- update_track_info=True,
- )
- sonos_player.update_attributes()
-
- if signal_update:
- # send update to the player manager right away only if we are triggered from an event
- # when we're just updating from a manual poll, the player manager
- # will detect changes to the player object itself
- self.mass.players.update(mass_player.player_id)
-
-
-def _convert_state(sonos_state: str) -> PlayerState:
- """Convert Sonos state to PlayerState."""
- if sonos_state == "PLAYING":
- return PlayerState.PLAYING
- if sonos_state == "TRANSITIONING":
- return PlayerState.PLAYING
- if sonos_state == "PAUSED_PLAYBACK":
- return PlayerState.PAUSED
- return PlayerState.IDLE
-
-
-def _timespan_secs(timespan):
- """Parse a time-span into number of seconds."""
- if timespan in ("", "NOT_IMPLEMENTED", None):
- return None
- return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))))
-
-
-class ProcessSonosEventQueue:
- """Queue like object for dispatching sonos events."""
-
- def __init__(
- self,
- callback_handler: callable[[dict], None],
- ) -> None:
- """Initialize Sonos event queue."""
- self._callback_handler = callback_handler
-
- def put(self, info: Any, block=True, timeout=None) -> None: # noqa: ARG002
- """Process event."""
- # noqa: ARG001
- self._callback_handler(info)
--- /dev/null
+"""Helper methods for common tasks."""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, overload
+
+from requests.exceptions import Timeout
+from soco import SoCo
+from soco.exceptions import SoCoException, SoCoUPnPException
+
+from music_assistant.common.models.errors import PlayerCommandFailed
+
+if TYPE_CHECKING:
+ from . import SonosPlayer
+ from .media import SonosMedia
+
+UID_PREFIX = "RINCON_"
+UID_POSTFIX = "01400"
+SONOS_SPEAKER_ACTIVITY = "sonos_speaker_activity"
+
+_LOGGER = logging.getLogger(__name__)
+
+_T = TypeVar("_T", bound="SonosPlayer | SonosMedia")
+_R = TypeVar("_R")
+_P = ParamSpec("_P")
+
+_FuncType = Callable[Concatenate[_T, _P], _R]
+_ReturnFuncType = Callable[Concatenate[_T, _P], _R | None]
+
+
+@overload
+def soco_error(
+ errorcodes: None = ...,
+) -> Callable[[_FuncType[_T, _P, _R]], _FuncType[_T, _P, _R]]: ...
+
+
+@overload
+def soco_error(
+ errorcodes: list[str],
+) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]: ...
+
+
+def soco_error(
+ errorcodes: list[str] | None = None,
+) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]:
+ """Filter out specified UPnP errors and raise exceptions for service calls."""
+
+ def decorator(funct: _FuncType[_T, _P, _R]) -> _ReturnFuncType[_T, _P, _R]:
+ """Decorate functions."""
+
+ def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
+ """Wrap for all soco UPnP exception."""
+ args_soco = next((arg for arg in args if isinstance(arg, SoCo)), None)
+ try:
+ result = funct(self, *args, **kwargs)
+ except (OSError, SoCoException, SoCoUPnPException, Timeout) as err:
+ error_code = getattr(err, "error_code", None)
+ function = funct.__qualname__
+ if errorcodes and error_code in errorcodes:
+ _LOGGER.debug("Error code %s ignored in call to %s", error_code, function)
+ return None
+
+ if (target := _find_target_identifier(self, args_soco)) is None:
+ raise RuntimeError("Unexpected use of soco_error") from err
+
+ message = f"Error calling {function} on {target}: {err}"
+ raise PlayerCommandFailed(message) from err
+
+ return result
+
+ return wrapper
+
+ return decorator
+
+
+def _find_target_identifier(instance: Any, fallback_soco: SoCo | None) -> str | None:
+ """Extract the best available target identifier from the provided instance object."""
+ if entity_id := getattr(instance, "entity_id", None):
+ # SonosEntity instance
+ return entity_id
+ if zone_name := getattr(instance, "zone_name", None):
+ # SonosSpeaker instance
+ return zone_name
+ if speaker := getattr(instance, "speaker", None):
+ # Holds a SonosSpeaker instance attribute
+ return speaker.zone_name
+ if soco := getattr(instance, "soco", fallback_soco):
+ # Holds a SoCo instance attribute
+ # Only use attributes with no I/O
+ return soco._player_name or soco.ip_address # pylint: disable=protected-access
+ return None
+
+
+def hostname_to_uid(hostname: str) -> str:
+ """Convert a Sonos hostname to a uid."""
+ if hostname.startswith("Sonos-"):
+ baseuid = hostname.removeprefix("Sonos-").replace(".local.", "")
+ elif hostname.startswith("sonos"):
+ baseuid = hostname.removeprefix("sonos").replace(".local.", "")
+ else:
+ raise ValueError(f"{hostname} is not a sonos device.")
+ return f"{UID_PREFIX}{baseuid}{UID_POSTFIX}"
+
+
+def sync_get_visible_zones(soco: SoCo) -> set[SoCo]:
+ """Ensure I/O attributes are cached and return visible zones."""
+ _ = soco.household_id
+ _ = soco.uid
+ return soco.visible_zones
"domain": "sonos",
"name": "SONOS",
"description": "SONOS Playerprovider for Music Assistant.",
- "codeowners": ["@music-assistant"],
- "requirements": ["soco==0.30.2"],
+ "codeowners": [
+ "@music-assistant"
+ ],
+ "requirements": [
+ "soco==0.30.2",
+ "sonos-websocket==0.1.3",
+ "defusedxml==0.7.1"
+ ],
"documentation": "https://github.com/orgs/music-assistant/discussions/1171",
"multi_instance": false,
"builtin": false,
--- /dev/null
+"""
+Sonos Player provider for Music Assistant: SonosPlayer object/model.
+
+Note that large parts of this code are copied over from the Home Assistant
+integration for Sonos.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import datetime
+import logging
+import time
+from collections.abc import Callable, Coroutine
+from typing import TYPE_CHECKING, Any
+
+import defusedxml.ElementTree as ET # noqa: N817
+from soco import SoCoException
+from soco.core import (
+ MUSIC_SRC_AIRPLAY,
+ MUSIC_SRC_LINE_IN,
+ MUSIC_SRC_RADIO,
+ MUSIC_SRC_SPOTIFY_CONNECT,
+ MUSIC_SRC_TV,
+ SoCo,
+)
+from soco.data_structures import DidlAudioBroadcast, DidlPlaylistContainer
+from soco.events_base import Event as SonosEvent
+from soco.events_base import SubscriptionBase
+from sonos_websocket import SonosWebsocket
+
+from music_assistant.common.helpers.datetime import utc
+from music_assistant.common.models.enums import PlayerFeature, PlayerState
+from music_assistant.common.models.errors import PlayerCommandFailed
+from music_assistant.common.models.player import Player
+from music_assistant.server.providers.sonos.helpers import soco_error
+
+if TYPE_CHECKING:
+ from . import SonosPlayerProvider
+
+CALLBACK_TYPE = Callable[[], None]
+LOGGER = logging.getLogger(__name__)
+
+PLAYER_FEATURES = (
+ PlayerFeature.SYNC,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.ENQUEUE_NEXT,
+)
+DURATION_SECONDS = "duration_in_s"
+POSITION_SECONDS = "position_in_s"
+SUBSCRIPTION_TIMEOUT = 1200
+ZGS_SUBSCRIPTION_TIMEOUT = 2
+AVAILABILITY_CHECK_INTERVAL = datetime.timedelta(minutes=1)
+AVAILABILITY_TIMEOUT = AVAILABILITY_CHECK_INTERVAL.total_seconds() * 4.5
+SONOS_STATE_PLAYING = "PLAYING"
+SONOS_STATE_TRANSITIONING = "TRANSITIONING"
+NEVER_TIME = -1200.0
+RESUB_COOLDOWN_SECONDS = 10.0
+SUBSCRIPTION_SERVICES = {
+ # "alarmClock",
+ "avTransport",
+ # "contentDirectory",
+ "deviceProperties",
+ "renderingControl",
+ "zoneGroupTopology",
+}
+SUPPORTED_VANISH_REASONS = ("powered off", "sleeping", "switch to bluetooth", "upgrade")
+UNUSED_DEVICE_KEYS = ["SPID", "TargetRoomName"]
+LINEIN_SOURCES = (MUSIC_SRC_TV, MUSIC_SRC_LINE_IN)
+SOURCE_AIRPLAY = "AirPlay"
+SOURCE_LINEIN = "Line-in"
+SOURCE_SPOTIFY_CONNECT = "Spotify Connect"
+SOURCE_TV = "TV"
+SOURCE_MAPPING = {
+ MUSIC_SRC_AIRPLAY: SOURCE_AIRPLAY,
+ MUSIC_SRC_TV: SOURCE_TV,
+ MUSIC_SRC_LINE_IN: SOURCE_LINEIN,
+ MUSIC_SRC_SPOTIFY_CONNECT: SOURCE_SPOTIFY_CONNECT,
+}
+
+HIRES_MODELS = (
+ "Sonos Roam",
+ "Sonos Arc",
+ "Sonos Beam",
+ "Sonos Five",
+ "Sonos Move",
+ "Sonos One SL",
+ "Sonos Port",
+ "Sonos Amp",
+ "SYMFONISK Bookshelf",
+ "SYMFONISK Table Lamp",
+ "Sonos Era 100",
+ "Sonos Era 300",
+)
+
+
+class SonosSubscriptionsFailed(PlayerCommandFailed):
+ """Subscription creation failed."""
+
+
+class SonosUpdateError(PlayerCommandFailed):
+ """Update failed."""
+
+
+class SonosPlayer:
+ """Wrapper around Sonos/SoCo with some additional attributes."""
+
+ def __init__(
+ self,
+ sonos_prov: SonosPlayerProvider,
+ soco: SoCo,
+ mass_player: Player,
+ ) -> None:
+ """Initialize SonosPlayer instance."""
+ self.sonos_prov = sonos_prov
+ self.mass = sonos_prov.mass
+ self.player_id = soco.uid
+ self.soco = soco
+ self.logger = sonos_prov.logger.getChild(soco.uid)
+ self.household_id: str = soco.household_id
+ self.subscriptions: list[SubscriptionBase] = []
+ self.websocket: SonosWebsocket | None = None
+ self.mass_player: Player = mass_player
+ self.available: bool = True
+ # cached attributes
+ self.play_mode: str | None = None
+ self.playback_status: str | None = None
+ self.channel: str | None = None
+ self.duration: float | None = None
+ self.image_url: str | None = None
+ self.source_name: str | None = None
+ self.title: str | None = None
+ self.uri: str | None = None
+ self.position: int | None = None
+ self.position_updated_at: datetime.datetime | None = None
+ # Subscriptions and events
+ self._subscriptions: list[SubscriptionBase] = []
+ self._subscription_lock: asyncio.Lock | None = None
+ self._last_activity: float = NEVER_TIME
+ self._resub_cooldown_expires_at: float | None = None
+ self._needs_poll: bool = False
+ # Grouping
+ self.sync_coordinator: SonosPlayer | None = None
+ self.group_members: list[SonosPlayer] = [self]
+ self.group_members_ids: list[str] = []
+ self._group_members_missing: set[str] = set()
+
+ def __hash__(self) -> int:
+ """Return a hash of self."""
+ return hash(self.player_id)
+
+ @property
+ def zone_name(self) -> str:
+ """Return zone name."""
+ if self.mass_player:
+ return self.mass_player.display_name
+ return self.soco.speaker_info["zone_name"]
+
+ @property
+ def subscription_address(self) -> str:
+ """Return the current subscription callback address."""
+ assert len(self._subscriptions) > 0
+ addr, port = self._subscriptions[0].event_listener.address
+ return ":".join([addr, str(port)])
+
+ @property
+ def missing_subscriptions(self) -> set[str]:
+ """Return a list of missing service subscriptions."""
+ subscribed_services = {sub.service.service_type for sub in self._subscriptions}
+ return SUBSCRIPTION_SERVICES - subscribed_services
+
+ def setup(self) -> None:
+ """Run initial setup of the speaker (NOT async friendly)."""
+ # update volume
+ self.mass_player.volume_level = self.soco.volume
+ self.mass_player.volume_muted = self.soco.mute
+ self.update_groups()
+ if not self.sync_coordinator:
+ self.poll_media()
+
+ async def do_async_setup() -> None:
+ """Complete setup in async context."""
+ self.websocket = SonosWebsocket(
+ self.soco.ip_address,
+ player_id=self.soco.uid,
+ session=self.mass.http_session,
+ )
+
+ future = asyncio.run_coroutine_threadsafe(do_async_setup(), self.mass.loop)
+ future.result(timeout=10)
+ asyncio.run_coroutine_threadsafe(self.subscribe(), self.mass.loop)
+
+ async def offline(self) -> None:
+ """Handle removal of speaker when unavailable."""
+ if not self.available:
+ return
+
+ if self._resub_cooldown_expires_at is None and not self.mass.closing:
+ self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS
+ self.logger.debug("Starting resubscription cooldown for %s", self.zone_name)
+
+ self.available = False
+ self.mass.players.update(self.player_id)
+ self._share_link_plugin = None
+
+ if self._poll_timer:
+ self._poll_timer()
+ self._poll_timer = None
+
+ await self.unsubscribe()
+ self.sonos_prov.discovery_known.discard(self.soco.uid)
+
+ def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None:
+ """Log a message if a subscription action (create/renew/stop) results in an exception."""
+ if not isinstance(result, Exception):
+ return
+
+ if isinstance(result, asyncio.exceptions.TimeoutError):
+ message = "Request timed out"
+ exc_info = None
+ else:
+ message = str(result)
+ exc_info = result if not str(result) else None
+
+ self.logger.log(
+ level,
+ "%s failed for %s: %s",
+ event,
+ self.zone_name,
+ message,
+ exc_info=exc_info,
+ )
+
+ async def subscribe(self) -> None:
+ """Initiate event subscriptions under an async lock."""
+ if not self._subscription_lock:
+ self._subscription_lock = asyncio.Lock()
+
+ async with self._subscription_lock:
+ try:
+ # Create event subscriptions.
+ subscriptions = [
+ self._subscribe_target(getattr(self.soco, service), self._handle_event)
+ for service in self.missing_subscriptions
+ ]
+ if not subscriptions:
+ return
+ self.logger.debug("Creating subscriptions for %s", self.zone_name)
+ results = await asyncio.gather(*subscriptions, return_exceptions=True)
+ for result in results:
+ self.log_subscription_result(result, "Creating subscription", logging.WARNING)
+ if any(isinstance(result, Exception) for result in results):
+ raise SonosSubscriptionsFailed
+ except SonosSubscriptionsFailed:
+ self.logger.warning("Creating subscriptions failed for %s", self.zone_name)
+ assert self._subscription_lock is not None
+ async with self._subscription_lock:
+ await self.offline()
+
+ async def unsubscribe(self) -> None:
+ """Cancel all subscriptions."""
+ if not self._subscriptions:
+ return
+ self.logger.debug("Unsubscribing from events for %s", self.zone_name)
+ results = await asyncio.gather(
+ *(subscription.unsubscribe() for subscription in self._subscriptions),
+ return_exceptions=True,
+ )
+ for result in results:
+ self.log_subscription_result(result, "Unsubscribe")
+ self._subscriptions = []
+
+ async def check_poll(self) -> None:
+ """Validate availability of the speaker based on recent activity."""
+ if not (self._needs_poll or (time.monotonic() - self._last_activity) > 600):
+ return
+ try:
+ await self.mass.create_task(self.ping)
+ except SonosUpdateError:
+ self.logger.warning(
+ "No recent activity and cannot reach %s, marking unavailable",
+ self.zone_name,
+ )
+ self._needs_poll = True
+ await self.offline()
+
+ @soco_error()
+ def ping(self) -> None:
+ """Test device availability. Failure will raise SonosUpdateError."""
+ self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1)
+
+ async def join(
+ self,
+ members: list[SonosPlayer],
+ ) -> None:
+ """Sync given players/speakers with this player."""
+ async with self.sonos_prov.topology_condition:
+ group: list[SonosPlayer] = await self.mass.create_task(self._join, members)
+ await self.wait_for_groups([group])
+
+ async def unjoin(self) -> None:
+ """Unjoin player from all/any groups."""
+ async with self.sonos_prov.topology_condition:
+ await self.mass.create_task(self._unjoin)
+ await self.wait_for_groups([[self]])
+
+ def update_player(self, signal_update: bool = True) -> None:
+ """Update Sonos Player."""
+ self._update_attributes()
+ if signal_update:
+ # send update to the player manager right away only if we are triggered from an event
+ # when we're just updating from a manual poll, the player manager
+ # will detect changes to the player object itself
+ self.sonos_prov.mass.players.update(self.player_id)
+
+ @soco_error()
+ def poll_track_info(self) -> dict[str, Any]:
+ """Poll the speaker for current track info.
+
+ Add converted position values (NOT async fiendly).
+ """
+ track_info: dict[str, Any] = self.soco.get_current_track_info()
+ track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration"))
+ track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position"))
+ return track_info
+
+ @soco_error()
+ def poll_media(self) -> None:
+ """Poll information about currently playing media."""
+ transport_info = self.soco.get_current_transport_info()
+ new_status = transport_info["current_transport_state"]
+
+ if new_status == SONOS_STATE_TRANSITIONING:
+ return
+
+ update_position = new_status != self.playback_status
+ self.playback_status = new_status
+ self.play_mode = self.soco.play_mode
+ self._set_basic_track_info(update_position=update_position)
+ self.update_player()
+
+ async def _subscribe_target(self, target: SubscriptionBase, sub_callback: Callable) -> None:
+ """Create a Sonos subscription for given target."""
+ subscription = await target.subscribe(
+ auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
+ )
+
+ def on_renew_failed(exception: Exception) -> None:
+ """Handle a failed subscription renewal callback."""
+ self.mass.create_task(self._renew_failed(exception))
+
+ subscription.callback = sub_callback
+ subscription.auto_renew_fail = on_renew_failed
+ self._subscriptions.append(subscription)
+
+ async def _renew_failed(self, exception: Exception) -> None:
+ """Mark the speaker as offline after a subscription renewal failure.
+
+ This is to reset the state to allow a future clean subscription attempt.
+ """
+ if not self.available:
+ return
+
+ self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
+ await self.offline()
+
+ def _handle_event(self, event: SonosEvent) -> None:
+ """Handle SonosEvent callback."""
+ if self._needs_poll:
+ self.logger.debug("Received event, cancelling poll timer for %s", self.zone_name)
+ self._needs_poll = False
+
+ service_type: str = event.service.service_type
+ self._speaker_activity(f"{service_type} subscription")
+
+ if service_type == "DeviceProperties":
+ self.update_player()
+ return
+ if service_type == "AVTransport":
+ self._handle_avtransport_event(event)
+ return
+ if service_type == "RenderingControl":
+ self._handle_rendering_control_event(event)
+ return
+ if service_type == "ZoneGroupTopology":
+ self._handle_zone_group_topology_event(event)
+ return
+
+ def _handle_avtransport_event(self, event: SonosEvent) -> None:
+ """Update information about currently playing media from an event."""
+ # NOTE: The new coordinator can be provided in a media update event but
+ # before the ZoneGroupState updates. If this happens the playback
+ # state will be incorrect and should be ignored. Switching to the
+ # new coordinator will use its media. The regrouping process will
+ # be completed during the next ZoneGroupState update.
+ av_transport_uri = event.variables.get("av_transport_uri", "")
+ current_track_uri = event.variables.get("current_track_uri", "")
+ if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"):
+ new_coordinator_uid = av_transport_uri.split(":")[-1]
+ if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid):
+ self.logger.debug(
+ "Media update coordinator (%s) received for %s",
+ new_coordinator_speaker.zone_name,
+ self.zone_name,
+ )
+ self.sync_coordinator = new_coordinator_speaker
+ else:
+ self.logger.debug(
+ "Media update coordinator (%s) for %s not yet available",
+ new_coordinator_uid,
+ self.zone_name,
+ )
+ return
+
+ if crossfade := event.variables.get("current_crossfade_mode"):
+ self.logger.debug("crossfade changed to %s", crossfade)
+
+ # Missing transport_state indicates a transient error
+ if (new_status := event.variables.get("transport_state")) is None:
+ return
+
+ # Ignore transitions, we should get the target state soon
+ if new_status == SONOS_STATE_TRANSITIONING:
+ return
+
+ evars = event.variables
+ new_status = evars["transport_state"]
+ state_changed = new_status != self.playback_status
+
+ self.play_mode = evars["current_play_mode"]
+ self.playback_status = new_status
+
+ track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"]
+ audio_source = self.soco.music_source_from_uri(track_uri)
+
+ self._set_basic_track_info(update_position=state_changed)
+
+ if (ct_md := evars["current_track_meta_data"]) and not self.image_url: # noqa: SIM102
+ if album_art_uri := getattr(ct_md, "album_art_uri", None):
+ # TODO: handle library mess here
+ self.image_url = album_art_uri
+
+ et_uri_md = evars["enqueued_transport_uri_meta_data"]
+ if isinstance(et_uri_md, DidlPlaylistContainer):
+ self.playlist_name = et_uri_md.title
+
+ if queue_size := evars.get("number_of_tracks", 0):
+ self.queue_size = int(queue_size)
+
+ if audio_source == MUSIC_SRC_RADIO:
+ if et_uri_md:
+ self.channel = et_uri_md.title
+
+ # Extra guards for S1 compatibility
+ if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show:
+ radio_show = ct_md.radio_show.split(",")[0]
+ self.channel = " • ".join(filter(None, [self.channel, radio_show]))
+
+ if isinstance(et_uri_md, DidlAudioBroadcast):
+ self.title = self.title or self.channel
+
+ self.update_player()
+
+ def _handle_rendering_control_event(self, event: SonosEvent) -> None:
+ """Update information about currently volume settings."""
+ variables = event.variables
+
+ if "volume" in variables:
+ volume = variables["volume"]
+ self.mass_player.volume_level = int(volume["Master"])
+
+ if mute := variables.get("mute"):
+ self.mass_player.volume_muted = mute["Master"] == "1"
+
+ if loudness := variables.get("loudness"):
+ # TODO: handle this is a better way
+ self.mass.config.set_raw_player_config_value(
+ self.player_id, "sonos_loudness", loudness["Master"] == "1"
+ )
+
+ for int_var in (
+ "bass",
+ "treble",
+ ):
+ if int_var in variables:
+ # TODO: handle this is a better way
+ self.mass.config.set_raw_player_config_value(
+ self.player_id, f"sonos_{int_var}", variables[int_var]
+ )
+
+ self.update_player()
+
+ def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
+ """Handle callback for topology change event."""
+ if xml := event.variables.get("zone_group_state"):
+ zgs = ET.fromstring(xml)
+ for vanished_device in zgs.find("VanishedDevices") or []:
+ if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS:
+ self.logger.debug(
+ "Ignoring %s marked %s as vanished with reason: %s",
+ self.zone_name,
+ vanished_device.get("ZoneName"),
+ reason,
+ )
+ continue
+ self.mass.create_task(self._vanished(reason))
+
+ if "zone_player_uui_ds_in_group" not in event.variables:
+ return
+ asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)
+
+ async def _vanished(self, reason: str) -> None:
+ """Handle removal of speaker when marked as vanished."""
+ if not self.available:
+ return
+ self.logger.debug("%s has vanished (%s), marking unavailable", self.zone_name, reason)
+ await self.offline()
+
+ async def _rebooted(self) -> None:
+ """Handle a detected speaker reboot."""
+ self.logger.debug("%s rebooted, reconnecting", self.zone_name)
+ await self.offline()
+ self._speaker_activity("reboot")
+
+ def update_groups(self) -> None:
+ """Update group topology when polling."""
+ asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop)
+
+ def update_group_for_uid(self, uid: str) -> None:
+ """Update group topology if uid is missing."""
+ if uid not in self._group_members_missing:
+ return
+ missing_zone = self.sonos_prov.sonosplayers[uid].zone_name
+ self.logger.debug("%s was missing, adding to %s group", missing_zone, self.zone_name)
+ self.update_groups()
+
+ def create_update_groups_coro(self, event: SonosEvent | None = None) -> Coroutine:
+ """Handle callback for topology change event."""
+
+ def _get_soco_group() -> list[str]:
+ """Ask SoCo cache for existing topology."""
+ coordinator_uid = self.soco.uid
+ joined_uids = []
+ with contextlib.suppress(OSError, SoCoException):
+ if self.soco.group and self.soco.group.coordinator:
+ coordinator_uid = self.soco.group.coordinator.uid
+ joined_uids = [
+ p.uid
+ for p in self.soco.group.members
+ if p.uid != coordinator_uid and p.is_visible
+ ]
+
+ return [coordinator_uid] + joined_uids
+
+ async def _extract_group(event: SonosEvent | None) -> list[str]:
+ """Extract group layout from a topology event."""
+ group = event and event.zone_player_uui_ds_in_group
+ if group:
+ assert isinstance(group, str)
+ return group.split(",")
+ return await self.mass.create_task(_get_soco_group)
+
+ def _regroup(group: list[str]) -> None:
+ """Rebuild internal group layout (async safe)."""
+ if group == [self.soco.uid] and self.group_members == [self] and self.group_members_ids:
+ # Skip updating existing single speakers in polling mode
+ return
+
+ group_members = []
+ group_members_ids = []
+
+ for uid in group:
+ speaker = self.sonos_prov.sonosplayers.get(uid)
+ if speaker:
+ self._group_members_missing.discard(uid)
+ group_members.append(speaker)
+ group_members_ids.append(uid)
+ else:
+ self._group_members_missing.add(uid)
+ self.logger.debug(
+ "%s group member unavailable (%s), will try again",
+ self.zone_name,
+ uid,
+ )
+ return
+
+ if self.group_members_ids == group_members_ids:
+ # Useful in polling mode for speakers with stereo pairs or surrounds
+ # as those "invisible" speakers will bypass the single speaker check
+ return
+
+ self.sync_coordinator = None
+ self.group_members = group_members
+ self.group_members_ids = group_members_ids
+ self.mass.players.update(self.player_id)
+
+ for joined_uid in group[1:]:
+ joined_speaker: SonosPlayer = self.sonos_prov.sonosplayers.get(joined_uid)
+ if joined_speaker:
+ joined_speaker.sync_coordinator = self
+ joined_speaker.group_members = group_members
+ joined_speaker.group_members_ids = group_members_ids
+ joined_speaker.update_player()
+
+ self.logger.debug("Regrouped %s: %s", self.zone_name, self.group_members_ids)
+ self.update_player()
+
+ async def _handle_group_event(event: SonosEvent | None) -> None:
+ """Get async lock and handle event."""
+ async with self.sonos_prov.topology_condition:
+ group = await _extract_group(event)
+ if self.soco.uid == group[0]:
+ _regroup(group)
+ self.sonos_prov.topology_condition.notify_all()
+
+ return _handle_group_event(event)
+
+ async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None:
+ """Wait until all groups are present, or timeout."""
+
+ def _test_groups(groups: list[list[SonosPlayer]]) -> bool:
+ """Return whether all groups exist now."""
+ for group in groups:
+ coordinator = group[0]
+
+ # Test that coordinator is coordinating
+ current_group = coordinator.group_members
+ if coordinator != current_group[0]:
+ return False
+
+ # Test that joined members match
+ if set(group[1:]) != set(current_group[1:]):
+ return False
+
+ return True
+
+ try:
+ async with asyncio.timeout(5):
+ while not _test_groups(groups):
+ await self.sonos_prov.topology_condition.wait()
+ except asyncio.TimeoutError:
+ self.logger.warning("Timeout waiting for target groups %s", groups)
+
+ any_speaker = next(iter(self.sonos_prov.sonosplayers.values()))
+ any_speaker.soco.zone_group_state.clear_cache()
+
+ def _update_attributes(self):
+ """Update attributes of the MA Player from SoCo state."""
+ # generic attributes (player_info)
+ self.mass_player.available = self.available
+
+ # transport info (playback state)
+ self.mass_player.state = current_state = _convert_state(self.playback_status)
+
+ # power 'on' player if we detect its playing
+ if not self.mass_player.powered and (
+ current_state == PlayerState.PLAYING
+ or (
+ self.sync_coordinator
+ and self.sync_coordinator.mass_player.state == PlayerState.PLAYING
+ )
+ ):
+ self.mass_player.powered = True
+
+ # media info (track info)
+ self.mass_player.current_item_id = self.uri
+ if self.uri and self.player_id in self.uri:
+ self.mass_player.active_source = self.player_id
+ else:
+ self.mass_player.active_source = self.source_name
+ if self.position is not None and self.position_updated_at is not None:
+ self.mass_player.elapsed_time = self.position
+ self.mass_player.elapsed_time_last_updated = self.position_updated_at.timestamp()
+
+ # zone topology (syncing/grouping) details
+ self.mass_player.can_sync_with = tuple(
+ x.player_id for x in self.sonos_prov.sonosplayers.values() if x.sync_coordinator is None
+ )
+ if self.sync_coordinator:
+ # player is syned to another player
+ self.mass_player.synced_to = self.sync_coordinator.player_id
+ self.mass_player.group_childs = set()
+ self.mass_player.active_source = self.sync_coordinator.mass_player.active_source
+ elif len(self.group_members_ids) > 1:
+ # this player is the sync leader in a group
+ self.mass_player.synced_to = None
+ self.mass_player.group_childs = set(self.group_members_ids)
+ else:
+ # standalone player, not synced
+ self.mass_player.synced_to = None
+ self.mass_player.group_childs = set()
+
+ def _set_basic_track_info(self, update_position: bool = False) -> None:
+ """Query the speaker to update media metadata and position info."""
+ self.channel = None
+ self.duration = None
+ self.image_url = None
+ self.source_name = None
+ self.title = None
+ self.uri = None
+
+ track_info = self.poll_track_info()
+ if not track_info["uri"]:
+ return
+ self.uri = track_info["uri"]
+
+ audio_source = self.soco.music_source_from_uri(self.uri)
+ if source := SOURCE_MAPPING.get(audio_source):
+ self.source_name = source
+ if audio_source in LINEIN_SOURCES:
+ self.position = None
+ self.position_updated_at = None
+ self.title = source
+ return
+
+ self.artist = track_info.get("artist")
+ self.album_name = track_info.get("album")
+ self.title = track_info.get("title")
+ self.image_url = track_info.get("album_art")
+
+ playlist_position = int(track_info.get("playlist_position", -1))
+ if playlist_position > 0:
+ self.queue_position = playlist_position
+
+ self._update_media_position(track_info, force_update=update_position)
+
+ def _update_media_position(
+ self, position_info: dict[str, int], force_update: bool = False
+ ) -> None:
+ """Update state when playing music tracks."""
+ duration = position_info.get(DURATION_SECONDS)
+ current_position = position_info.get(POSITION_SECONDS)
+
+ if not (duration or current_position):
+ self.position = None
+ self.position_updated_at = None
+ return
+
+ should_update = force_update
+ self.duration = duration
+
+ # player started reporting position?
+ if current_position is not None and self.position is None:
+ should_update = True
+
+ # position jumped?
+ if current_position is not None and self.position is not None:
+ if self.playback_status == SONOS_STATE_PLAYING:
+ assert self.position_updated_at is not None
+ time_delta = utc() - self.position_updated_at
+ time_diff = time_delta.total_seconds()
+ else:
+ time_diff = 0
+
+ calculated_position = self.position + time_diff
+
+ if abs(calculated_position - current_position) > 1.5:
+ should_update = True
+
+ if current_position is None:
+ self.position = None
+ self.position_updated_at = None
+ elif should_update:
+ self.position = current_position
+ self.position_updated_at = utc()
+
+ def _speaker_activity(self, source: str) -> None:
+ """Track the last activity on this speaker, set availability and resubscribe."""
+ if self._resub_cooldown_expires_at:
+ if time.monotonic() < self._resub_cooldown_expires_at:
+ self.logger.debug(
+ "Activity on %s from %s while in cooldown, ignoring",
+ self.zone_name,
+ source,
+ )
+ return
+ self._resub_cooldown_expires_at = None
+
+ self.logger.debug("Activity on %s from %s", self.zone_name, source)
+ self._last_activity = time.monotonic()
+ was_available = self.available
+ self.available = True
+ if not was_available:
+ self.mass.players.update(self.player_id)
+ self.mass.create_task(self.subscribe())
+
+ @soco_error()
+ def _join(self, members: list[SonosPlayer]) -> list[SonosPlayer]:
+ if self.sync_coordinator:
+ self.unjoin()
+ group = [self]
+ else:
+ group = self.group_members.copy()
+
+ for player in members:
+ if player.soco.uid != self.soco.uid and player not in group:
+ player.soco.join(self.soco)
+ player.sync_coordinator = self
+ group.append(player)
+
+ return group
+
+ @soco_error()
+ def _unjoin(self) -> None:
+ if self.group_members == [self]:
+ return
+ self.soco.unjoin()
+ self.sync_coordinator = None
+
+
+def _convert_state(sonos_state: str) -> PlayerState:
+ """Convert Sonos state to PlayerState."""
+ if sonos_state == "PLAYING":
+ return PlayerState.PLAYING
+ if sonos_state == "TRANSITIONING":
+ return PlayerState.PLAYING
+ if sonos_state == "PAUSED_PLAYBACK":
+ return PlayerState.PAUSED
+ return PlayerState.IDLE
+
+
+def _timespan_secs(timespan):
+ """Parse a time-span into number of seconds."""
+ if timespan in ("", "NOT_IMPLEMENTED", None):
+ return None
+ return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))))
if member is None:
continue
tg.create_task(player_prov.play_stream(member.player_id, stream_job))
- stream_job.start()
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates."""
task.add_done_callback(task_done_callback)
return task
+ def call_later(
+ self,
+ delay: float,
+ target: Coroutine | Awaitable | Callable | asyncio.Future,
+ *args: Any,
+ task_id: str | None = None,
+ **kwargs: Any,
+ ) -> asyncio.Task | asyncio.Future:
+ """Run callable/awaitable after given delay."""
+
+ def _create_task():
+ self.create_task(target, *args, task_id=task_id, **kwargs)
+
+ self.loop.call_later(delay, _create_task)
+
def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future:
"""Get existing scheduled task."""
if existing := self._tracked_tasks.get(task_id):
asyncio-throttle==1.0.2
colorlog==6.8.0
cryptography==41.0.7
+defusedxml==0.7.1
faust-cchardet>=2.1.18
git+https://github.com/MarvinSchenkel/pytube.git
git+https://github.com/music-assistant/deezer-python-async@v0.1.2
shortuuid==1.0.11
snapcast-mod==2.4.3
soco==0.30.2
+sonos-websocket==0.1.3
tidalapi==0.7.3
unidecode==1.3.8
uvloop==0.19.0