"editor.defaultFormatter": "charliermarsh.ruff",
"[github-actions-workflow]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
- }
+ },
+ "python.analysis.extraPaths": ["../aiosonos/"]
}
flow_mode_start_index: int = 0
stream_finished: bool | None = None
end_of_track_reached: bool | None = None
+ queue_items_last_updated: float = time.time()
@property
def corrected_elapsed_time(self) -> float:
# return imageproxy url for images that need to be resolved
# the original path is double encoded
encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
- return f"{self.mass.streams.base_url}/imageproxy?path={encoded_url}&provider={image.provider}&size={size}&fmt={image_format}" # noqa: E501
+ return (
+ f"{self.mass.streams.base_url}/imageproxy?path={encoded_url}"
+ f"&provider={image.provider}&size={size}&fmt={image_format}"
+ )
return image.path
async def get_thumbnail(
"""Signal state changed of given queue."""
queue = self._queues[queue_id]
if items_changed:
+ queue.queue_items_last_updated = time.time()
self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
# save items in cache
self.mass.create_task(
base_key=queue_id,
)
)
-
# always send the base event
self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
# save state
get_icy_stream,
get_player_filter_params,
get_silence,
+ get_stream_details,
parse_loudnorm,
strip_silence,
)
if not queue_item:
raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
if not queue_item.streamdetails:
- raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
+ # raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
+ queue_item.streamdetails = await get_stream_details(
+ mass=self.mass, queue_item=queue_item
+ )
# work out output format/details
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
else:
title = "Music Assistant"
metadata = f"StreamTitle='{title}';".encode()
- if current_item and current_item.image:
- metadata += f"StreamURL='{current_item.image.path}'".encode()
while len(metadata) % 16 != 0:
metadata += b"\x00"
length = len(metadata)
if default_sample_rate in supported_sample_rates:
output_sample_rate = default_sample_rate
else:
- output_sample_rate = min(supported_sample_rates)
+ output_sample_rate = max(supported_sample_rates)
output_bit_depth = min(default_bit_depth, player_max_bit_depth)
output_channels_str = self.mass.config.get_raw_player_config_value(
player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
title = stream_title
# set album to radio station name
album = queue.current_item.name
- if media_item := queue.current_item.media_item:
+ elif media_item := queue.current_item.media_item:
+ title = media_item.name
if artist_str := getattr(media_item, "artist_str", None):
artist = artist_str
if _album := getattr(media_item, "album", None):
cliraop_bin: str | None = None
_players: dict[str, AirPlayPlayer]
- _discovery_running: bool = False
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
_play_media_lock: asyncio.Lock = asyncio.Lock()
-"""
-Sonos Player provider for Music Assistant.
-
-Note that large parts of this code are copied over from the Home Assistant
-integratioon for Sonos.
-"""
+"""Sonos Player provider for Music Assistant for speakers running the S2 firmware."""
from __future__ import annotations
import asyncio
import logging
-from collections import OrderedDict
-from dataclasses import dataclass, field
+from time import time
from typing import TYPE_CHECKING
-import soco.config as soco_config
-from requests.exceptions import RequestException
-from soco import events_asyncio, zonegroupstate
-from soco.discovery import discover
-from sonos_websocket.exception import SonosWebsocketError
+from aiohttp import web
+from aiosonos.api.models import DiscoveryInfo as SonosDiscoveryInfo
+from aiosonos.api.models import PlayBackState as SonosPlayBackState
+from aiosonos.api.models import SonosCapability
+from aiosonos.client import SonosLocalApiClient
+from aiosonos.const import EventType as SonosEventType
+from aiosonos.const import SonosEvent
+from aiosonos.utils import get_discovery_info
+from zeroconf import IPVersion, ServiceStateChange
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE,
create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
- ConfigEntryType,
PlayerFeature,
+ PlayerState,
PlayerType,
ProviderFeature,
+ RepeatMode,
)
-from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
+from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.constants import CONF_CROSSFADE, SYNCGROUP_PREFIX, VERBOSE_LOG_LEVEL
-from music_assistant.server.helpers.didl_lite import create_didl_metadata
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ MASS_LOGO_ONLINE,
+ SYNCGROUP_PREFIX,
+ VERBOSE_LOG_LEVEL,
+)
from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
-from .player import SonosPlayer
-
if TYPE_CHECKING:
- from soco.core import SoCo
+ from zeroconf.asyncio import AsyncServiceInfo
- from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
+ from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
+PLAYBACK_STATE_MAP = {
+ SonosPlayBackState.PLAYBACK_STATE_BUFFERING: PlayerState.PLAYING,
+ SonosPlayBackState.PLAYBACK_STATE_IDLE: PlayerState.IDLE,
+ SonosPlayBackState.PLAYBACK_STATE_PAUSED: PlayerState.PAUSED,
+ SonosPlayBackState.PLAYBACK_STATE_PLAYING: PlayerState.PLAYING,
+}
-PLAYER_FEATURES = (
+PLAYER_FEATURES_BASE = {
PlayerFeature.SYNC,
PlayerFeature.VOLUME_MUTE,
- PlayerFeature.VOLUME_SET,
PlayerFeature.ENQUEUE_NEXT,
PlayerFeature.PAUSE,
- PlayerFeature.PLAY_ANNOUNCEMENT,
-)
+}
-CONF_NETWORK_SCAN = "network_scan"
-SUBSCRIPTION_TIMEOUT = 1200
-ZGS_SUBSCRIPTION_TIMEOUT = 2
-
-
-S2_MODELS = (
- "Sonos Roam",
- "Sonos Arc",
- "Sonos Beam",
- "Sonos Five",
- "Sonos Move",
- "Sonos One SL",
- "Sonos Port",
- "Sonos Amp",
- "SYMFONISK Bookshelf",
- "SYMFONISK Table Lamp",
- "Sonos Era 100",
- "Sonos Era 300",
-)
-
-CONF_ENTRY_SAMPLE_RATES_SONOS_S2 = create_sample_rates_config_entry(48000, 24, 48000, 24, True)
-CONF_ENTRY_SAMPLE_RATES_SONOS_S1 = create_sample_rates_config_entry(48000, 16, 48000, 16, True)
+SOURCE_LINE_IN = "line_in"
+SOURCE_AIRPLAY = "airplay"
+SOURCE_SPOTIFY = "spotify"
+SOURCE_UNKNOWN = "unknown"
+SOURCE_RADIO = "radio"
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Initialize provider(instance) with given configuration."""
- # set event listener port to something other than 1400
- # to allow coextistence with HA on the same host
- soco_config.EVENT_LISTENER_PORT = 1700
- soco_config.EVENTS_MODULE = events_asyncio
- soco_config.REQUEST_TIMEOUT = 9.5
- soco_config.ZGT_EVENT_FALLBACK = False
- zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT
prov = SonosPlayerProvider(mass, manifest, config)
- # set-up soco logging
+ # set-up aiosonos logging
if prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
- logging.getLogger("soco").setLevel(logging.DEBUG)
+ logging.getLogger("aiosonos").setLevel(logging.DEBUG)
else:
- logging.getLogger("soco").setLevel(prov.logger.level + 10)
+ logging.getLogger("aiosonos").setLevel(prov.logger.level + 10)
return prov
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
- return (
- ConfigEntry(
- key=CONF_NETWORK_SCAN,
- type=ConfigEntryType.BOOLEAN,
- label="Enable network scan for discovery",
- default_value=False,
- description="Enable network scan for discovery of players. \n"
- "Can be used if (some of) your players are not automatically discovered.",
- ),
- )
+ return ()
-@dataclass
-class UnjoinData:
- """Class to track data necessary for unjoin coalescing."""
+class SonosPlayer:
+ """Holds the details of the (discovered) Sonosplayer."""
- players: list[SonosPlayer]
- event: asyncio.Event = field(default_factory=asyncio.Event)
+ def __init__(
+ self,
+ prov: SonosPlayerProvider,
+ player_id: str,
+ discovery_info: SonosDiscoveryInfo,
+ ip_address: str,
+ ) -> None:
+ """Initialize the SonosPlayer."""
+ self.prov = prov
+ self.mass = prov.mass
+ self.player_id = player_id
+ self.discovery_info = discovery_info
+ self.ip_address = ip_address
+ self.logger = prov.logger.getChild(player_id)
+ self.connected: bool = False
+ self.client = SonosLocalApiClient(self.ip_address, self.mass.http_session)
+ self.mass_player: Player | None = None
+ self._listen_task: asyncio.Task | None = None
+ # Sonos speakers can optionally have airplay (most S2 speakers do)
+ # and this airplay player can also be a player within MA
+ # we can do some smart stuff if we link them together where possible
+ # the player if we can just guess from the sonos player id (mac address)
+ self._airplay_player_id = f"ap{self.player_id[7:-5].lower()}"
+
+ async def connect(self) -> None:
+ """Connect to the Sonos player."""
+ if self._listen_task and not self._listen_task.done():
+ self.logger.debug("Already connected to Sonos player: %s", self.player_id)
+ return
+ await self.client.connect()
+ self.connected = True
+ self.logger.debug("Connected to player API")
+ init_ready = asyncio.Event()
+
+ async def _listener() -> None:
+ try:
+ await self.client.start_listening(init_ready)
+ except Exception as err:
+ self.logger.exception("Error in Sonos player listener: %s", err)
+ if self.connected:
+ self.connected = False
+ self.mass.call_later(5, self.connect)
+ finally:
+ self.connected = False
+
+ self._listen_task = asyncio.create_task(_listener())
+ await init_ready.wait()
+
+ async def disconnect(self) -> None:
+ """Disconnect the client and cleanup."""
+ self.connected = False
+ if self._listen_task and not self._listen_task.done():
+ self._listen_task.cancel()
+ if self.client:
+ await self.client.disconnect()
+ self.logger.debug("Disconnected from player API")
+
+ def update_attributes(self) -> None: # noqa: PLR0915
+ """Update the player attributes."""
+ if not self.mass_player:
+ return
+ if self.client.player.has_fixed_volume:
+ self.mass_player.volume_level = 100
+ else:
+ self.mass_player.volume_level = self.client.player.volume_level or 100
+ self.mass_player.volume_muted = self.client.player.volume_muted
+
+ group_parent = None
+ if self.client.player.is_coordinator:
+ # player is group coordinator
+ active_group = self.client.player.group
+ self.mass_player.group_childs = (
+ self.client.player.group_members
+ if len(self.client.player.group_members) > 1
+ else set()
+ )
+ self.mass_player.synced_to = None
+ else:
+ # player is group child (synced to another player)
+ group_parent = self.prov.sonos_players.get(self.client.player.group.coordinator_id)
+ if not group_parent:
+ # handle race condition where the group parent is not yet discovered
+ return
+ active_group = group_parent.client.player.group
+ self.mass_player.group_childs = set()
+ self.mass_player.synced_to = active_group.coordinator_id
+ self.mass_player.active_source = active_group.coordinator_id
+
+ # map playback state
+ self.mass_player.state = PLAYBACK_STATE_MAP[active_group.playback_state]
+ if (
+ not self.mass_player.powered
+ and active_group.playback_state == SonosPlayBackState.PLAYBACK_STATE_PLAYING
+ ):
+ self.mass_player.powered = True
+
+ self.mass_player.elapsed_time = active_group.position
+ # work out 'can sync with' for this player
+ self.mass_player.can_sync_with = tuple(
+ x
+ for x in self.prov.sonos_players
+ if x != self.player_id
+ and x in self.prov.sonos_players
+ and self.prov.sonos_players[x].client.household_id == self.client.household_id
+ )
+
+ # figure out the active source based on the container
+ if container := active_group.playback_metadata.get("container"):
+ if group_parent and group_parent.mass_player:
+ self.mass_player.active_source = group_parent.mass_player.active_source
+ elif container.get("type") == "linein":
+ self.mass_player.active_source = SOURCE_LINE_IN
+ elif container.get("type") == "linein.airplay":
+ # check if the MA airplay player is active
+ airplay_player = self.mass.players.get(self._airplay_player_id)
+ if airplay_player and airplay_player.powered:
+ self.mass_player.active_source = airplay_player.active_source
+ else:
+ self.mass_player.active_source = SOURCE_AIRPLAY
+ elif container.get("type") == "station":
+ self.mass_player.active_source = SOURCE_RADIO
+ elif container.get("id", {}).get("objectId") == f"mass:queue:{self.player_id}":
+ # mass queue is active
+ self.mass_player.active_source = self.player_id
+ elif container.get("id", {}).get("serviceId") == "9":
+ self.mass_player.active_source = SOURCE_SPOTIFY
+ else:
+ self.mass_player.active_source = SOURCE_UNKNOWN
+
+ # parse current media
+ if (current_item := active_group.playback_metadata.get("currentItem")) and (
+ (track := current_item.get("track")) and track.get("name")
+ ):
+ track_images = track.get("images", [])
+ track_image_url = track_images[0].get("url") if track_images else None
+ track_duration_millis = track.get("durationMillis")
+ self.mass_player.current_media = PlayerMedia(
+ uri=track.get("id", {}).get("objectId") or track.get("mediaUrl"),
+ title=track["name"],
+ artist=track.get("artist", {}).get("name"),
+ album=track.get("album", {}).get("name"),
+ duration=track_duration_millis / 1000 if track_duration_millis else None,
+ image_url=track_image_url,
+ )
+ elif (
+ container and container.get("name") and active_group.playback_metadata.get("streamInfo")
+ ):
+ images = container.get("images", [])
+ image_url = images[0].get("url") if images else None
+ self.mass_player.current_media = PlayerMedia(
+ uri=container.get("id", {}).get("objectId"),
+ title=active_group.playback_metadata["streamInfo"],
+ album=container["name"],
+ image_url=image_url,
+ )
+ elif container and container.get("name") and container.get("id"):
+ images = container.get("images", [])
+ image_url = images[0].get("url") if images else None
+ self.mass_player.current_media = PlayerMedia(
+ uri=container["id"]["objectId"],
+ title=container["name"],
+ image_url=image_url,
+ )
+ else:
+ self.mass_player.current_media = None
class SonosPlayerProvider(PlayerProvider):
"""Sonos Player provider."""
- sonosplayers: dict[str, SonosPlayer] | None = None
- _discovery_running: bool = False
- _discovery_reschedule_timer: asyncio.TimerHandle | None = None
+ sonos_players: dict[str, SonosPlayer]
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self.sonosplayers: OrderedDict[str, SonosPlayer] = OrderedDict()
- self.topology_condition = asyncio.Condition()
- self.boot_counts: dict[str, int] = {}
- self.mdns_names: dict[str, str] = {}
- self.unjoin_data: dict[str, UnjoinData] = {}
- self._discovery_running = False
- self.hosts_in_error: dict[str, bool] = {}
- self.discovery_lock = asyncio.Lock()
- self.creation_lock = asyncio.Lock()
- self._known_invisible: set[SoCo] = set()
-
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await self._run_discovery()
+ self.sonos_players: dict[str, SonosPlayer] = {}
+ self.mass.streams.register_dynamic_route(
+ "/sonos_queue/v2.3/itemWindow", self._handle_sonos_queue_itemwindow
+ )
+ self.mass.streams.register_dynamic_route(
+ "/sonos_queue/v2.3/version", self._handle_sonos_queue_version
+ )
+ self.mass.streams.register_dynamic_route(
+ "/sonos_queue/v2.3/context", self._handle_sonos_queue_context
+ )
+ self.mass.streams.register_dynamic_route(
+ "/sonos_queue/v2.3/timePlayed", self._handle_sonos_queue_time_played
+ )
async def unload(self) -> None:
"""Handle close/cleanup of the provider."""
- if self._discovery_reschedule_timer:
- self._discovery_reschedule_timer.cancel()
- self._discovery_reschedule_timer = None
- # await any in-progress discovery
- while self._discovery_running:
- await asyncio.sleep(0.5)
- await asyncio.gather(*(player.offline() for player in self.sonosplayers.values()))
- if events_asyncio.event_listener:
- await events_asyncio.event_listener.async_stop()
- self.sonosplayers = None
+ # disconnect all players
+ await asyncio.gather(*(player.disconnect() for player in self.sonos_players.values()))
+ self.sonos_players = None
+ self.mass.streams.unregister_dynamic_route("/sonos_queue/v2.3/itemWindow")
+ self.mass.streams.unregister_dynamic_route("/sonos_queue/v2.3/version")
+ self.mass.streams.unregister_dynamic_route("/sonos_queue/v2.3/context")
+ self.mass.streams.unregister_dynamic_route("/sonos_queue/v2.3/timePlayed")
+
+ async def on_mdns_service_state_change(
+ self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
+ ) -> None:
+ """Handle MDNS service state callback."""
+ if not info:
+ self.logger.error(
+ "No info in MDNS service state change for %s - state change: %s", name, state_change
+ )
+ return
+ if "uuid" not in info.decoded_properties:
+ # not a S2 player
+ return
+ name = name.split("@", 1)[1] if "@" in name else name
+ player_id = info.decoded_properties["uuid"]
+ # handle removed player
+ if state_change == ServiceStateChange.Removed:
+ if mass_player := self.mass.players.get(player_id):
+ if not mass_player.available:
+ return
+ # the player has become unavailable
+ self.logger.debug("Player offline: %s", mass_player.display_name)
+ mass_player.available = False
+ self.mass.players.update(player_id)
+ return
+ # handle update for existing device
+ if sonos_player := self.sonos_players.get(player_id):
+ if mass_player := self.mass.players.get(player_id):
+ cur_address = get_primary_ip_address(info)
+ if cur_address and cur_address != sonos_player.ip_address:
+ sonos_player.logger.debug(
+ "Address updated from %s to %s", sonos_player.ip_address, cur_address
+ )
+ sonos_player.ip_address = cur_address
+ mass_player.device_info = DeviceInfo(
+ model=mass_player.device_info.model,
+ manufacturer=mass_player.device_info.manufacturer,
+ address=str(cur_address),
+ )
+ if not mass_player.available:
+ self.logger.debug("Player back online: %s", mass_player.display_name)
+ sonos_player.client.player_ip = cur_address
+ await sonos_player.connect(self.mass.http_session)
+ mass_player.available = True
+ # always update the latest discovery info
+ sonos_player.discovery_info = info
+ self.mass.players.update(player_id)
+ return
+ # handle new player
+ await self._setup_player(player_id, name, info)
async def get_player_config_entries(
self,
) -> tuple[ConfigEntry, ...]:
"""Return Config Entries for the given player."""
base_entries = await super().get_player_config_entries(player_id)
- if not (sonos_player := self.sonosplayers.get(player_id)):
+ if not self.sonos_players.get(player_id):
# most probably a syncgroup
return (*base_entries, CONF_ENTRY_CROSSFADE, CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED)
- is_s2 = sonos_player.soco.speaker_info["model_name"] in S2_MODELS
return (
*base_entries,
CONF_ENTRY_CROSSFADE,
- ConfigEntry(
- key="sonos_bass",
- type=ConfigEntryType.INTEGER,
- label="Bass",
- default_value=sonos_player.bass,
- value=sonos_player.bass,
- range=(-10, 10),
- description="Set the Bass level for the Sonos player",
- category="advanced",
- ),
- ConfigEntry(
- key="sonos_treble",
- type=ConfigEntryType.INTEGER,
- label="Treble",
- default_value=sonos_player.treble,
- value=sonos_player.treble,
- range=(-10, 10),
- description="Set the Treble level for the Sonos player",
- category="advanced",
- ),
- ConfigEntry(
- key="sonos_loudness",
- type=ConfigEntryType.BOOLEAN,
- label="Loudness compensation",
- default_value=sonos_player.loudness,
- value=sonos_player.loudness,
- description="Enable loudness compensation on the Sonos player",
- category="advanced",
- ),
- CONF_ENTRY_SAMPLE_RATES_SONOS_S2 if is_s2 else CONF_ENTRY_SAMPLE_RATES_SONOS_S1,
CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED,
+ create_sample_rates_config_entry(48000, 24, 48000, 24, True),
)
- def on_player_config_changed(
- self,
- config: PlayerConfig,
- changed_keys: set[str],
- ) -> None:
- """Call (by config manager) when the configuration of a player changes."""
- super().on_player_config_changed(config, changed_keys)
- if "enabled" in changed_keys:
- # run discovery to catch any re-enabled players
- self.mass.create_task(self._run_discovery())
- if not (sonos_player := self.sonosplayers.get(config.player_id)):
- return
- if "values/sonos_bass" in changed_keys:
- self.mass.create_task(
- sonos_player.soco.renderingControl.SetBass,
- [("InstanceID", 0), ("DesiredBass", config.get_value("sonos_bass"))],
- )
- if "values/sonos_treble" in changed_keys:
- self.mass.create_task(
- sonos_player.soco.renderingControl.SetTreble,
- [("InstanceID", 0), ("DesiredTreble", config.get_value("sonos_treble"))],
- )
- if "values/sonos_loudness" in changed_keys:
- loudness_value = "1" if config.get_value("sonos_loudness") else "0"
- self.mass.create_task(
- sonos_player.soco.renderingControl.SetLoudness,
- [
- ("InstanceID", 0),
- ("Channel", "Master"),
- ("DesiredLoudness", loudness_value),
- ],
- )
-
- def is_device_invisible(self, ip_address: str) -> bool:
- """Check if device at provided IP is known to be invisible."""
- return any(x for x in self._known_invisible if x.ip_address == ip_address)
-
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
- sonos_player = self.sonosplayers[player_id]
- if sonos_player.sync_coordinator:
+ sonos_player = self.sonos_players[player_id]
+ if sonos_player.client.player.is_passive:
self.logger.debug(
"Ignore STOP command for %s: Player is synced to another player.",
- sonos_player.zone_name,
+ player_id,
)
return
- if "Stop" not in sonos_player.soco.available_actions:
- self.logger.debug(
- "Ignore STOP command for %s: Player reports this action is not available now.",
- sonos_player.zone_name,
- )
- await asyncio.to_thread(sonos_player.soco.stop)
+ await sonos_player.client.player.group.stop()
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
- sonos_player = self.sonosplayers[player_id]
- if sonos_player.sync_coordinator:
+ sonos_player = self.sonos_players[player_id]
+ if sonos_player.client.player.is_passive:
self.logger.debug(
"Ignore PLAY command for %s: Player is synced to another player.",
player_id,
)
return
- if "Play" not in sonos_player.soco.available_actions:
- self.logger.debug(
- "Ignore STOP command for %s: Player reports this action is not available now.",
- sonos_player.zone_name,
- )
- await asyncio.to_thread(sonos_player.soco.play)
+ await sonos_player.client.player.group.play()
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
- sonos_player = self.sonosplayers[player_id]
- if sonos_player.sync_coordinator:
+ sonos_player = self.sonos_players[player_id]
+ if sonos_player.client.player.is_passive:
self.logger.debug(
"Ignore PLAY command for %s: Player is synced to another player.",
player_id,
)
return
- if "Pause" not in sonos_player.soco.available_actions:
- # pause not possible
- await self.cmd_stop(player_id)
- return
- await asyncio.to_thread(sonos_player.soco.pause)
+ await sonos_player.client.player.group.pause()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
-
- def set_volume_level(player_id: str, volume_level: int) -> None:
- sonos_player = self.sonosplayers[player_id]
- sonos_player.soco.volume = volume_level
-
- await asyncio.to_thread(set_volume_level, player_id, volume_level)
+ sonos_player = self.sonos_players[player_id]
+ await sonos_player.client.player.set_volume(volume_level)
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME MUTE command to given player."""
-
- def set_volume_mute(player_id: str, muted: bool) -> None:
- sonos_player = self.sonosplayers[player_id]
- sonos_player.soco.mute = muted
-
- await asyncio.to_thread(set_volume_mute, player_id, muted)
-
- async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
- """Create temporary sync group by joining given players to target player."""
- sonos_master_player = self.sonosplayers[target_player]
- await sonos_master_player.join(
- [self.sonosplayers[player_id] for player_id in child_player_ids]
- )
+ sonos_player = self.sonos_players[player_id]
+ await sonos_player.client.player.set_volume(muted=muted)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup master or group player.
"""
- sonos_player = self.sonosplayers[player_id]
- sonos_master_player = self.sonosplayers[target_player]
- await sonos_master_player.join([sonos_player])
+ await self.cmd_sync_many(target_player, [player_id])
+
+ async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ """Create temporary sync group by joining given players to target player."""
+ sonos_player = self.sonos_players[target_player]
+ await sonos_player.client.player.group.modify_group_members(
+ player_ids_to_add=child_player_ids, player_ids_to_remove=[]
+ )
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
- player_id: player_id of the player to handle the command.
"""
- sonos_player = self.sonosplayers[player_id]
- await sonos_player.unjoin()
+ sonos_player = self.sonos_players[player_id]
+ await sonos_player.client.player.leave_group()
async def play_media(
self,
media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
- sonos_player = self.sonosplayers[player_id]
+ sonos_player = self.sonos_players[player_id]
mass_player = self.mass.players.get(player_id)
- if sonos_player.sync_coordinator:
+ if sonos_player.client.player.is_passive:
# this should be already handled by the player manager, but just in case...
msg = (
f"Player {mass_player.display_name} can not "
"accept play_media command, it is synced to another player."
)
raise PlayerCommandFailed(msg)
-
- didl_metadata = create_didl_metadata(media)
- await asyncio.to_thread(sonos_player.soco.play_uri, media.uri, meta=didl_metadata)
+ mass_queue = self.mass.player_queues.get(media.queue_id)
+
+ # create a sonos cloud queue and load it
+ cloud_queue_url = f"{self.mass.streams.base_url}/sonos_queue/v2.3/"
+ await sonos_player.client.player.group.play_cloud_queue(
+ cloud_queue_url,
+ http_authorization=media.queue_id,
+ item_id=media.queue_item_id,
+ queue_version=str(mass_queue.queue_items_last_updated),
+ )
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
- sonos_player = self.sonosplayers[player_id]
- didl_metadata = create_didl_metadata(media)
- # set crossfade according to player setting
+ sonos_player = self.sonos_players[player_id]
+ if session_id := sonos_player.client.player.group.active_session_id:
+ await sonos_player.client.api.playback_session.refresh_cloud_queue(session_id)
+ # sync play modes from player queue --> sonos
+ mass_queue = self.mass.player_queues.get(media.queue_id)
crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
- if sonos_player.crossfade != crossfade:
-
- def set_crossfade() -> None:
- try:
- sonos_player.soco.cross_fade = crossfade
- sonos_player.crossfade = crossfade
- except Exception as err:
- self.logger.warning(
- "Unable to set crossfade for player %s: %s", sonos_player.zone_name, err
- )
-
- await asyncio.to_thread(set_crossfade)
-
- try:
- await asyncio.to_thread(
- sonos_player.soco.avTransport.SetNextAVTransportURI,
- [("InstanceID", 0), ("NextURI", media.uri), ("NextURIMetaData", didl_metadata)],
- timeout=60,
- )
- except Exception as err:
- self.logger.warning(
- "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err
- )
- else:
- self.logger.debug(
- "Enqued next track (%s) to player %s",
- media.title or media.uri,
- sonos_player.soco.player_name,
+ repeat_single_enabled = mass_queue.repeat_mode == RepeatMode.ONE
+ repeat_all_enabled = mass_queue.repeat_mode == RepeatMode.ALL
+ play_modes = sonos_player.client.player.group.play_modes
+ if (
+ play_modes.crossfade != crossfade
+ or play_modes.repeat != repeat_all_enabled
+ or play_modes.repeat_one != repeat_single_enabled
+ ):
+ await sonos_player.client.player.group.set_play_modes(
+ crossfade=crossfade, repeat=repeat_all_enabled, repeat_one=repeat_single_enabled
)
async def play_announcement(
self.play_announcement(child_player_id, announcement, volume_level)
)
return
- sonos_player = self.sonosplayers[player_id]
+ sonos_player = self.sonos_players[player_id]
self.logger.debug(
"Playing announcement %s using websocket audioclip on %s",
announcement.uri,
sonos_player.zone_name,
)
volume_level = self.mass.players.get_announcement_volume(player_id, volume_level)
- try:
- response, _ = await sonos_player.websocket.play_clip(
- announcement.uri,
- volume=volume_level,
- )
- except SonosWebsocketError as exc:
- raise PlayerCommandFailed(f"Error when calling Sonos websocket: {exc}") from exc
- if response["success"]:
- return
+ await sonos_player.client.player.play_audio_clip(announcement.uri, volume_level)
- async def poll_player(self, player_id: str) -> None:
- """Poll player for state updates."""
- if player_id not in self.sonosplayers:
+ async def _setup_player(self, player_id: str, name: str, info: AsyncServiceInfo) -> None:
+ """Handle setup of a new player that is discovered using mdns."""
+ address = get_primary_ip_address(info)
+ if address is None:
return
- sonos_player = self.sonosplayers[player_id]
- try:
- # the check_poll logic will work out what endpoints need polling now
- # based on when we last received info from the device
- await sonos_player.check_poll()
- # always update the attributes
- sonos_player.update_player(signal_update=False)
- except ConnectionResetError as err:
- raise PlayerUnavailableError from err
-
- async def _run_discovery(self) -> None:
- """Discover Sonos players on the network."""
- if self._discovery_running:
+ if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
+ self.logger.debug("Ignoring %s in discovery as it is disabled.", name)
return
+ if not (discovery_info := await get_discovery_info(self.mass.http_session, address)):
+ self.logger.debug("Ignoring %s in discovery as it is not reachable.", name)
+ return
+ display_name = discovery_info["device"].get("name") or name
+ if SonosCapability.PLAYBACK not in discovery_info["device"]["capabilities"]:
+ # this will happen for satellite speakers in a surround/stereo setup
+ self.logger.debug(
+ "Ignoring %s in discovery as it is a passive satellite.", display_name
+ )
+ return
+ self.logger.debug("Discovered Sonos device %s on %s", name, address)
+ self.sonos_players[player_id] = sonos_player = SonosPlayer(
+ self, player_id, discovery_info=discovery_info, ip_address=address
+ )
+ # connect the player first so we can fail early
+ await sonos_player.connect()
+
+ # collect supported features
+ supported_features = set(PLAYER_FEATURES_BASE)
+ if SonosCapability.AUDIO_CLIP in discovery_info["device"]["capabilities"]:
+ supported_features.add(PlayerFeature.PLAY_ANNOUNCEMENT)
+ if not sonos_player.client.player.has_fixed_volume:
+ supported_features.add(PlayerFeature.VOLUME_SET)
+
+ sonos_player.mass_player = mass_player = Player(
+ player_id=player_id,
+ provider=self.instance_id,
+ type=PlayerType.PLAYER,
+ name=display_name,
+ available=True,
+ # treat as powered at start if the player is playing/paused
+ powered=sonos_player.client.player.group.playback_state
+ in (
+ SonosPlayBackState.PLAYBACK_STATE_PLAYING,
+ SonosPlayBackState.PLAYBACK_STATE_BUFFERING,
+ SonosPlayBackState.PLAYBACK_STATE_PAUSED,
+ ),
+ device_info=DeviceInfo(
+ model=discovery_info["device"]["modelDisplayName"],
+ manufacturer=self.manifest.name,
+ address=address,
+ ),
+ supported_features=tuple(supported_features),
+ )
+ sonos_player.update_attributes()
+ self.mass.players.register_or_update(mass_player)
+
+ # register callback for state changed
+ def on_player_event(event: SonosEvent) -> None:
+ """Handle incoming event from player."""
+ sonos_player.update_attributes()
+ self.mass.players.update(player_id)
+
+ sonos_player.client.subscribe(
+ on_player_event,
+ (
+ SonosEventType.GROUP_UPDATED,
+ SonosEventType.PLAYER_UPDATED,
+ ),
+ )
+ # when we add a new player, update 'can_sync_with' for all other players
+ for other_player_id in self.sonos_players:
+ if other_player_id == player_id:
+ continue
+ self.sonos_players[other_player_id].update_attributes()
- allow_network_scan = self.config.get_value(CONF_NETWORK_SCAN)
-
- def do_discover() -> None:
- """Run discovery and add players in executor thread."""
- self._discovery_running = True
- try:
- self.logger.debug("Sonos discovery started...")
- discovered_devices: set[SoCo] = discover(allow_network_scan=allow_network_scan)
- if discovered_devices is None:
- discovered_devices = set()
- # process new players
- for soco in discovered_devices:
- try:
- self._add_player(soco)
- except RequestException as err:
- # player is offline
- self.logger.debug("Failed to add SonosPlayer %s: %s", soco, err)
- except Exception as err:
- self.logger.warning(
- "Failed to add SonosPlayer %s: %s",
- soco,
- err,
- exc_info=err if self.logger.isEnabledFor(10) else None,
- )
- finally:
- self._discovery_running = False
-
- await self.mass.create_task(do_discover)
+ async def _handle_sonos_queue_itemwindow(self, request: web.Request) -> web.Response:
+ """
+ Handle the Sonos CloudQueue ItemWindow endpoint.
- def reschedule() -> None:
- self._discovery_reschedule_timer = None
- self.mass.create_task(self._run_discovery())
+ https://docs.sonos.com/reference/itemwindow
+ """
+ self.logger.log(VERBOSE_LOG_LEVEL, "Cloud Queue ItemWindow request: %s", request.query)
+ sonos_playback_id = request.headers["X-Sonos-Playback-Id"]
+ sonos_player_id = sonos_playback_id.split(":")[0]
+ upcoming_window_size = int(request.query.get("upcomingWindowSize") or 10)
+ previous_window_size = int(request.query.get("previousWindowSize") or 10)
+ queue_version = request.query.get("queueVersion")
+ context_version = request.query.get("contextVersion")
+ queue = self.mass.player_queues.get(sonos_player_id)
+ if item_id := request.query.get("itemId"):
+ queue_index = self.mass.player_queues.index_by_id(queue.queue_id, item_id)
+ else:
+ queue_index = queue.current_index or 0
+ offset = max(queue_index - previous_window_size, 0)
+ queue_items = self.mass.player_queues.items(
+ sonos_player_id,
+ limit=upcoming_window_size + previous_window_size,
+ offset=max(queue_index - previous_window_size, 0),
+ )
+ sonos_queue_items = [
+ {
+ "id": item.queue_item_id,
+ "deleted": not item.media_item.available,
+ "policies": {},
+ "track": {
+ "type": "track",
+ "mediaUrl": self.mass.streams.resolve_stream_url(item),
+ "contentType": "audio/flac",
+ "service": {"name": "Music Assistant", "id": "8", "accountId": ""},
+ "name": item.name,
+ "imageUrl": self.mass.metadata.get_image_url(
+ item.image, prefer_proxy=False, image_format="jpeg"
+ )
+ if item.image
+ else None,
+ "durationMillis": item.duration * 1000 if item.duration else None,
+ "artist": {
+ "name": item.media_item.artist_str,
+ }
+ if item.media_item and item.media_item.artist_str
+ else None,
+ "album": {
+ "name": item.media_item.album.name,
+ }
+ if item.media_item and item.media_item.album
+ else None,
+ "quality": {
+ "bitDepth": item.streamdetails.audio_format.bit_depth,
+ "sampleRate": item.streamdetails.audio_format.sample_rate,
+ "codec": item.streamdetails.audio_format.content_type.value,
+ "lossless": item.streamdetails.audio_format.content_type.is_lossless(),
+ }
+ if item.streamdetails
+ else None,
+ },
+ }
+ for item in queue_items
+ ]
+ result = {
+ "includesBeginningOfQueue": offset == 0,
+ "includesEndOfQueue": queue.items <= (queue_index + len(sonos_queue_items)),
+ "contextVersion": context_version,
+ "queueVersion": queue_version,
+ "items": sonos_queue_items,
+ }
+ return web.json_response(result)
+
+ async def _handle_sonos_queue_version(self, request: web.Request) -> web.Response:
+ """
+ Handle the Sonos CloudQueue Version endpoint.
- # reschedule self once finished
- self._discovery_reschedule_timer = self.mass.loop.call_later(1800, reschedule)
+ https://docs.sonos.com/reference/version
+ """
+ self.logger.log(VERBOSE_LOG_LEVEL, "Cloud Queue Version request: %s", request.query)
+ sonos_playback_id = request.headers["X-Sonos-Playback-Id"]
+ sonos_player_id = sonos_playback_id.split(":")[0]
+ queue = self.mass.player_queues.get(sonos_player_id)
+ context_version = request.query.get("contextVersion") or "1"
+ queue_version = str(queue.queue_items_last_updated)
+ result = {"contextVersion": context_version, "queueVersion": queue_version}
+ return web.json_response(result)
+
+ async def _handle_sonos_queue_context(self, request: web.Request) -> web.Response:
+ """
+ Handle the Sonos CloudQueue Context endpoint.
- def _add_player(self, soco: SoCo) -> None:
- """Add discovered Sonos player."""
- player_id = soco.uid
- # check if existing player changed IP
- if existing := self.sonosplayers.get(player_id):
- if existing.soco.ip_address != soco.ip_address:
- existing.update_ip(soco.ip_address)
- return
- if not soco.is_visible:
- return
- enabled = self.mass.config.get_raw_player_config_value(player_id, "enabled", True)
- if not enabled:
- self.logger.debug("Ignoring disabled player: %s", player_id)
- return
+ https://docs.sonos.com/reference/context
+ """
+ self.logger.log(VERBOSE_LOG_LEVEL, "Cloud Queue Context request: %s", request.query)
+ sonos_playback_id = request.headers["X-Sonos-Playback-Id"]
+ sonos_player_id = sonos_playback_id.split(":")[0]
+ queue = self.mass.player_queues.get(sonos_player_id)
+ result = {
+ "contextVersion": "1",
+ "queueVersion": str(queue.queue_items_last_updated),
+ "container": {
+ "type": "playlist",
+ "name": "Music Assistant",
+ "imageUrl": MASS_LOGO_ONLINE,
+ "service": {"name": "Music Assistant", "id": "mass"},
+ "id": {
+ "serviceId": "mass",
+ "objectId": f"mass:queue:{queue.queue_id}",
+ "accountId": "",
+ },
+ },
+ "reports": {"sendUpdateAfterMillis": 0, "sendPlaybackActions": True},
+ "playbackPolicies": {
+ "canSkip": True,
+ "limitedSkips": False,
+ "canSkipToItem": True,
+ "canSkipBack": True,
+ "canSeek": False, # somehow not working correctly, investigate later
+ "canRepeat": True,
+ "canRepeatOne": True,
+ "canCrossfade": True,
+ "canShuffle": False, # handled by our queue controller itself
+ "showNNextTracks": 5,
+ "showNPreviousTracks": 5,
+ },
+ }
+ return web.json_response(result)
+
+ async def _handle_sonos_queue_time_played(self, request: web.Request) -> web.Response:
+ """
+ Handle the Sonos CloudQueue TimePlayed endpoint.
- speaker_info = soco.get_speaker_info(True, timeout=7)
- if soco.uid not in self.boot_counts:
- self.boot_counts[soco.uid] = soco.boot_seqnum
- self.logger.debug("Adding new player: %s", speaker_info)
- if not (mass_player := self.mass.players.get(soco.uid)):
- mass_player = Player(
- player_id=soco.uid,
- provider=self.instance_id,
- type=PlayerType.PLAYER,
- name=soco.player_name,
- available=True,
- powered=False,
- supported_features=PLAYER_FEATURES,
- device_info=DeviceInfo(
- model=speaker_info["model_name"],
- address=soco.ip_address,
- manufacturer="SONOS",
- ),
- needs_poll=True,
- poll_interval=120,
- )
- self.sonosplayers[player_id] = sonos_player = SonosPlayer(
- self,
- soco=soco,
- mass_player=mass_player,
- )
- if soco.fixed_volume:
- mass_player.supported_features = tuple(
- x for x in mass_player.supported_features if x != PlayerFeature.VOLUME_SET
- )
- sonos_player.setup()
- self.mass.loop.call_soon_threadsafe(
- self.mass.players.register_or_update, sonos_player.mass_player
- )
+ https://docs.sonos.com/reference/timeplayed
+ """
+ self.logger.log(VERBOSE_LOG_LEVEL, "Cloud Queue TimePlayed request: %s", request.query)
+ json_body = await request.json()
+ sonos_playback_id = request.headers["X-Sonos-Playback-Id"]
+ sonos_player_id = sonos_playback_id.split(":")[0]
+ mass_player = self.mass.players.get(sonos_player_id)
+ for item in json_body["items"]:
+ if "positionMillis" not in item:
+ continue
+ if mass_player.current_media:
+ mass_player.current_media.queue_item_id = item["id"]
+ mass_player.current_media.uri = item["mediaUrl"]
+ mass_player.current_media.queue_id = sonos_playback_id
+ mass_player.elapsed_time = item["positionMillis"] / 1000
+ mass_player.elapsed_time_last_updated = time()
+ break
+ return web.Response(status=204)
+
+
+def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
+ """Get primary IP address from zeroconf discovery info."""
+ for address in discovery_info.parsed_addresses(IPVersion.V4Only):
+ if address.startswith("127"):
+ # filter out loopback address
+ continue
+ if address.startswith("169.254"):
+ # filter out APIPA address
+ continue
+ return address
+ return None
+++ /dev/null
-"""Helper methods for common tasks."""
-
-from __future__ import annotations
-
-import logging
-from collections.abc import Callable
-from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, overload
-
-from soco import SoCo
-from soco.exceptions import SoCoException, SoCoUPnPException
-
-from music_assistant.common.models.errors import PlayerCommandFailed
-
-if TYPE_CHECKING:
- from . import SonosPlayer
-
-
-UID_PREFIX = "RINCON_"
-UID_POSTFIX = "01400"
-
-_LOGGER = logging.getLogger(__name__)
-
-_T = TypeVar("_T", bound="SonosPlayer")
-_R = TypeVar("_R")
-_P = ParamSpec("_P")
-
-_FuncType = Callable[Concatenate[_T, _P], _R]
-_ReturnFuncType = Callable[Concatenate[_T, _P], _R | None]
-
-
-class SonosUpdateError(PlayerCommandFailed):
- """Update failed."""
-
-
-@overload
-def soco_error(
- errorcodes: None = ...,
-) -> Callable[[_FuncType[_T, _P, _R]], _FuncType[_T, _P, _R]]: ...
-
-
-@overload
-def soco_error(
- errorcodes: list[str],
-) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]: ...
-
-
-def soco_error(
- errorcodes: list[str] | None = None,
-) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]:
- """Filter out specified UPnP errors and raise exceptions for service calls."""
-
- def decorator(funct: _FuncType[_T, _P, _R]) -> _ReturnFuncType[_T, _P, _R]:
- """Decorate functions."""
-
- def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
- """Wrap for all soco UPnP exception."""
- args_soco = next((arg for arg in args if isinstance(arg, SoCo)), None)
- try:
- result = funct(self, *args, **kwargs)
- except (OSError, SoCoException, SoCoUPnPException, TimeoutError) as err:
- error_code = getattr(err, "error_code", None)
- function = funct.__qualname__
- if errorcodes and error_code in errorcodes:
- _LOGGER.debug("Error code %s ignored in call to %s", error_code, function)
- return None
-
- if (target := _find_target_identifier(self, args_soco)) is None:
- msg = "Unexpected use of soco_error"
- raise RuntimeError(msg) from err
-
- message = f"Error calling {function} on {target}: {err}"
- raise SonosUpdateError(message) from err
-
- return result
-
- return wrapper
-
- return decorator
-
-
-def _find_target_identifier(instance: Any, fallback_soco: SoCo | None) -> str | None:
- """Extract the best available target identifier from the provided instance object."""
- if zone_name := getattr(instance, "zone_name", None):
- # SonosPlayer instance
- return zone_name
- if soco := getattr(instance, "soco", fallback_soco):
- # Holds a SoCo instance attribute
- # Only use attributes with no I/O
- return soco._player_name or soco.ip_address # pylint: disable=protected-access
- return None
-
-
-def hostname_to_uid(hostname: str) -> str:
- """Convert a Sonos hostname to a uid."""
- if hostname.startswith("Sonos-"):
- baseuid = hostname.removeprefix("Sonos-").replace(".local.", "")
- elif hostname.startswith("sonos"):
- baseuid = hostname.removeprefix("sonos").replace(".local.", "")
- else:
- msg = f"{hostname} is not a sonos device."
- raise ValueError(msg)
- return f"{UID_PREFIX}{baseuid}{UID_POSTFIX}"
-
-
-def sync_get_visible_zones(soco: SoCo) -> set[SoCo]:
- """Ensure I/O attributes are cached and return visible zones."""
- _ = soco.household_id
- _ = soco.uid
- return soco.visible_zones
"domain": "sonos",
"name": "SONOS",
"description": "SONOS Player provider for Music Assistant.",
- "codeowners": [
- "@music-assistant"
- ],
- "requirements": [
- "soco==0.30.4",
- "sonos-websocket==0.1.3",
- "defusedxml==0.7.1"
- ],
+ "codeowners": ["@music-assistant"],
+ "requirements": ["aiosonos==0.1.1"],
"documentation": "https://music-assistant.io/player-support/sonos/",
"multi_instance": false,
- "builtin": false
+ "builtin": false,
+ "mdns_discovery": ["_sonos._tcp.local."]
}
+++ /dev/null
-"""
-Sonos Player provider for Music Assistant: SonosPlayer object/model.
-
-Note that large parts of this code are copied over from the Home Assistant
-integration for Sonos.
-"""
-
-from __future__ import annotations
-
-import asyncio
-import contextlib
-import datetime
-import logging
-import time
-from collections.abc import Callable, Coroutine
-from typing import TYPE_CHECKING, Any
-
-from soco import SoCoException
-from soco.core import (
- MUSIC_SRC_AIRPLAY,
- MUSIC_SRC_LINE_IN,
- MUSIC_SRC_RADIO,
- MUSIC_SRC_SPOTIFY_CONNECT,
- MUSIC_SRC_TV,
- SoCo,
-)
-from soco.data_structures import DidlAudioBroadcast, DidlPlaylistContainer
-from sonos_websocket import SonosWebsocket
-
-from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.models.enums import PlayerFeature, PlayerState
-from music_assistant.common.models.errors import PlayerCommandFailed
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import VERBOSE_LOG_LEVEL
-
-from .helpers import SonosUpdateError, soco_error
-
-if TYPE_CHECKING:
- from soco.events_base import Event as SonosEvent
- from soco.events_base import SubscriptionBase
-
- from . import SonosPlayerProvider
-
-CALLBACK_TYPE = Callable[[], None]
-LOGGER = logging.getLogger(__name__)
-
-PLAYER_FEATURES = (
- PlayerFeature.SYNC,
- PlayerFeature.VOLUME_MUTE,
- PlayerFeature.VOLUME_SET,
- PlayerFeature.ENQUEUE_NEXT,
-)
-DURATION_SECONDS = "duration_in_s"
-POSITION_SECONDS = "position_in_s"
-SUBSCRIPTION_TIMEOUT = 1200
-ZGS_SUBSCRIPTION_TIMEOUT = 2
-AVAILABILITY_CHECK_INTERVAL = datetime.timedelta(minutes=1)
-AVAILABILITY_TIMEOUT = AVAILABILITY_CHECK_INTERVAL.total_seconds() * 4.5
-SONOS_STATE_PLAYING = "PLAYING"
-SONOS_STATE_TRANSITIONING = "TRANSITIONING"
-NEVER_TIME = -1200.0
-RESUB_COOLDOWN_SECONDS = 10.0
-SUBSCRIPTION_SERVICES = {
- # "alarmClock",
- "avTransport",
- # "contentDirectory",
- "deviceProperties",
- "renderingControl",
- "zoneGroupTopology",
-}
-SUPPORTED_VANISH_REASONS = ("powered off", "sleeping", "switch to bluetooth", "upgrade")
-UNUSED_DEVICE_KEYS = ["SPID", "TargetRoomName"]
-LINEIN_SOURCES = (MUSIC_SRC_TV, MUSIC_SRC_LINE_IN)
-SOURCE_AIRPLAY = "AirPlay"
-SOURCE_LINEIN = "Line-in"
-SOURCE_SPOTIFY_CONNECT = "Spotify Connect"
-SOURCE_TV = "TV"
-SOURCE_MAPPING = {
- MUSIC_SRC_AIRPLAY: SOURCE_AIRPLAY,
- MUSIC_SRC_TV: SOURCE_TV,
- MUSIC_SRC_LINE_IN: SOURCE_LINEIN,
- MUSIC_SRC_SPOTIFY_CONNECT: SOURCE_SPOTIFY_CONNECT,
-}
-
-
-class SonosSubscriptionsFailed(PlayerCommandFailed):
- """Subscription creation failed."""
-
-
-class SonosPlayer:
- """Wrapper around Sonos/SoCo with some additional attributes."""
-
- def __init__(
- self,
- sonos_prov: SonosPlayerProvider,
- soco: SoCo,
- mass_player: Player,
- ) -> None:
- """Initialize SonosPlayer instance."""
- self.sonos_prov = sonos_prov
- self.mass = sonos_prov.mass
- self.player_id = soco.uid
- self.soco = soco
- self.logger = sonos_prov.logger
- self.household_id: str = soco.household_id
- self.subscriptions: list[SubscriptionBase] = []
- self.websocket: SonosWebsocket | None = None
- self.mass_player: Player = mass_player
- self.available: bool = True
- # cached attributes
- self.crossfade: bool = False
- self.play_mode: str | None = None
- self.playback_status: str | None = None
- self.channel: str | None = None
- self.duration: float | None = None
- self.image_url: str | None = None
- self.source_name: str | None = None
- self.title: str | None = None
- self.uri: str | None = None
- self.position: int | None = None
- self.position_updated_at: datetime.datetime | None = None
- self.loudness: bool = False
- self.bass: int = 0
- self.treble: int = 0
- # Subscriptions and events
- self._subscriptions: list[SubscriptionBase] = []
- self._subscription_lock: asyncio.Lock | None = None
- self._last_activity: float = NEVER_TIME
- self._resub_cooldown_expires_at: float | None = None
- # Grouping
- self.sync_coordinator: SonosPlayer | None = None
- self.group_members: list[SonosPlayer] = [self]
- self.group_members_ids: list[str] = []
- self._group_members_missing: set[str] = set()
-
- def __hash__(self) -> int:
- """Return a hash of self."""
- return hash(self.player_id)
-
- @property
- def zone_name(self) -> str:
- """Return zone name."""
- if self.mass_player:
- return self.mass_player.display_name
- return self.soco.speaker_info["zone_name"]
-
- @property
- def subscription_address(self) -> str:
- """Return the current subscription callback address."""
- assert len(self._subscriptions) > 0
- addr, port = self._subscriptions[0].event_listener.address
- return ":".join([addr, str(port)])
-
- @property
- def missing_subscriptions(self) -> set[str]:
- """Return a list of missing service subscriptions."""
- subscribed_services = {sub.service.service_type for sub in self._subscriptions}
- return SUBSCRIPTION_SERVICES - subscribed_services
-
- @property
- def should_poll(self) -> bool:
- """Return if this player should be polled/pinged."""
- if not self.available:
- return True
- return (time.monotonic() - self._last_activity) > 120
-
- def setup(self) -> None:
- """Run initial setup of the speaker (NOT async friendly)."""
- if self.soco.is_coordinator:
- self.crossfade = self.soco.cross_fade
- self.mass_player.volume_level = self.soco.volume
- self.mass_player.volume_muted = self.soco.mute
- self.loudness = self.soco.loudness
- self.bass = self.soco.bass
- self.treble = self.soco.treble
- self.update_groups()
- if not self.sync_coordinator:
- self.poll_media()
-
- async def do_async_setup() -> None:
- """Complete setup in async context."""
- self.websocket = SonosWebsocket(
- self.soco.ip_address,
- player_id=self.soco.uid,
- session=self.mass.http_session,
- )
-
- future = asyncio.run_coroutine_threadsafe(do_async_setup(), self.mass.loop)
- future.result(timeout=10)
- asyncio.run_coroutine_threadsafe(self.subscribe(), self.mass.loop)
-
- async def offline(self) -> None:
- """Handle removal of speaker when unavailable."""
- if not self.available:
- return
-
- if self._resub_cooldown_expires_at is None and not self.mass.closing:
- self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS
- self.logger.debug("Starting resubscription cooldown for %s", self.zone_name)
-
- self.available = False
- self.mass_player.available = False
- self.mass.players.update(self.player_id)
- self._share_link_plugin = None
-
- await self.unsubscribe()
-
- def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None:
- """Log a message if a subscription action (create/renew/stop) results in an exception."""
- if not isinstance(result, Exception):
- return
-
- if isinstance(result, asyncio.exceptions.TimeoutError):
- message = "Request timed out"
- exc_info = None
- else:
- message = str(result)
- exc_info = result if not str(result) else None
-
- self.logger.log(
- level,
- "%s failed for %s: %s",
- event,
- self.zone_name,
- message,
- exc_info=exc_info if self.logger.isEnabledFor(10) else None,
- )
-
- async def subscribe(self) -> None:
- """Initiate event subscriptions under an async lock."""
- if not self._subscription_lock:
- self._subscription_lock = asyncio.Lock()
-
- async with self._subscription_lock:
- try:
- # Create event subscriptions.
- subscriptions = [
- self._subscribe_target(getattr(self.soco, service), self._handle_event)
- for service in self.missing_subscriptions
- ]
- if not subscriptions:
- return
- self.logger.log(VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.zone_name)
- results = await asyncio.gather(*subscriptions, return_exceptions=True)
- for result in results:
- self.log_subscription_result(result, "Creating subscription", logging.WARNING)
- if any(isinstance(result, Exception) for result in results):
- raise SonosSubscriptionsFailed
- except SonosSubscriptionsFailed:
- self.logger.warning("Creating subscriptions failed for %s", self.zone_name)
- assert self._subscription_lock is not None
- async with self._subscription_lock:
- await self.offline()
-
- async def unsubscribe(self) -> None:
- """Cancel all subscriptions."""
- if not self._subscriptions:
- return
- self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.zone_name)
- results = await asyncio.gather(
- *(subscription.unsubscribe() for subscription in self._subscriptions),
- return_exceptions=True,
- )
- for result in results:
- self.log_subscription_result(result, "Unsubscribe")
- self._subscriptions = []
-
- async def check_poll(self) -> None:
- """Validate availability of the speaker based on recent activity."""
- if not self.should_poll:
- return
- self.logger.log(VERBOSE_LOG_LEVEL, "Polling player for availability...")
- try:
- await asyncio.to_thread(self.ping)
- self._speaker_activity("ping")
- except SonosUpdateError:
- if not self.available:
- return # already offline
- self.logger.warning(
- "No recent activity and cannot reach %s, marking unavailable",
- self.zone_name,
- )
- await self.offline()
-
- def update_ip(self, ip_address: str) -> None:
- """Handle updated IP of a Sonos player (NOT async friendly)."""
- if self.available:
- return
- self.logger.debug(
- "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
- )
- try:
- self.ping()
- except SonosUpdateError:
- return
- self.soco.ip_address = ip_address
- self.setup()
- self.mass_player.device_info = DeviceInfo(
- model=self.mass_player.device_info.model,
- address=ip_address,
- manufacturer=self.mass_player.device_info.manufacturer,
- )
- self.update_player()
-
- @soco_error()
- def ping(self) -> None:
- """Test device availability. Failure will raise SonosUpdateError."""
- self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1)
-
- async def join(
- self,
- members: list[SonosPlayer],
- ) -> None:
- """Sync given players/speakers with this player."""
- async with self.sonos_prov.topology_condition:
- group: list[SonosPlayer] = await self.mass.create_task(self._join, members)
- await self.wait_for_groups([group])
-
- async def unjoin(self) -> None:
- """Unjoin player from all/any groups."""
- async with self.sonos_prov.topology_condition:
- await self.mass.create_task(self._unjoin)
- await self.wait_for_groups([[self]])
-
- def update_player(self, signal_update: bool = True) -> None:
- """Update Sonos Player."""
- self._update_attributes()
- if signal_update:
- # send update to the player manager right away only if we are triggered from an event
- # when we're just updating from a manual poll, the player manager
- # will detect changes to the player object itself
- self.mass.loop.call_soon_threadsafe(self.sonos_prov.mass.players.update, self.player_id)
-
- @soco_error()
- def poll_track_info(self) -> dict[str, Any]:
- """Poll the speaker for current track info.
-
- Add converted position values (NOT async fiendly).
- """
- track_info: dict[str, Any] = self.soco.get_current_track_info()
- track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration"))
- track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position"))
- return track_info
-
- @soco_error()
- def poll_media(self) -> None:
- """Poll information about currently playing media."""
- transport_info = self.soco.get_current_transport_info()
- new_status = transport_info["current_transport_state"]
-
- if new_status == SONOS_STATE_TRANSITIONING:
- return
-
- update_position = new_status != self.playback_status
- self.playback_status = new_status
- self.play_mode = self.soco.play_mode
- self._set_basic_track_info(update_position=update_position)
- self.update_player()
-
- async def _subscribe_target(self, target: SubscriptionBase, sub_callback: Callable) -> None:
- """Create a Sonos subscription for given target."""
- subscription = await target.subscribe(
- auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
- )
-
- def on_renew_failed(exception: Exception) -> None:
- """Handle a failed subscription renewal callback."""
- self.mass.create_task(self._renew_failed(exception))
-
- subscription.callback = sub_callback
- subscription.auto_renew_fail = on_renew_failed
- self._subscriptions.append(subscription)
-
- async def _renew_failed(self, exception: Exception) -> None:
- """Mark the speaker as offline after a subscription renewal failure.
-
- This is to reset the state to allow a future clean subscription attempt.
- """
- if not self.available:
- return
-
- self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
- await self.offline()
-
- def _handle_event(self, event: SonosEvent) -> None:
- """Handle SonosEvent callback."""
- service_type: str = event.service.service_type
- self._speaker_activity(f"{service_type} subscription")
-
- if service_type == "DeviceProperties":
- self.update_player()
- return
- if service_type == "AVTransport":
- self._handle_avtransport_event(event)
- return
- if service_type == "RenderingControl":
- self._handle_rendering_control_event(event)
- return
- if service_type == "ZoneGroupTopology":
- self._handle_zone_group_topology_event(event)
- return
-
- def _handle_avtransport_event(self, event: SonosEvent) -> None:
- """Update information about currently playing media from an event."""
- # NOTE: The new coordinator can be provided in a media update event but
- # before the ZoneGroupState updates. If this happens the playback
- # state will be incorrect and should be ignored. Switching to the
- # new coordinator will use its media. The regrouping process will
- # be completed during the next ZoneGroupState update.
- av_transport_uri = event.variables.get("av_transport_uri", "")
- current_track_uri = event.variables.get("current_track_uri", "")
- if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"):
- new_coordinator_uid = av_transport_uri.split(":")[-1]
- if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid):
- self.logger.log(
- 5,
- "Media update coordinator (%s) received for %s",
- new_coordinator_speaker.zone_name,
- self.zone_name,
- )
- self.sync_coordinator = new_coordinator_speaker
- else:
- self.logger.debug(
- "Media update coordinator (%s) for %s not yet available",
- new_coordinator_uid,
- self.zone_name,
- )
- return
-
- if crossfade := event.variables.get("current_crossfade_mode"):
- self.crossfade = bool(int(crossfade))
-
- # Missing transport_state indicates a transient error
- if (new_status := event.variables.get("transport_state")) is None:
- return
-
- # Ignore transitions, we should get the target state soon
- if new_status == SONOS_STATE_TRANSITIONING:
- return
-
- evars = event.variables
- new_status = evars["transport_state"]
- state_changed = new_status != self.playback_status
-
- self.play_mode = evars["current_play_mode"]
- self.playback_status = new_status
-
- track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"]
- audio_source = self.soco.music_source_from_uri(track_uri)
-
- self._set_basic_track_info(update_position=state_changed)
-
- if (ct_md := evars["current_track_meta_data"]) and not self.image_url:
- if album_art_uri := getattr(ct_md, "album_art_uri", None):
- # TODO: handle library mess here
- self.image_url = album_art_uri
-
- et_uri_md = evars["enqueued_transport_uri_meta_data"]
- if isinstance(et_uri_md, DidlPlaylistContainer):
- self.playlist_name = et_uri_md.title
-
- if queue_size := evars.get("number_of_tracks", 0):
- self.queue_size = int(queue_size)
-
- if audio_source == MUSIC_SRC_RADIO:
- if et_uri_md:
- self.channel = et_uri_md.title
-
- # Extra guards for S1 compatibility
- if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show:
- radio_show = ct_md.radio_show.split(",")[0]
- self.channel = " • ".join(filter(None, [self.channel, radio_show]))
-
- if isinstance(et_uri_md, DidlAudioBroadcast):
- self.title = self.title or self.channel
-
- self.update_player()
-
- def _handle_rendering_control_event(self, event: SonosEvent) -> None:
- """Update information about currently volume settings."""
- variables = event.variables
-
- if "volume" in variables:
- volume = variables["volume"]
- self.mass_player.volume_level = int(volume["Master"])
-
- if mute := variables.get("mute"):
- self.mass_player.volume_muted = mute["Master"] == "1"
-
- if loudness := variables.get("loudness"):
- # TODO: handle this is a better way
- self.loudness = loudness["Master"] == "1"
- with contextlib.suppress(KeyError):
- self.mass.loop.call_soon_threadsafe(
- self.mass.config.set_raw_player_config_value,
- self.player_id,
- "sonos_loudness",
- loudness["Master"] == "1",
- )
-
- for int_var in (
- "bass",
- "treble",
- ):
- if int_var in variables:
- # TODO: handle this is a better way
- setattr(self, int_var, variables[int_var])
- with contextlib.suppress(KeyError):
- self.mass.loop.call_soon_threadsafe(
- self.mass.config.set_raw_player_config_value,
- self.player_id,
- f"sonos_{int_var}",
- variables[int_var],
- )
-
- self.update_player()
-
- def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
- """Handle callback for topology change event."""
- if "zone_player_uui_ds_in_group" not in event.variables:
- return
- asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)
-
- async def _rebooted(self) -> None:
- """Handle a detected speaker reboot."""
- self.logger.debug("%s rebooted, reconnecting", self.zone_name)
- await self.offline()
- self._speaker_activity("reboot")
-
- def update_groups(self) -> None:
- """Update group topology when polling."""
- asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop)
-
- def update_group_for_uid(self, uid: str) -> None:
- """Update group topology if uid is missing."""
- if uid not in self._group_members_missing:
- return
- missing_zone = self.sonos_prov.sonosplayers[uid].zone_name
- self.logger.debug("%s was missing, adding to %s group", missing_zone, self.zone_name)
- self.update_groups()
-
- def create_update_groups_coro(self, event: SonosEvent | None = None) -> Coroutine:
- """Handle callback for topology change event."""
-
- def _get_soco_group() -> list[str]:
- """Ask SoCo cache for existing topology."""
- coordinator_uid = self.soco.uid
- joined_uids = []
- with contextlib.suppress(OSError, SoCoException):
- if self.soco.group and self.soco.group.coordinator:
- coordinator_uid = self.soco.group.coordinator.uid
- joined_uids = [
- p.uid
- for p in self.soco.group.members
- if p.uid != coordinator_uid and p.is_visible
- ]
-
- return [coordinator_uid, *joined_uids]
-
- async def _extract_group(event: SonosEvent | None) -> list[str]:
- """Extract group layout from a topology event."""
- group = event and event.zone_player_uui_ds_in_group
- if group:
- assert isinstance(group, str)
- return group.split(",")
- return await self.mass.create_task(_get_soco_group)
-
- def _regroup(group: list[str]) -> None:
- """Rebuild internal group layout (async safe)."""
- if group == [self.soco.uid] and self.group_members == [self] and self.group_members_ids:
- # Skip updating existing single speakers in polling mode
- return
-
- group_members = []
- group_members_ids = []
-
- for uid in group:
- speaker = self.sonos_prov.sonosplayers.get(uid)
- if speaker:
- self._group_members_missing.discard(uid)
- group_members.append(speaker)
- group_members_ids.append(uid)
- else:
- self._group_members_missing.add(uid)
- self.logger.debug(
- "%s group member unavailable (%s), will try again",
- self.zone_name,
- uid,
- )
- return
-
- if self.group_members_ids == group_members_ids:
- # Useful in polling mode for speakers with stereo pairs or surrounds
- # as those "invisible" speakers will bypass the single speaker check
- return
-
- self.sync_coordinator = None
- self.group_members = group_members
- self.group_members_ids = group_members_ids
- self.mass.players.update(self.player_id)
-
- for joined_uid in group[1:]:
- joined_speaker: SonosPlayer = self.sonos_prov.sonosplayers.get(joined_uid)
- if joined_speaker:
- joined_speaker.sync_coordinator = self
- joined_speaker.group_members = group_members
- joined_speaker.group_members_ids = group_members_ids
- joined_speaker.update_player()
-
- self.logger.debug("Regrouped %s: %s", self.zone_name, self.group_members_ids)
- self.update_player()
-
- async def _handle_group_event(event: SonosEvent | None) -> None:
- """Get async lock and handle event."""
- async with self.sonos_prov.topology_condition:
- group = await _extract_group(event)
- if self.soco.uid == group[0]:
- _regroup(group)
- self.sonos_prov.topology_condition.notify_all()
-
- return _handle_group_event(event)
-
- async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None:
- """Wait until all groups are present, or timeout."""
-
- def _test_groups(groups: list[list[SonosPlayer]]) -> bool:
- """Return whether all groups exist now."""
- for group in groups:
- coordinator = group[0]
-
- # Test that coordinator is coordinating
- current_group = coordinator.group_members
- if coordinator != current_group[0]:
- return False
-
- # Test that joined members match
- if set(group[1:]) != set(current_group[1:]):
- return False
-
- return True
-
- try:
- async with asyncio.timeout(5):
- while not _test_groups(groups):
- await self.sonos_prov.topology_condition.wait()
- except TimeoutError:
- self.logger.warning("Timeout waiting for target groups %s", groups)
-
- any_speaker = next(iter(self.sonos_prov.sonosplayers.values()))
- any_speaker.soco.zone_group_state.clear_cache()
-
- def _update_attributes(self) -> None:
- """Update attributes of the MA Player from SoCo state."""
- # generic attributes (player_info)
- self.mass_player.available = self.available
-
- if not self.available:
- self.mass_player.powered = False
- self.mass_player.state = PlayerState.IDLE
- self.mass_player.synced_to = None
- self.mass_player.group_childs = set()
- return
-
- # transport info (playback state)
- self.mass_player.state = current_state = _convert_state(self.playback_status)
-
- # power 'on' player if we detect its playing
- if not self.mass_player.powered and (
- current_state == PlayerState.PLAYING
- or (
- self.sync_coordinator
- and self.sync_coordinator.mass_player.state == PlayerState.PLAYING
- )
- ):
- self.mass_player.powered = True
-
- # media info (track info)
- self.mass_player.current_item_id = self.uri
- if self.uri and self.mass.streams.base_url in self.uri and self.player_id in self.uri:
- self.mass_player.active_source = self.player_id
- else:
- self.mass_player.active_source = self.source_name
- if self.position is not None and self.position_updated_at is not None:
- self.mass_player.elapsed_time = self.position
- self.mass_player.elapsed_time_last_updated = self.position_updated_at.timestamp()
-
- # zone topology (syncing/grouping) details
- self.mass_player.can_sync_with = tuple(
- x.player_id
- for x in self.sonos_prov.sonosplayers.values()
- if x.player_id != self.player_id
- )
- if self.sync_coordinator:
- # player is syned to another player
- self.mass_player.synced_to = self.sync_coordinator.player_id
- self.mass_player.group_childs = set()
- self.mass_player.active_source = self.sync_coordinator.mass_player.active_source
- elif len(self.group_members_ids) > 1:
- # this player is the sync leader in a group
- self.mass_player.synced_to = None
- self.mass_player.group_childs = set(self.group_members_ids)
- else:
- # standalone player, not synced
- self.mass_player.synced_to = None
- self.mass_player.group_childs = set()
-
- def _set_basic_track_info(self, update_position: bool = False) -> None:
- """Query the speaker to update media metadata and position info."""
- self.channel = None
- self.duration = None
- self.image_url = None
- self.source_name = None
- self.title = None
- self.uri = None
-
- try:
- track_info = self.poll_track_info()
- except SonosUpdateError as err:
- self.logger.warning("Fetching track info failed: %s", err)
- return
- if not track_info["uri"]:
- return
- self.uri = track_info["uri"]
-
- audio_source = self.soco.music_source_from_uri(self.uri)
- if source := SOURCE_MAPPING.get(audio_source):
- self.source_name = source
- if audio_source in LINEIN_SOURCES:
- self.position = None
- self.position_updated_at = None
- self.title = source
- return
-
- self.artist = track_info.get("artist")
- self.album_name = track_info.get("album")
- self.title = track_info.get("title")
- self.image_url = track_info.get("album_art")
-
- playlist_position = int(track_info.get("playlist_position", -1))
- if playlist_position > 0:
- self.queue_position = playlist_position
-
- self._update_media_position(track_info, force_update=update_position)
-
- def _update_media_position(
- self, position_info: dict[str, int], force_update: bool = False
- ) -> None:
- """Update state when playing music tracks."""
- duration = position_info.get(DURATION_SECONDS)
- current_position = position_info.get(POSITION_SECONDS)
-
- if not (duration or current_position):
- self.position = None
- self.position_updated_at = None
- return
-
- should_update = force_update
- self.duration = duration
-
- # player started reporting position?
- if current_position is not None and self.position is None:
- should_update = True
-
- # position jumped?
- if current_position is not None and self.position is not None:
- if self.playback_status == SONOS_STATE_PLAYING:
- assert self.position_updated_at is not None
- time_delta = utc() - self.position_updated_at
- time_diff = time_delta.total_seconds()
- else:
- time_diff = 0
-
- calculated_position = self.position + time_diff
-
- if abs(calculated_position - current_position) > 1.5:
- should_update = True
-
- if current_position is None:
- self.position = None
- self.position_updated_at = None
- elif should_update:
- self.position = current_position
- self.position_updated_at = utc()
-
- def _speaker_activity(self, source: str) -> None:
- """Track the last activity on this speaker, set availability and resubscribe."""
- if self._resub_cooldown_expires_at:
- if time.monotonic() < self._resub_cooldown_expires_at:
- self.logger.debug(
- "Activity on %s from %s while in cooldown, ignoring",
- self.zone_name,
- source,
- )
- return
- self._resub_cooldown_expires_at = None
-
- self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.zone_name, source)
- self._last_activity = time.monotonic()
- was_available = self.available
- self.available = True
- if not was_available:
- self.update_player()
- self.mass.loop.call_soon_threadsafe(self.mass.create_task, self.subscribe())
-
- @soco_error()
- def _join(self, members: list[SonosPlayer]) -> list[SonosPlayer]:
- if self.sync_coordinator:
- self.unjoin()
- group = [self]
- else:
- group = self.group_members.copy()
-
- for player in members:
- if player.soco.uid != self.soco.uid and player not in group:
- player.soco.join(self.soco)
- player.sync_coordinator = self
- group.append(player)
-
- return group
-
- @soco_error()
- def _unjoin(self) -> None:
- if self.group_members == [self]:
- return
- self.soco.unjoin()
- self.sync_coordinator = None
-
-
-def _convert_state(sonos_state: str) -> PlayerState:
- """Convert Sonos state to PlayerState."""
- if sonos_state == "PLAYING":
- return PlayerState.PLAYING
- if sonos_state == "TRANSITIONING":
- return PlayerState.PLAYING
- if sonos_state == "PAUSED_PLAYBACK":
- return PlayerState.PAUSED
- return PlayerState.IDLE
-
-
-def _timespan_secs(timespan):
- """Parse a time-span into number of seconds."""
- if timespan in ("", "NOT_IMPLEMENTED", None):
- return None
- return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))))
--- /dev/null
+"""
+Sonos Player S1 provider for Music Assistant.
+
+Based on the SoCo library for Sonos which uses the legacy/V1 UPnP API.
+
+Note that large parts of this code are copied over from the Home Assistant
+integration for Sonos.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections import OrderedDict
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
+
+from requests.exceptions import RequestException
+from soco import config as soco_config
+from soco import events_asyncio, zonegroupstate
+from soco.discovery import discover, scan_network
+
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_ENFORCE_MP3_DEFAULT_ENABLED,
+ ConfigEntry,
+ ConfigValueType,
+ create_sample_rates_config_entry,
+)
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ PlayerFeature,
+ PlayerType,
+ ProviderFeature,
+)
+from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import CONF_CROSSFADE, CONF_ENFORCE_MP3, VERBOSE_LOG_LEVEL
+from music_assistant.server.helpers.didl_lite import create_didl_metadata
+from music_assistant.server.models.player_provider import PlayerProvider
+
+from .player import SonosPlayer
+
+if TYPE_CHECKING:
+ from soco.core import SoCo
+
+ from music_assistant.common.models.config_entries import ProviderConfig
+ from music_assistant.common.models.provider import ProviderManifest
+ from music_assistant.server import MusicAssistant
+ from music_assistant.server.models import ProviderInstanceType
+
+
+PLAYER_FEATURES = (
+ PlayerFeature.SYNC,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.ENQUEUE_NEXT,
+ PlayerFeature.PAUSE,
+)
+
+CONF_NETWORK_SCAN = "network_scan"
+CONF_HOUSEHOLD_ID = "household_id"
+SUBSCRIPTION_TIMEOUT = 1200
+ZGS_SUBSCRIPTION_TIMEOUT = 2
+
+CONF_ENTRY_SAMPLE_RATES = create_sample_rates_config_entry(48000, 16, 48000, 16, True)
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ soco_config.EVENTS_MODULE = events_asyncio
+ soco_config.REQUEST_TIMEOUT = 9.5
+ zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT
+ prov = SonosPlayerProvider(mass, manifest, config)
+ # set-up soco logging
+ if prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+ logging.getLogger("soco").setLevel(logging.DEBUG)
+ else:
+ logging.getLogger("soco").setLevel(prov.logger.level + 10)
+ await prov.handle_async_init()
+ return prov
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ household_ids = await discover_household_ids(mass)
+ return (
+ ConfigEntry(
+ key=CONF_NETWORK_SCAN,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enable network scan for discovery",
+ default_value=False,
+ description="Enable network scan for discovery of players. \n"
+ "Can be used if (some of) your players are not automatically discovered.\n"
+ "Should normally not be needed",
+ ),
+ ConfigEntry(
+ key=CONF_HOUSEHOLD_ID,
+ type=ConfigEntryType.STRING,
+ label="Household ID",
+ default_value=household_ids[0] if household_ids else None,
+ description="Household ID for the Sonos (S1) system. Will be auto detected if empty.",
+ category="advanced",
+ required=False,
+ ),
+ )
+
+
+@dataclass
+class UnjoinData:
+ """Class to track data necessary for unjoin coalescing."""
+
+ players: list[SonosPlayer]
+ event: asyncio.Event = field(default_factory=asyncio.Event)
+
+
+class SonosPlayerProvider(PlayerProvider):
+ """Sonos Player provider."""
+
+ sonosplayers: dict[str, SonosPlayer] | None = None
+ _discovery_running: bool = False
+ _discovery_reschedule_timer: asyncio.TimerHandle | None = None
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ self.sonosplayers: OrderedDict[str, SonosPlayer] = OrderedDict()
+ self.topology_condition = asyncio.Condition()
+ self.boot_counts: dict[str, int] = {}
+ self.mdns_names: dict[str, str] = {}
+ self.unjoin_data: dict[str, UnjoinData] = {}
+ self._discovery_running = False
+ self.hosts_in_error: dict[str, bool] = {}
+ self.discovery_lock = asyncio.Lock()
+ self.creation_lock = asyncio.Lock()
+ self._known_invisible: set[SoCo] = set()
+
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ await self._run_discovery()
+
+ async def unload(self) -> None:
+ """Handle close/cleanup of the provider."""
+ if self._discovery_reschedule_timer:
+ self._discovery_reschedule_timer.cancel()
+ self._discovery_reschedule_timer = None
+ # await any in-progress discovery
+ while self._discovery_running:
+ await asyncio.sleep(0.5)
+ await asyncio.gather(*(player.offline() for player in self.sonosplayers.values()))
+ if events_asyncio.event_listener:
+ await events_asyncio.event_listener.async_stop()
+ self.sonosplayers = None
+
+ async def get_player_config_entries(
+ self,
+ player_id: str,
+ ) -> tuple[ConfigEntry, ...]:
+ """Return Config Entries for the given player."""
+ base_entries = await super().get_player_config_entries(player_id)
+ if not (self.sonosplayers.get(player_id)):
+ # most probably a syncgroup
+ return (*base_entries, CONF_ENTRY_CROSSFADE, CONF_ENTRY_ENFORCE_MP3_DEFAULT_ENABLED)
+ return (
+ *base_entries,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_SAMPLE_RATES,
+ CONF_ENTRY_ENFORCE_MP3_DEFAULT_ENABLED,
+ )
+
+ def is_device_invisible(self, ip_address: str) -> bool:
+ """Check if device at provided IP is known to be invisible."""
+ return any(x for x in self._known_invisible if x.ip_address == ip_address)
+
+ async def cmd_stop(self, player_id: str) -> None:
+ """Send STOP command to given player."""
+ sonos_player = self.sonosplayers[player_id]
+ if sonos_player.sync_coordinator:
+ self.logger.debug(
+ "Ignore STOP command for %s: Player is synced to another player.",
+ player_id,
+ )
+ return
+ await asyncio.to_thread(sonos_player.soco.stop)
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def cmd_play(self, player_id: str) -> None:
+ """Send PLAY command to given player."""
+ sonos_player = self.sonosplayers[player_id]
+ if sonos_player.sync_coordinator:
+ self.logger.debug(
+ "Ignore PLAY command for %s: Player is synced to another player.",
+ player_id,
+ )
+ return
+ await asyncio.to_thread(sonos_player.soco.play)
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player."""
+ sonos_player = self.sonosplayers[player_id]
+ if sonos_player.sync_coordinator:
+ self.logger.debug(
+ "Ignore PLAY command for %s: Player is synced to another player.",
+ player_id,
+ )
+ return
+ if "Pause" not in sonos_player.soco.available_actions:
+ # pause not possible
+ await self.cmd_stop(player_id)
+ return
+ await asyncio.to_thread(sonos_player.soco.pause)
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ sonos_player = self.sonosplayers[player_id]
+
+ def set_volume_level(player_id: str, volume_level: int) -> None:
+ sonos_player.soco.volume = volume_level
+
+ await asyncio.to_thread(set_volume_level, player_id, volume_level)
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
+
+ def set_volume_mute(player_id: str, muted: bool) -> None:
+ sonos_player = self.sonosplayers[player_id]
+ sonos_player.soco.mute = muted
+
+ await asyncio.to_thread(set_volume_mute, player_id, muted)
+
+ async def cmd_sync(self, player_id: str, target_player: str) -> None:
+ """Handle SYNC command for given player.
+
+ Join/add the given player(id) to the given (master) player/sync group.
+
+ - player_id: player_id of the player to handle the command.
+ - target_player: player_id of the syncgroup master or group player.
+ """
+ sonos_player = self.sonosplayers[player_id]
+ sonos_master_player = self.sonosplayers[target_player]
+ await sonos_master_player.join([sonos_player])
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def cmd_unsync(self, player_id: str) -> None:
+ """Handle UNSYNC command for given player.
+
+ Remove the given player from any syncgroups it currently is synced to.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ sonos_player = self.sonosplayers[player_id]
+ await sonos_player.unjoin()
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def play_media(
+ self,
+ player_id: str,
+ media: PlayerMedia,
+ ) -> None:
+ """Handle PLAY MEDIA on given player."""
+ sonos_player = self.sonosplayers[player_id]
+ mass_player = self.mass.players.get(player_id)
+ if sonos_player.sync_coordinator:
+ # this should be already handled by the player manager, but just in case...
+ msg = (
+ f"Player {mass_player.display_name} can not "
+ "accept play_media command, it is synced to another player."
+ )
+ raise PlayerCommandFailed(msg)
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, True):
+ media.uri = media.uri.replace(".flac", ".mp3")
+ didl_metadata = create_didl_metadata(media)
+ await asyncio.to_thread(sonos_player.soco.play_uri, media.uri, meta=didl_metadata)
+ self.mass.call_later(2, sonos_player.poll_speaker)
+
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of the next queue item on the player."""
+ sonos_player = self.sonosplayers[player_id]
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENFORCE_MP3, True):
+ media.uri = media.uri.replace(".flac", ".mp3")
+ didl_metadata = create_didl_metadata(media)
+ # set crossfade according to player setting
+ crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+ if sonos_player.crossfade != crossfade:
+
+ def set_crossfade() -> None:
+ try:
+ sonos_player.soco.cross_fade = crossfade
+ sonos_player.crossfade = crossfade
+ except Exception as err:
+ self.logger.warning(
+ "Unable to set crossfade for player %s: %s", sonos_player.zone_name, err
+ )
+
+ await asyncio.to_thread(set_crossfade)
+
+ try:
+ await asyncio.to_thread(
+ sonos_player.soco.avTransport.SetNextAVTransportURI,
+ [("InstanceID", 0), ("NextURI", media.uri), ("NextURIMetaData", didl_metadata)],
+ timeout=60,
+ )
+ except Exception as err:
+ self.logger.warning(
+ "Unable to enqueue next track on player: %s: %s", sonos_player.zone_name, err
+ )
+ else:
+ self.logger.debug(
+ "Enqued next track (%s) to player %s",
+ media.title or media.uri,
+ sonos_player.soco.player_name,
+ )
+
+ async def poll_player(self, player_id: str) -> None:
+ """Poll player for state updates."""
+ if player_id not in self.sonosplayers:
+ return
+ sonos_player = self.sonosplayers[player_id]
+ try:
+ # the check_poll logic will work out what endpoints need polling now
+ # based on when we last received info from the device
+ if needs_poll := await sonos_player.check_poll():
+ await sonos_player.poll_speaker()
+ # always update the attributes
+ sonos_player.update_player(signal_update=needs_poll)
+ except ConnectionResetError as err:
+ raise PlayerUnavailableError from err
+
+ async def _run_discovery(self) -> None:
+ """Discover Sonos players on the network."""
+ if self._discovery_running:
+ return
+
+ allow_network_scan = self.config.get_value(CONF_NETWORK_SCAN)
+ if not (household_id := self.config.get_value(CONF_HOUSEHOLD_ID)):
+ household_id = "Sonos"
+
+ def do_discover() -> None:
+ """Run discovery and add players in executor thread."""
+ self._discovery_running = True
+ try:
+ self.logger.debug("Sonos discovery started...")
+ discovered_devices: set[SoCo] = discover(
+ timeout=30, household_id=household_id, allow_network_scan=allow_network_scan
+ )
+ if discovered_devices is None:
+ discovered_devices = set()
+ # process new players
+ for soco in discovered_devices:
+ try:
+ self._add_player(soco)
+ except RequestException as err:
+ # player is offline
+ self.logger.debug("Failed to add SonosPlayer %s: %s", soco, err)
+ except Exception as err:
+ self.logger.warning(
+ "Failed to add SonosPlayer %s: %s",
+ soco,
+ err,
+ exc_info=err if self.logger.isEnabledFor(10) else None,
+ )
+ finally:
+ self._discovery_running = False
+
+ await self.mass.create_task(do_discover)
+
+ def reschedule() -> None:
+ self._discovery_reschedule_timer = None
+ self.mass.create_task(self._run_discovery())
+
+ # reschedule self once finished
+ self._discovery_reschedule_timer = self.mass.loop.call_later(1800, reschedule)
+
+ def _add_player(self, soco: SoCo) -> None:
+ """Add discovered Sonos player."""
+ player_id = soco.uid
+ # check if existing player changed IP
+ if existing := self.sonosplayers.get(player_id):
+ if existing.soco.ip_address != soco.ip_address:
+ existing.update_ip(soco.ip_address)
+ return
+ if not soco.is_visible:
+ return
+ enabled = self.mass.config.get_raw_player_config_value(player_id, "enabled", True)
+ if not enabled:
+ self.logger.debug("Ignoring disabled player: %s", player_id)
+ return
+
+ speaker_info = soco.get_speaker_info(True, timeout=7)
+ if soco.uid not in self.boot_counts:
+ self.boot_counts[soco.uid] = soco.boot_seqnum
+ self.logger.debug("Adding new player: %s", speaker_info)
+ transport_info = soco.get_current_transport_info()
+ play_state = transport_info["current_transport_state"]
+ if not (mass_player := self.mass.players.get(soco.uid)):
+ mass_player = Player(
+ player_id=soco.uid,
+ provider=self.instance_id,
+ type=PlayerType.PLAYER,
+ name=soco.player_name,
+ available=True,
+ powered=play_state in ("PLAYING", "TRANSITIONING"),
+ supported_features=PLAYER_FEATURES,
+ device_info=DeviceInfo(
+ model=speaker_info["model_name"],
+ address=soco.ip_address,
+ manufacturer="SONOS",
+ ),
+ needs_poll=True,
+ poll_interval=30,
+ )
+ self.sonosplayers[player_id] = sonos_player = SonosPlayer(
+ self,
+ soco=soco,
+ mass_player=mass_player,
+ )
+ if not soco.fixed_volume:
+ mass_player.supported_features = (
+ *mass_player.supported_features,
+ PlayerFeature.VOLUME_SET,
+ )
+
+ self.mass.loop.call_soon_threadsafe(
+ self.mass.players.register_or_update, sonos_player.mass_player
+ )
+
+
+async def discover_household_ids(mass: MusicAssistant, prefer_s1: bool = True) -> list[str]:
+ """Discover the HouseHold ID of S1 speaker(s) the network."""
+ if cache := await mass.cache.get("sonos_household_ids"):
+ return cache
+ household_ids: list[str] = []
+
+ def get_all_sonos_ips() -> set[SoCo]:
+ """Run full network discovery and return IP's of all devices found on the network."""
+ discovered_zones: set[SoCo] | None
+ if discovered_zones := scan_network(multi_household=True):
+ return {zone.ip_address for zone in discovered_zones}
+ return set()
+
+ all_sonos_ips = await asyncio.to_thread(get_all_sonos_ips)
+ for ip_address in all_sonos_ips:
+ async with mass.http_session.get(f"http://{ip_address}:1400/status/zp") as resp:
+ if resp.status == 200:
+ data = await resp.text()
+ if prefer_s1 and "<SWGen>2</SWGen>" in data:
+ continue
+ if "HouseholdControlID" in data:
+ household_id = data.split("<HouseholdControlID>")[1].split(
+ "</HouseholdControlID>"
+ )[0]
+ household_ids.append(household_id)
+ await mass.cache.set("sonos_household_ids", household_ids, 3600)
+ return household_ids
--- /dev/null
+"""Helper methods for common tasks."""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, overload
+
+from soco import SoCo
+from soco.exceptions import SoCoException, SoCoUPnPException
+
+from music_assistant.common.models.errors import PlayerCommandFailed
+
+if TYPE_CHECKING:
+ from . import SonosPlayer
+
+
+UID_PREFIX = "RINCON_"
+UID_POSTFIX = "01400"
+
+_LOGGER = logging.getLogger(__name__)
+
+_T = TypeVar("_T", bound="SonosPlayer")
+_R = TypeVar("_R")
+_P = ParamSpec("_P")
+
+_FuncType = Callable[Concatenate[_T, _P], _R]
+_ReturnFuncType = Callable[Concatenate[_T, _P], _R | None]
+
+
+class SonosUpdateError(PlayerCommandFailed):
+ """Update failed."""
+
+
+@overload
+def soco_error(
+ errorcodes: None = ...,
+) -> Callable[[_FuncType[_T, _P, _R]], _FuncType[_T, _P, _R]]: ...
+
+
+@overload
+def soco_error(
+ errorcodes: list[str],
+) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]: ...
+
+
+def soco_error(
+ errorcodes: list[str] | None = None,
+) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]:
+ """Filter out specified UPnP errors and raise exceptions for service calls."""
+
+ def decorator(funct: _FuncType[_T, _P, _R]) -> _ReturnFuncType[_T, _P, _R]:
+ """Decorate functions."""
+
+ def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
+ """Wrap for all soco UPnP exception."""
+ args_soco = next((arg for arg in args if isinstance(arg, SoCo)), None)
+ try:
+ result = funct(self, *args, **kwargs)
+ except (OSError, SoCoException, SoCoUPnPException, TimeoutError) as err:
+ error_code = getattr(err, "error_code", None)
+ function = funct.__qualname__
+ if errorcodes and error_code in errorcodes:
+ _LOGGER.debug("Error code %s ignored in call to %s", error_code, function)
+ return None
+
+ if (target := _find_target_identifier(self, args_soco)) is None:
+ msg = "Unexpected use of soco_error"
+ raise RuntimeError(msg) from err
+
+ message = f"Error calling {function} on {target}: {err}"
+ raise SonosUpdateError(message) from err
+
+ return result
+
+ return wrapper
+
+ return decorator
+
+
+def _find_target_identifier(instance: Any, fallback_soco: SoCo | None) -> str | None:
+ """Extract the best available target identifier from the provided instance object."""
+ if zone_name := getattr(instance, "zone_name", None):
+ # SonosPlayer instance
+ return zone_name
+ if soco := getattr(instance, "soco", fallback_soco):
+ # Holds a SoCo instance attribute
+ # Only use attributes with no I/O
+ return soco._player_name or soco.ip_address # pylint: disable=protected-access
+ return None
+
+
+def hostname_to_uid(hostname: str) -> str:
+ """Convert a Sonos hostname to a uid."""
+ if hostname.startswith("Sonos-"):
+ baseuid = hostname.removeprefix("Sonos-").replace(".local.", "")
+ elif hostname.startswith("sonos"):
+ baseuid = hostname.removeprefix("sonos").replace(".local.", "")
+ else:
+ msg = f"{hostname} is not a sonos device."
+ raise ValueError(msg)
+ return f"{UID_PREFIX}{baseuid}{UID_POSTFIX}"
+
+
+def sync_get_visible_zones(soco: SoCo) -> set[SoCo]:
+ """Ensure I/O attributes are cached and return visible zones."""
+ _ = soco.household_id
+ _ = soco.uid
+ return soco.visible_zones
--- /dev/null
+<svg clip-rule="evenodd" fill-rule="evenodd" stroke-linejoin="round" stroke-miterlimit="2" version="1.1" viewBox="6.4552e-5 0 500 500" xml:space="preserve" xmlns="http://www.w3.org/2000/svg">
+ <g transform="matrix(.48835 0 0 .48835 -880 0)">
+ <path d="m2595.5 0h-563.12c-127.23 0-230.37 103.14-230.37 230.37v563.12c0 127.23 103.14 230.37 230.37 230.37h563.12c127.23 0 230.37-103.14 230.37-230.37v-563.12c0-127.23-103.14-230.37-230.37-230.37z" fill="#fff" fill-rule="nonzero"/>
+ </g>
+ <g transform="matrix(.48835 0 0 .48835 -880 0)">
+ <path d="m2595.5 0h-563.12c-127.23 0-230.37 103.14-230.37 230.37v563.12c0 127.23 103.14 230.37 230.37 230.37h563.12c127.23 0 230.37-103.14 230.37-230.37v-563.12c0-127.23-103.14-230.37-230.37-230.37z" fill="#d8a158" fill-rule="nonzero"/>
+ </g>
+ <g transform="matrix(.48835 0 0 .48835 .73081 .53181)">
+ <path d="m319.09 429.94c-44.956 0-81.54 36.19-81.54 80.68 0 44.478 36.584 80.67 81.54 80.67 44.952 0 81.526-36.192 81.526-80.67 0-44.49-36.574-80.68-81.526-80.68zm2e-3 132.37c-28.512 0-51.694-23.182-51.694-51.68 0-28.502 23.182-51.684 51.694-51.684 28.508 0 51.702 23.182 51.702 51.684 0 28.498-23.194 51.68-51.702 51.68zm225.94-39.882-98.286-90.664v153.72h29.122v-85.834l98.29 90.528v-154.11h-29.126v86.362zm-390.15-25.794c15.172 4.764 25.351 9.486 33.084 15.324 11.525 8.692 17.368 19.834 17.368 33.072 0 12.342-5.771 24.376-15.84 33.026-10.114 8.664-23.991 13.452-39.089 13.452-31.416 0-49.471-20.83-50.217-21.704l-2.568-3.008 23.489-15.692 2.038 1.502c3.78 2.804 14.024 9.326 27.258 9.326 15.673 0 25.047-8.588 25.047-16.902 0-3.54 0-10.904-29.522-20.188-15.179-4.77-25.351-9.488-33.085-15.326-11.529-8.692-17.382-19.824-17.382-33.072 0-12.344 5.776-24.378 15.849-33.026 10.106-8.676 23.983-13.454 39.084-13.454 31.413 0 49.468 20.816 50.228 21.706l2.554 3.01-23.474 15.692-2.038-1.508c-3.797-2.808-14.063-9.33-27.27-9.33-15.673 0-25.04 8.6-25.04 16.91 0 3.542 0 10.902 29.526 20.19zm546.92-66.094c-44.946 0-81.52 36.192-81.52 80.676s36.574 80.674 81.52 80.674c44.968 0 81.542-36.19 81.542-80.674s-36.574-80.676-81.542-80.676zm-4e-3 132.36c-28.506 0-51.688-23.19-51.688-51.684 0-28.492 23.182-51.674 51.688-51.674s51.7 23.182 51.7 51.674c0 28.494-23.194 51.684-51.7 51.684zm164.22-37.692c-15.184-4.778-25.354-9.496-33.09-15.334-11.526-8.698-17.354-19.826-17.354-33.076 0-12.33 5.764-24.368 15.832-33.024 10.112-8.676 23.988-13.454 39.088-13.454 31.41 0 49.47 20.828 50.214 21.706l2.562 3.012-23.478 15.7-2.04-1.512c-3.792-2.8-14.02-9.324-27.258-9.324-15.68 0-25.052 8.588-25.052 16.896 0 3.544 0 10.906 29.528 20.192 15.182 4.762 25.356 9.486 33.088 15.324 11.526 8.694 17.364 19.832 17.364 33.072 0 12.348-5.77 24.38-15.842 33.024-10.104 8.68-23.98 13.458-39.086 13.458-31.412 0-49.462-20.82-50.214-21.706l-2.566-3.016 23.482-15.682 2.038 1.498c3.792 2.808 14.058 9.332 27.26 9.332 15.678 0 25.05-8.596 25.05-16.908 0-3.54 0-10.906-29.526-20.178z"/>
+ </g>
+</svg>
--- /dev/null
+{
+ "type": "player",
+ "domain": "sonos_s1",
+ "name": "SONOS S1",
+ "description": "SONOS Player provider for Music Assistant for the S1 hardware, based on the Soco library. Select this provider if you have Sonos devices on the S1 operating system (with the S1 Controller app)",
+ "codeowners": [
+ "@music-assistant"
+ ],
+ "requirements": [
+ "soco==0.30.4",
+ "defusedxml==0.7.1"
+ ],
+ "documentation": "https://music-assistant.io/player-support/sonos/",
+ "multi_instance": false,
+ "builtin": false
+}
--- /dev/null
+"""
+Sonos Player provider for Music Assistant: SonosPlayer object/model.
+
+Note that large parts of this code are copied over from the Home Assistant
+integration for Sonos.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import datetime
+import logging
+import time
+from collections.abc import Callable, Coroutine
+from typing import TYPE_CHECKING, Any
+
+from soco import SoCoException
+from soco.core import (
+ MUSIC_SRC_AIRPLAY,
+ MUSIC_SRC_LINE_IN,
+ MUSIC_SRC_RADIO,
+ MUSIC_SRC_SPOTIFY_CONNECT,
+ MUSIC_SRC_TV,
+ SoCo,
+)
+from soco.data_structures import DidlAudioBroadcast, DidlPlaylistContainer
+from sonos_websocket import SonosWebsocket
+
+from music_assistant.common.helpers.datetime import utc
+from music_assistant.common.models.enums import PlayerFeature, PlayerState
+from music_assistant.common.models.errors import PlayerCommandFailed
+from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.constants import VERBOSE_LOG_LEVEL
+
+from .helpers import SonosUpdateError, soco_error
+
+if TYPE_CHECKING:
+ from soco.events_base import Event as SonosEvent
+ from soco.events_base import SubscriptionBase
+
+ from . import SonosPlayerProvider
+
+CALLBACK_TYPE = Callable[[], None]
+LOGGER = logging.getLogger(__name__)
+
+PLAYER_FEATURES = (
+ PlayerFeature.SYNC,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.ENQUEUE_NEXT,
+)
+DURATION_SECONDS = "duration_in_s"
+POSITION_SECONDS = "position_in_s"
+SUBSCRIPTION_TIMEOUT = 1200
+ZGS_SUBSCRIPTION_TIMEOUT = 2
+AVAILABILITY_CHECK_INTERVAL = datetime.timedelta(minutes=1)
+AVAILABILITY_TIMEOUT = AVAILABILITY_CHECK_INTERVAL.total_seconds() * 4.5
+SONOS_STATE_PLAYING = "PLAYING"
+SONOS_STATE_TRANSITIONING = "TRANSITIONING"
+NEVER_TIME = -1200.0
+RESUB_COOLDOWN_SECONDS = 10.0
+SUBSCRIPTION_SERVICES = {
+ # "alarmClock",
+ "avTransport",
+ # "contentDirectory",
+ "deviceProperties",
+ "renderingControl",
+ "zoneGroupTopology",
+}
+SUPPORTED_VANISH_REASONS = ("powered off", "sleeping", "switch to bluetooth", "upgrade")
+UNUSED_DEVICE_KEYS = ["SPID", "TargetRoomName"]
+LINEIN_SOURCES = (MUSIC_SRC_TV, MUSIC_SRC_LINE_IN)
+SOURCE_AIRPLAY = "AirPlay"
+SOURCE_LINEIN = "Line-in"
+SOURCE_SPOTIFY_CONNECT = "Spotify Connect"
+SOURCE_TV = "TV"
+SOURCE_MAPPING = {
+ MUSIC_SRC_AIRPLAY: SOURCE_AIRPLAY,
+ MUSIC_SRC_TV: SOURCE_TV,
+ MUSIC_SRC_LINE_IN: SOURCE_LINEIN,
+ MUSIC_SRC_SPOTIFY_CONNECT: SOURCE_SPOTIFY_CONNECT,
+}
+
+
+class SonosSubscriptionsFailed(PlayerCommandFailed):
+ """Subscription creation failed."""
+
+
+class SonosPlayer:
+ """Wrapper around Sonos/SoCo with some additional attributes."""
+
+ def __init__(
+ self,
+ sonos_prov: SonosPlayerProvider,
+ soco: SoCo,
+ mass_player: Player,
+ ) -> None:
+ """Initialize SonosPlayer instance."""
+ self.sonos_prov = sonos_prov
+ self.mass = sonos_prov.mass
+ self.player_id = soco.uid
+ self.soco = soco
+ self.logger = sonos_prov.logger
+ self.household_id: str = soco.household_id
+ self.subscriptions: list[SubscriptionBase] = []
+ self.websocket: SonosWebsocket | None = None
+ self.mass_player: Player = mass_player
+ self.available: bool = True
+ # cached attributes
+ self.crossfade: bool = False
+ self.play_mode: str | None = None
+ self.playback_status: str | None = None
+ self.channel: str | None = None
+ self.duration: float | None = None
+ self.image_url: str | None = None
+ self.source_name: str | None = None
+ self.title: str | None = None
+ self.uri: str | None = None
+ self.position: int | None = None
+ self.position_updated_at: datetime.datetime | None = None
+ self.loudness: bool = False
+ self.bass: int = 0
+ self.treble: int = 0
+ # Subscriptions and events
+ self._subscriptions: list[SubscriptionBase] = []
+ self._subscription_lock: asyncio.Lock | None = None
+ self._last_activity: float = NEVER_TIME
+ self._resub_cooldown_expires_at: float | None = None
+ # Grouping
+ self.sync_coordinator: SonosPlayer | None = None
+ self.group_members: list[SonosPlayer] = [self]
+ self.group_members_ids: list[str] = []
+ self._group_members_missing: set[str] = set()
+
+ def __hash__(self) -> int:
+ """Return a hash of self."""
+ return hash(self.player_id)
+
+ @property
+ def zone_name(self) -> str:
+ """Return zone name."""
+ if self.mass_player:
+ return self.mass_player.display_name
+ return self.soco.speaker_info["zone_name"]
+
+ @property
+ def subscription_address(self) -> str:
+ """Return the current subscription callback address."""
+ assert len(self._subscriptions) > 0
+ addr, port = self._subscriptions[0].event_listener.address
+ return ":".join([addr, str(port)])
+
+ @property
+ def missing_subscriptions(self) -> set[str]:
+ """Return a list of missing service subscriptions."""
+ subscribed_services = {sub.service.service_type for sub in self._subscriptions}
+ return SUBSCRIPTION_SERVICES - subscribed_services
+
+ @property
+ def should_poll(self) -> bool:
+ """Return if this player should be polled/pinged."""
+ if not self.available:
+ return True
+ return (time.monotonic() - self._last_activity) > self.mass_player.poll_interval
+
+ def setup(self) -> None:
+ """Run initial setup of the speaker (NOT async friendly)."""
+ if self.soco.is_coordinator:
+ self.crossfade = self.soco.cross_fade
+ self.mass_player.volume_level = self.soco.volume
+ self.mass_player.volume_muted = self.soco.mute
+ self.loudness = self.soco.loudness
+ self.bass = self.soco.bass
+ self.treble = self.soco.treble
+ self.update_groups()
+ if not self.sync_coordinator:
+ self.poll_media()
+
+ async def do_async_setup() -> None:
+ """Complete setup in async context."""
+ self.websocket = SonosWebsocket(
+ self.soco.ip_address,
+ player_id=self.soco.uid,
+ session=self.mass.http_session,
+ )
+
+ future = asyncio.run_coroutine_threadsafe(do_async_setup(), self.mass.loop)
+ future.result(timeout=10)
+ asyncio.run_coroutine_threadsafe(self.subscribe(), self.mass.loop)
+
+ async def offline(self) -> None:
+ """Handle removal of speaker when unavailable."""
+ if not self.available:
+ return
+
+ if self._resub_cooldown_expires_at is None and not self.mass.closing:
+ self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS
+ self.logger.debug("Starting resubscription cooldown for %s", self.zone_name)
+
+ self.available = False
+ self.mass_player.available = False
+ self.mass.players.update(self.player_id)
+ self._share_link_plugin = None
+
+ await self.unsubscribe()
+
+ def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None:
+ """Log a message if a subscription action (create/renew/stop) results in an exception."""
+ if not isinstance(result, Exception):
+ return
+
+ if isinstance(result, asyncio.exceptions.TimeoutError):
+ message = "Request timed out"
+ exc_info = None
+ else:
+ message = str(result)
+ exc_info = result if not str(result) else None
+
+ self.logger.log(
+ level,
+ "%s failed for %s: %s",
+ event,
+ self.zone_name,
+ message,
+ exc_info=exc_info if self.logger.isEnabledFor(10) else None,
+ )
+
+ async def subscribe(self) -> None:
+ """Initiate event subscriptions under an async lock."""
+ if not self._subscription_lock:
+ self._subscription_lock = asyncio.Lock()
+
+ async with self._subscription_lock:
+ try:
+ # Create event subscriptions.
+ subscriptions = [
+ self._subscribe_target(getattr(self.soco, service), self._handle_event)
+ for service in self.missing_subscriptions
+ ]
+ if not subscriptions:
+ return
+ self.logger.log(VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.zone_name)
+ results = await asyncio.gather(*subscriptions, return_exceptions=True)
+ for result in results:
+ self.log_subscription_result(result, "Creating subscription", logging.WARNING)
+ if any(isinstance(result, Exception) for result in results):
+ raise SonosSubscriptionsFailed
+ except SonosSubscriptionsFailed:
+ self.logger.warning("Creating subscriptions failed for %s", self.zone_name)
+ assert self._subscription_lock is not None
+ async with self._subscription_lock:
+ await self.offline()
+
+ async def unsubscribe(self) -> None:
+ """Cancel all subscriptions."""
+ if not self._subscriptions:
+ return
+ self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.zone_name)
+ results = await asyncio.gather(
+ *(subscription.unsubscribe() for subscription in self._subscriptions),
+ return_exceptions=True,
+ )
+ for result in results:
+ self.log_subscription_result(result, "Unsubscribe")
+ self._subscriptions = []
+
+ async def check_poll(self) -> bool:
+ """Validate availability of the speaker based on recent activity."""
+ if not self.should_poll:
+ return False
+ self.logger.log(VERBOSE_LOG_LEVEL, "Polling player for availability...")
+ try:
+ await asyncio.to_thread(self.ping)
+ self._speaker_activity("ping")
+ except SonosUpdateError:
+ if not self.available:
+ return False # already offline
+ self.logger.warning(
+ "No recent activity and cannot reach %s, marking unavailable",
+ self.zone_name,
+ )
+ await self.offline()
+ return True
+
+ def update_ip(self, ip_address: str) -> None:
+ """Handle updated IP of a Sonos player (NOT async friendly)."""
+ if self.available:
+ return
+ self.logger.debug(
+ "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
+ )
+ try:
+ self.ping()
+ except SonosUpdateError:
+ return
+ self.soco.ip_address = ip_address
+ self.setup()
+ self.mass_player.device_info = DeviceInfo(
+ model=self.mass_player.device_info.model,
+ address=ip_address,
+ manufacturer=self.mass_player.device_info.manufacturer,
+ )
+ self.update_player()
+
+ @soco_error()
+ def ping(self) -> None:
+ """Test device availability. Failure will raise SonosUpdateError."""
+ self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1)
+
+ async def join(
+ self,
+ members: list[SonosPlayer],
+ ) -> None:
+ """Sync given players/speakers with this player."""
+ async with self.sonos_prov.topology_condition:
+ group: list[SonosPlayer] = await self.mass.create_task(self._join, members)
+ await self.wait_for_groups([group])
+
+ async def unjoin(self) -> None:
+ """Unjoin player from all/any groups."""
+ async with self.sonos_prov.topology_condition:
+ await self.mass.create_task(self._unjoin)
+ await self.wait_for_groups([[self]])
+
+ def update_player(self, signal_update: bool = True) -> None:
+ """Update Sonos Player."""
+ self._update_attributes()
+ if signal_update:
+ # send update to the player manager right away only if we are triggered from an event
+ # when we're just updating from a manual poll, the player manager
+ # will detect changes to the player object itself
+ self.mass.loop.call_soon_threadsafe(self.sonos_prov.mass.players.update, self.player_id)
+
+ async def poll_speaker(self) -> None:
+ """Poll the speaker for updates."""
+
+ def _poll():
+ """Poll the speaker for updates (NOT async friendly)."""
+ self.update_groups()
+ self.poll_media()
+ self.mass_player.volume_level = self.soco.volume
+ self.mass_player.volume_muted = self.soco.mute
+
+ await asyncio.to_thread(_poll)
+
+ @soco_error()
+ def poll_media(self) -> None:
+ """Poll information about currently playing media."""
+ transport_info = self.soco.get_current_transport_info()
+ new_status = transport_info["current_transport_state"]
+
+ if new_status == SONOS_STATE_TRANSITIONING:
+ return
+
+ update_position = new_status != self.playback_status
+ self.playback_status = new_status
+ self.play_mode = self.soco.play_mode
+ self._set_basic_track_info(update_position=update_position)
+ self.update_player()
+
+ async def _subscribe_target(self, target: SubscriptionBase, sub_callback: Callable) -> None:
+ """Create a Sonos subscription for given target."""
+ subscription = await target.subscribe(
+ auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
+ )
+
+ def on_renew_failed(exception: Exception) -> None:
+ """Handle a failed subscription renewal callback."""
+ self.mass.create_task(self._renew_failed(exception))
+
+ subscription.callback = sub_callback
+ subscription.auto_renew_fail = on_renew_failed
+ self._subscriptions.append(subscription)
+
+ async def _renew_failed(self, exception: Exception) -> None:
+ """Mark the speaker as offline after a subscription renewal failure.
+
+ This is to reset the state to allow a future clean subscription attempt.
+ """
+ if not self.available:
+ return
+
+ self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
+ await self.offline()
+
+ def _handle_event(self, event: SonosEvent) -> None:
+ """Handle SonosEvent callback."""
+ service_type: str = event.service.service_type
+ self._speaker_activity(f"{service_type} subscription")
+
+ if service_type == "DeviceProperties":
+ self.update_player()
+ return
+ if service_type == "AVTransport":
+ self._handle_avtransport_event(event)
+ return
+ if service_type == "RenderingControl":
+ self._handle_rendering_control_event(event)
+ return
+ if service_type == "ZoneGroupTopology":
+ self._handle_zone_group_topology_event(event)
+ return
+
+ def _handle_avtransport_event(self, event: SonosEvent) -> None:
+ """Update information about currently playing media from an event."""
+ # NOTE: The new coordinator can be provided in a media update event but
+ # before the ZoneGroupState updates. If this happens the playback
+ # state will be incorrect and should be ignored. Switching to the
+ # new coordinator will use its media. The regrouping process will
+ # be completed during the next ZoneGroupState update.
+ av_transport_uri = event.variables.get("av_transport_uri", "")
+ current_track_uri = event.variables.get("current_track_uri", "")
+ if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"):
+ new_coordinator_uid = av_transport_uri.split(":")[-1]
+ if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid):
+ self.logger.log(
+ 5,
+ "Media update coordinator (%s) received for %s",
+ new_coordinator_speaker.zone_name,
+ self.zone_name,
+ )
+ self.sync_coordinator = new_coordinator_speaker
+ else:
+ self.logger.debug(
+ "Media update coordinator (%s) for %s not yet available",
+ new_coordinator_uid,
+ self.zone_name,
+ )
+ return
+
+ if crossfade := event.variables.get("current_crossfade_mode"):
+ self.crossfade = bool(int(crossfade))
+
+ # Missing transport_state indicates a transient error
+ if (new_status := event.variables.get("transport_state")) is None:
+ return
+
+ # Ignore transitions, we should get the target state soon
+ if new_status == SONOS_STATE_TRANSITIONING:
+ return
+
+ evars = event.variables
+ new_status = evars["transport_state"]
+ state_changed = new_status != self.playback_status
+
+ self.play_mode = evars["current_play_mode"]
+ self.playback_status = new_status
+
+ track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"]
+ audio_source = self.soco.music_source_from_uri(track_uri)
+
+ self._set_basic_track_info(update_position=state_changed)
+
+ if (ct_md := evars["current_track_meta_data"]) and not self.image_url:
+ if album_art_uri := getattr(ct_md, "album_art_uri", None):
+ # TODO: handle library mess here
+ self.image_url = album_art_uri
+
+ et_uri_md = evars["enqueued_transport_uri_meta_data"]
+ if isinstance(et_uri_md, DidlPlaylistContainer):
+ self.playlist_name = et_uri_md.title
+
+ if queue_size := evars.get("number_of_tracks", 0):
+ self.queue_size = int(queue_size)
+
+ if audio_source == MUSIC_SRC_RADIO:
+ if et_uri_md:
+ self.channel = et_uri_md.title
+
+ # Extra guards for S1 compatibility
+ if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show:
+ radio_show = ct_md.radio_show.split(",")[0]
+ self.channel = " • ".join(filter(None, [self.channel, radio_show]))
+
+ if isinstance(et_uri_md, DidlAudioBroadcast):
+ self.title = self.title or self.channel
+
+ self.update_player()
+
+ def _handle_rendering_control_event(self, event: SonosEvent) -> None:
+ """Update information about currently volume settings."""
+ variables = event.variables
+
+ if "volume" in variables:
+ volume = variables["volume"]
+ self.mass_player.volume_level = int(volume["Master"])
+
+ if mute := variables.get("mute"):
+ self.mass_player.volume_muted = mute["Master"] == "1"
+
+ self.update_player()
+
+ def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
+ """Handle callback for topology change event."""
+ if "zone_player_uui_ds_in_group" not in event.variables:
+ return
+ asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)
+
+ async def _rebooted(self) -> None:
+ """Handle a detected speaker reboot."""
+ self.logger.debug("%s rebooted, reconnecting", self.zone_name)
+ await self.offline()
+ self._speaker_activity("reboot")
+
+ def update_groups(self) -> None:
+ """Update group topology when polling."""
+ asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop)
+
+ def update_group_for_uid(self, uid: str) -> None:
+ """Update group topology if uid is missing."""
+ if uid not in self._group_members_missing:
+ return
+ missing_zone = self.sonos_prov.sonosplayers[uid].zone_name
+ self.logger.debug("%s was missing, adding to %s group", missing_zone, self.zone_name)
+ self.update_groups()
+
+ def create_update_groups_coro(self, event: SonosEvent | None = None) -> Coroutine:
+ """Handle callback for topology change event."""
+
+ def _get_soco_group() -> list[str]:
+ """Ask SoCo cache for existing topology."""
+ coordinator_uid = self.soco.uid
+ joined_uids = []
+ with contextlib.suppress(OSError, SoCoException):
+ if self.soco.group and self.soco.group.coordinator:
+ coordinator_uid = self.soco.group.coordinator.uid
+ joined_uids = [
+ p.uid
+ for p in self.soco.group.members
+ if p.uid != coordinator_uid and p.is_visible
+ ]
+
+ return [coordinator_uid, *joined_uids]
+
+ async def _extract_group(event: SonosEvent | None) -> list[str]:
+ """Extract group layout from a topology event."""
+ group = event and event.zone_player_uui_ds_in_group
+ if group:
+ assert isinstance(group, str)
+ return group.split(",")
+ return await self.mass.create_task(_get_soco_group)
+
+ def _regroup(group: list[str]) -> None:
+ """Rebuild internal group layout (async safe)."""
+ if group == [self.soco.uid] and self.group_members == [self] and self.group_members_ids:
+ # Skip updating existing single speakers in polling mode
+ return
+
+ group_members = []
+ group_members_ids = []
+
+ for uid in group:
+ speaker = self.sonos_prov.sonosplayers.get(uid)
+ if speaker:
+ self._group_members_missing.discard(uid)
+ group_members.append(speaker)
+ group_members_ids.append(uid)
+ else:
+ self._group_members_missing.add(uid)
+ self.logger.debug(
+ "%s group member unavailable (%s), will try again",
+ self.zone_name,
+ uid,
+ )
+ return
+
+ if self.group_members_ids == group_members_ids:
+ # Useful in polling mode for speakers with stereo pairs or surrounds
+ # as those "invisible" speakers will bypass the single speaker check
+ return
+
+ self.sync_coordinator = None
+ self.group_members = group_members
+ self.group_members_ids = group_members_ids
+ self.mass.players.update(self.player_id)
+
+ for joined_uid in group[1:]:
+ joined_speaker: SonosPlayer = self.sonos_prov.sonosplayers.get(joined_uid)
+ if joined_speaker:
+ joined_speaker.sync_coordinator = self
+ joined_speaker.group_members = group_members
+ joined_speaker.group_members_ids = group_members_ids
+ joined_speaker.update_player()
+
+ self.logger.debug("Regrouped %s: %s", self.zone_name, self.group_members_ids)
+ self.update_player()
+
+ async def _handle_group_event(event: SonosEvent | None) -> None:
+ """Get async lock and handle event."""
+ async with self.sonos_prov.topology_condition:
+ group = await _extract_group(event)
+ if self.soco.uid == group[0]:
+ _regroup(group)
+ self.sonos_prov.topology_condition.notify_all()
+
+ return _handle_group_event(event)
+
+ async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None:
+ """Wait until all groups are present, or timeout."""
+
+ def _test_groups(groups: list[list[SonosPlayer]]) -> bool:
+ """Return whether all groups exist now."""
+ for group in groups:
+ coordinator = group[0]
+
+ # Test that coordinator is coordinating
+ current_group = coordinator.group_members
+ if coordinator != current_group[0]:
+ return False
+
+ # Test that joined members match
+ if set(group[1:]) != set(current_group[1:]):
+ return False
+
+ return True
+
+ try:
+ async with asyncio.timeout(5):
+ while not _test_groups(groups):
+ await self.sonos_prov.topology_condition.wait()
+ except TimeoutError:
+ self.logger.warning("Timeout waiting for target groups %s", groups)
+
+ any_speaker = next(iter(self.sonos_prov.sonosplayers.values()))
+ any_speaker.soco.zone_group_state.clear_cache()
+
+ def _update_attributes(self) -> None:
+ """Update attributes of the MA Player from SoCo state."""
+ # generic attributes (player_info)
+ self.mass_player.available = self.available
+
+ if not self.available:
+ self.mass_player.powered = False
+ self.mass_player.state = PlayerState.IDLE
+ self.mass_player.synced_to = None
+ self.mass_player.group_childs = set()
+ return
+
+ # transport info (playback state)
+ self.mass_player.state = current_state = _convert_state(self.playback_status)
+
+ # power 'on' player if we detect its playing
+ if not self.mass_player.powered and (
+ current_state == PlayerState.PLAYING
+ or (
+ self.sync_coordinator
+ and self.sync_coordinator.mass_player.state == PlayerState.PLAYING
+ )
+ ):
+ self.mass_player.powered = True
+
+ # media info (track info)
+ self.mass_player.current_item_id = self.uri
+ if self.uri and self.mass.streams.base_url in self.uri and self.player_id in self.uri:
+ self.mass_player.active_source = self.player_id
+ else:
+ self.mass_player.active_source = self.source_name
+ if self.position is not None and self.position_updated_at is not None:
+ self.mass_player.elapsed_time = self.position
+ self.mass_player.elapsed_time_last_updated = self.position_updated_at.timestamp()
+
+ # zone topology (syncing/grouping) details
+ self.mass_player.can_sync_with = tuple(
+ x.player_id
+ for x in self.sonos_prov.sonosplayers.values()
+ if x.player_id != self.player_id
+ )
+ if self.sync_coordinator:
+ # player is synced to another player
+ self.mass_player.synced_to = self.sync_coordinator.player_id
+ self.mass_player.group_childs = set()
+ self.mass_player.active_source = self.sync_coordinator.mass_player.active_source
+ elif len(self.group_members_ids) > 1:
+ # this player is the sync leader in a group
+ self.mass_player.synced_to = None
+ self.mass_player.group_childs = set(self.group_members_ids)
+ else:
+ # standalone player, not synced
+ self.mass_player.synced_to = None
+ self.mass_player.group_childs = set()
+
+ def _set_basic_track_info(self, update_position: bool = False) -> None:
+ """Query the speaker to update media metadata and position info."""
+ self.channel = None
+ self.duration = None
+ self.image_url = None
+ self.source_name = None
+ self.title = None
+ self.uri = None
+
+ try:
+ track_info = self._poll_track_info()
+ except SonosUpdateError as err:
+ self.logger.warning("Fetching track info failed: %s", err)
+ return
+ if not track_info["uri"]:
+ return
+ self.uri = track_info["uri"]
+
+ audio_source = self.soco.music_source_from_uri(self.uri)
+ if source := SOURCE_MAPPING.get(audio_source):
+ self.source_name = source
+ if audio_source in LINEIN_SOURCES:
+ self.position = None
+ self.position_updated_at = None
+ self.title = source
+ return
+
+ self.artist = track_info.get("artist")
+ self.album_name = track_info.get("album")
+ self.title = track_info.get("title")
+ self.image_url = track_info.get("album_art")
+
+ playlist_position = int(track_info.get("playlist_position", -1))
+ if playlist_position > 0:
+ self.queue_position = playlist_position
+
+ self._update_media_position(track_info, force_update=update_position)
+
+ def _update_media_position(
+ self, position_info: dict[str, int], force_update: bool = False
+ ) -> None:
+ """Update state when playing music tracks."""
+ duration = position_info.get(DURATION_SECONDS)
+ current_position = position_info.get(POSITION_SECONDS)
+
+ if not (duration or current_position):
+ self.position = None
+ self.position_updated_at = None
+ return
+
+ should_update = force_update
+ self.duration = duration
+
+ # player started reporting position?
+ if current_position is not None and self.position is None:
+ should_update = True
+
+ # position jumped?
+ if current_position is not None and self.position is not None:
+ if self.playback_status == SONOS_STATE_PLAYING:
+ assert self.position_updated_at is not None
+ time_delta = utc() - self.position_updated_at
+ time_diff = time_delta.total_seconds()
+ else:
+ time_diff = 0
+
+ calculated_position = self.position + time_diff
+
+ if abs(calculated_position - current_position) > 1.5:
+ should_update = True
+
+ if current_position is None:
+ self.position = None
+ self.position_updated_at = None
+ elif should_update:
+ self.position = current_position
+ self.position_updated_at = utc()
+
+ def _speaker_activity(self, source: str) -> None:
+ """Track the last activity on this speaker, set availability and resubscribe."""
+ if self._resub_cooldown_expires_at:
+ if time.monotonic() < self._resub_cooldown_expires_at:
+ self.logger.debug(
+ "Activity on %s from %s while in cooldown, ignoring",
+ self.zone_name,
+ source,
+ )
+ return
+ self._resub_cooldown_expires_at = None
+
+ self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.zone_name, source)
+ self._last_activity = time.monotonic()
+ was_available = self.available
+ self.available = True
+ if not was_available:
+ self.update_player()
+ self.mass.loop.call_soon_threadsafe(self.mass.create_task, self.subscribe())
+
+ @soco_error()
+ def _join(self, members: list[SonosPlayer]) -> list[SonosPlayer]:
+ if self.sync_coordinator:
+ self.unjoin()
+ group = [self]
+ else:
+ group = self.group_members.copy()
+
+ for player in members:
+ if player.soco.uid != self.soco.uid and player not in group:
+ player.soco.join(self.soco)
+ player.sync_coordinator = self
+ group.append(player)
+
+ return group
+
+ @soco_error()
+ def _unjoin(self) -> None:
+ if self.group_members == [self]:
+ return
+ self.soco.unjoin()
+ self.sync_coordinator = None
+
+ @soco_error()
+ def _poll_track_info(self) -> dict[str, Any]:
+ """Poll the speaker for current track info.
+
+ Add converted position values (NOT async fiendly).
+ """
+ track_info: dict[str, Any] = self.soco.get_current_track_info()
+ track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration"))
+ track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position"))
+ return track_info
+
+
+def _convert_state(sonos_state: str) -> PlayerState:
+ """Convert Sonos state to PlayerState."""
+ if sonos_state == "PLAYING":
+ return PlayerState.PLAYING
+ if sonos_state == "TRANSITIONING":
+ return PlayerState.PLAYING
+ if sonos_state == "PAUSED_PLAYBACK":
+ return PlayerState.PAUSED
+ return PlayerState.IDLE
+
+
+def _timespan_secs(timespan):
+ """Parse a time-span into number of seconds."""
+ if timespan in ("", "NOT_IMPLEMENTED", None):
+ return None
+ return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))))
+++ /dev/null
-<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 512 512"><rect width="512" height="512" rx="15%" fill="#f50"/><path d="m59 270-3 22 3 22c0 2 3 2 3 0l3-22-3-22c0-3-3-3-3 0zm18-14c0-3-3-3-3 0l-5 36 4 35c0 3 4 3 4 0l4-35zm59-30-3 66 2 40c0 8 7 8 7 0l4-40-4-66c0-5-6-5-6 0zm-31 22-4 44 3 40c0 6 5 6 5 0l4-40-4-44c0-3-4-3-4 0zm70 84 3-40-3-88c0-6-7-6-7 0l-3 88 2 40c0 8 8 8 8 0zm68 0 2-40-2-102c0-7-10-7-10 0l-2 102 2 40c0 8 10 8 10 0zm-34 0 3-40-3-89c0-6-9-6-9 0l-2 89 2 40c0 8 9 8 9 0zm-83 0 3-40-3-41c0-3-6-3-6 0l-3 41 3 40c0 7 6 7 6 0zm-33 0 4-40-4-43c0-3-4-3-4 0l-4 43 4 40c0 4 4 4 4 0zm124-125-2 85 1 40c0 8 10 8 10 0l2-40-2-85c0-7-9-7-9 0zm-58 125 3-40-3-81c0-6-7-6-7 0l-3 81 2 40c0 8 8 8 8 0zm33 0 3-40-3-91c0-6-8-6-8 0l-3 91 3 40c0 8 8 8 8 0zm196-89c-5-57-64-94-118-73-4 2-5 3-5 6v156c0 3 2 6 5 6h137c27 0 49-22 49-49 0-37-35-57-68-46zm-138-62-3 111 3 40c0 8 10 8 10 0l3-40-3-111c0-7-10-7-10 0z" fill="#fff"/></svg>
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Initialize provider(instance) with given configuration."""
- if not config.get_value(CONF_REFRESH_TOKEN):
+ if config.get_value(CONF_REFRESH_TOKEN) in (None, ""):
msg = "Re-Authentication required"
raise SetupFailedError(msg)
return SpotifyProvider(mass, manifest, config)
aiojellyfin==0.10.0
aiorun==2024.8.1
aioslimproto==3.0.1
+aiosonos==0.1.1
aiosqlite==0.20.0
async-upnp-client==0.40.0
bidict==0.23.1
shortuuid==1.0.13
snapcast==2.3.6
soco==0.30.4
-sonos-websocket==0.1.3
soundcloudpy==0.1.0
tidalapi==0.7.6
unidecode==1.3.8