From: Marcel van der Veldt Date: Sun, 28 Jan 2024 21:48:50 +0000 (+0100) Subject: Fix Sonos player provider (#1038) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=a1d02f9f56ddb546a5da128f581566a5c1f8417e;p=music-assistant-server.git Fix Sonos player provider (#1038) --- diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index 86f85016..55f9ab58 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -51,7 +51,7 @@ class Player(DataClassDictMixin): # active_source: return player_id of the active queue for this player # if the player is grouped and a group is active, this will be set to the group's player_id # otherwise it will be set to the own player_id - active_source: str = "" + active_source: str | None = None # current_item_id: return item_id/uri of the current active/loaded item on the player # this may be a MA queue_item_id, url, uri or some provider specific string diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index 0b2ddc3c..c46b7ad5 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -586,9 +586,17 @@ class ConfigController: """ Set (raw) single config(entry) value for a provider. - Note that this only returns the stored value without any validation or default. + Note that this only stores the (raw) value without any validation or default. + """ + self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value) + + def set_raw_player_config_value(self, player_id: str, key: str, value: ConfigValueType) -> None: + """ + Set (raw) single config(entry) value for a player. + + Note that this only stores the (raw) value without any validation or default. """ - return self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value) + self.set(f"{CONF_PLAYERS}/{player_id}/values/{key}", value) def save(self, immediate: bool = False) -> None: """Schedule save of data to disk.""" diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 4012f15b..012e20cc 100755 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -21,7 +21,6 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import ( AlreadyRegisteredError, - PlayerCommandFailed, PlayerUnavailableError, ProviderUnavailableError, UnsupportedFeaturedException, @@ -427,6 +426,9 @@ class PlayerController(CoreController): # forward to player provider player_provider = self.get_player_provider(player_id) await player_provider.cmd_power(player_id, powered) + else: + # allow the stop command to process and prevent race conditions + await asyncio.sleep(0.2) # always optimistically set the power state to update the UI # as fast as possible and prevent race conditions player.powered = powered @@ -656,8 +658,6 @@ class PlayerController(CoreController): raise UnsupportedFeaturedException( f"Player {parent_player.name} does not support (un)sync commands" ) - if player_id not in parent_player.can_sync_with: - raise PlayerCommandFailed(f"Player {player_id} can not be synced to {target_player}.") if child_player.synced_to: if child_player.synced_to == parent_player.player_id: # nothing to do: already synced to this parent @@ -775,7 +775,7 @@ class PlayerController(CoreController): """Return the active_source id for given player.""" # if player is synced, return group leader's active source if player.synced_to and (parent_player := self.get(player.synced_to)): - return self._get_active_source(parent_player) + return parent_player.player_id if active_player_group := self._get_active_player_group(player): return active_player_group.player_id # defaults to the player's own player id if not active source set @@ -890,16 +890,20 @@ class PlayerController(CoreController): return player def get_sync_leader(self, group_player: Player) -> Player | None: - """Get the sync leader player for a syncgroup or synced player.""" + """Get the active sync leader player for a syncgroup or synced player.""" if group_player.synced_to: # should not happen but just in case... return group_player.synced_to for child_player in self.iter_group_members( group_player, only_powered=True, only_playing=False ): - if not child_player.group_childs: + if child_player.synced_to and child_player.synced_to in group_player.group_childs: + return self.get(child_player.synced_to) + elif child_player.synced_to: + # player is already synced to a member outside this group ?! continue - return child_player + elif child_player.group_childs: + return child_player return None async def _sync_syncgroup(self, player_id: str) -> None: @@ -952,6 +956,7 @@ class PlayerController(CoreController): device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()), supported_features=first_player.supported_features, group_childs=set(members), + active_source=group_player_id, ) self.mass.players.register_or_update(player) return player diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 61b97384..b7e5c4e7 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -104,11 +104,12 @@ class MultiClientStreamJob: self.expected_players: set[str] = set() self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {} self.bytes_streamed: int = 0 - self.client_seconds_skipped: dict[str, int] = {} self._all_clients_connected = asyncio.Event() self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}") self._finished: bool = False - self._first_chunk: bytes = b"" + self.workaround_players_seen: set[str] = set() + # start running the audio task in the background + self._audio_task = asyncio.create_task(self._stream_job_runner()) @property def finished(self) -> bool: @@ -125,11 +126,6 @@ class MultiClientStreamJob: """Return if this Job is running.""" return not self.finished and not self.pending - def start(self) -> None: - """Start running this streamjob.""" - # start running the audio task in the background - self._audio_task = asyncio.create_task(self._stream_job_runner()) - def stop(self) -> None: """Stop running this job.""" self._finished = True @@ -164,21 +160,22 @@ class MultiClientStreamJob: async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]: """Subscribe consumer and iterate incoming chunks on the queue.""" + if ( + player_id in self.stream_controller.workaround_players + and player_id not in self.workaround_players_seen + ): + self.workaround_players_seen.add(player_id) + yield b"" + return + try: self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) - if self._first_chunk: - yield self._first_chunk - if self._all_clients_connected.is_set(): # client subscribes while we're already started - self.logger.debug( + self.logger.warning( "Client %s is joining while the stream is already started", player_id ) - # calculate how many seconds the client missed so far - self.client_seconds_skipped[player_id] = ( - self.bytes_streamed / self.pcm_format.pcm_sample_size - ) else: self.logger.debug("Subscribed client %s", player_id) @@ -245,11 +242,6 @@ class MultiClientStreamJob: await self._put_chunk(chunk) - # keep first chunk to workaround (dlna) players that do multiple get requests - if chunk_num == 1: - self._first_chunk = chunk - await asyncio.sleep(0.1) - # mark EOF with empty chunk await self._put_chunk(b"") @@ -284,6 +276,7 @@ class StreamsController(CoreController): "some player specific local control callbacks." ) self.manifest.icon = "cast-audio" + self.workaround_players: set[str] = set() @property def base_url(self) -> str: @@ -706,6 +699,7 @@ class StreamsController(CoreController): "to the same stream, playback may be disturbed!", child_player_id, ) + self.workaround_players.add(child_player_id) # all checks passed, start streaming! self.logger.debug( diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 7a11a28a..4817b48f 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -430,7 +430,6 @@ class SlimprotoProvider(PlayerProvider): auto_play=False, ) ) - stream_job.start() else: # regular, single player playback client = self._socket_clients[player_id] diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 3095ea09..814f1f9c 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -1,17 +1,25 @@ -"""Sample Player provider for Music Assistant.""" +""" +Sonos Player provider for Music Assistant. + +Note that large parts of this code are copied over from the Home Assistant +integratioon for Sonos. +""" from __future__ import annotations import asyncio import logging import time +from collections import OrderedDict from contextlib import suppress -from typing import TYPE_CHECKING, Any +from dataclasses import dataclass, field +from typing import TYPE_CHECKING -import soco -from soco import config -from soco.events_base import Event as SonosEvent -from soco.events_base import SubscriptionBase +import soco.config as soco_config +from requests.exceptions import Timeout +from soco import SoCoException, events_asyncio, zonegroupstate +from soco.core import SoCo +from soco.discovery import discover from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, @@ -22,7 +30,6 @@ from music_assistant.common.models.enums import ( ConfigEntryType, ContentType, PlayerFeature, - PlayerState, PlayerType, ProviderFeature, ) @@ -33,6 +40,8 @@ from music_assistant.constants import CONF_CROSSFADE, CONF_PLAYERS from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider +from .player import SonosPlayer + if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest @@ -40,7 +49,6 @@ if TYPE_CHECKING: from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType -LOGGER = logging.getLogger(__name__) PLAYER_FEATURES = ( PlayerFeature.SYNC, @@ -50,10 +58,9 @@ PLAYER_FEATURES = ( ) CONF_NETWORK_SCAN = "network_scan" +SUBSCRIPTION_TIMEOUT = 1200 +ZGS_SUBSCRIPTION_TIMEOUT = 2 -# set event listener port to something other than 1400 -# to allow coextistence with HA on the same host -config.EVENT_LISTENER_PORT = 1700 HIRES_MODELS = ( "Sonos Roam", @@ -75,6 +82,16 @@ async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig ) -> ProviderInstanceType: """Initialize provider(instance) with given configuration.""" + # set event listener port to something other than 1400 + # to allow coextistence with HA on the same host + soco_config.EVENT_LISTENER_PORT = 1700 + soco_config.EVENTS_MODULE = events_asyncio + soco_config.REQUEST_TIMEOUT = 9.5 + soco_config.ZGT_EVENT_FALLBACK = False + zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT + # silence the soco logger a bit + logging.getLogger("soco").setLevel(logging.INFO) + logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) prov = SonosPlayerProvider(mass, manifest, config) await prov.handle_setup() return prov @@ -106,227 +123,12 @@ async def get_config_entries( ) -class SonosPlayer: - """Wrapper around Sonos/SoCo with some additional attributes.""" - - def __init__(self, sonos_prov: SonosPlayerProvider, soco_device: soco.SoCo) -> None: - """Initialize SonosPlayer instance.""" - self.sonos_prov = sonos_prov - self.player_id = soco_device.uid - self.soco_device = soco_device - self.is_stereo_pair: bool = False - self.elapsed_time: int = 0 - self.playback_started: float | None = None - self.need_elapsed_time_workaround: bool = False - self.subscriptions: list[SubscriptionBase] = [] - self.transport_info: dict = {} - self.track_info: dict = {} - self.speaker_info: dict = {} - self.rendering_control_info: dict = {} - self.speaker_info_updated: float = 0.0 - self.transport_info_updated: float = 0.0 - self.track_info_updated: float = 0.0 - self.rendering_control_info_updated: float = 0.0 - - def update_info( - self, - update_transport_info: bool = False, - update_track_info: bool = False, - update_speaker_info: bool = False, - update_rendering_control_info: bool = False, - ): - """Poll all info from player (must be run in executor thread).""" - # transport info - if update_transport_info: - transport_info = self.soco_device.get_current_transport_info() - if transport_info.get("current_transport_state") != "TRANSITIONING": - self.transport_info = transport_info - self.transport_info_updated = time.time() - # track info - if update_track_info: - self.track_info = self.soco_device.get_current_track_info() - # sonos reports bullshit elapsed time while playing radio (or flow mode), - # trying to be "smart" and resetting the counter when new ICY metadata is detected - # we try to detect this and work around it - self.need_elapsed_time_workaround = self.track_info["duration"] == "0:00:00" - if not self.need_elapsed_time_workaround: - self.elapsed_time = _timespan_secs(self.track_info["position"]) or 0 - self.track_info_updated = time.time() - - # speaker info - if update_speaker_info: - self.speaker_info = self.soco_device.get_speaker_info() - self.speaker_info_updated = time.time() - # rendering control info - if update_rendering_control_info: - self.rendering_control_info["volume"] = self.soco_device.volume - self.rendering_control_info["mute"] = self.soco_device.mute - self.rendering_control_info_updated = time.time() - - def update_attributes(self): - """Update attributes of the MA Player from soco.SoCo state.""" - mass_player = self.sonos_prov.mass.players.get(self.player_id) - if not mass_player: - return - now = time.time() - # generic attributes (speaker_info) - mass_player.available = True - mass_player.name = self.speaker_info["zone_name"] - mass_player.volume_level = int(self.rendering_control_info["volume"]) - mass_player.volume_muted = self.rendering_control_info["mute"] - - # transport info (playback state) - current_transport_state = self.transport_info["current_transport_state"] - mass_player.state = current_state = _convert_state(current_transport_state) - - if self.playback_started is not None and current_state == PlayerState.IDLE: - self.playback_started = None - elif self.playback_started is None and current_state == PlayerState.PLAYING: - self.playback_started = now - mass_player.powered = True - - # media info (track info) - mass_player.current_item_id = self.track_info["uri"] - if mass_player.player_id in mass_player.current_item_id: - mass_player.active_source = mass_player.player_id - elif "spotify" in mass_player.current_item_id: - mass_player.active_source = "spotify" - else: - mass_player.active_source = self.soco_device.music_source_from_uri( - self.track_info["uri"] - ) - if not self.need_elapsed_time_workaround: - mass_player.elapsed_time = self.elapsed_time - mass_player.elapsed_time_last_updated = self.track_info_updated - - # zone topology (syncing/grouping) details - if ( - self.soco_device.group - and self.soco_device.group.coordinator - and self.soco_device.group.coordinator.uid == self.player_id - ): - # this player is the sync leader - mass_player.synced_to = None - group_members = {x.uid for x in self.soco_device.group.members if x.is_visible} - if not group_members: - # not sure about this ?! - mass_player.type = PlayerType.PLAYER - elif group_members == {self.player_id}: - mass_player.group_childs = set() - else: - mass_player.group_childs = group_members - elif self.soco_device.group and self.soco_device.group.coordinator: - # player is synced to - mass_player.group_childs = set() - mass_player.synced_to = self.soco_device.group.coordinator.uid - else: - # unsure - mass_player.group_childs = set() - - async def check_poll(self) -> None: - """Check if any of the endpoints needs to be polled for info.""" - cur_time = time.time() - update_transport_info = (cur_time - self.transport_info_updated) > 30 - update_track_info = self.transport_info.get("current_transport_state") == "PLAYING" or ( - (cur_time - self.track_info_updated) > 300 - ) - update_speaker_info = (cur_time - self.speaker_info_updated) > 300 - update_rendering_control_info = (cur_time - self.rendering_control_info_updated) > 30 - - if not ( - update_transport_info - or update_track_info - or update_speaker_info - or update_rendering_control_info - ): - return - - await asyncio.to_thread( - self.update_info, - update_transport_info, - update_track_info, - update_speaker_info, - update_rendering_control_info, - ) - - async def connect(self) -> None: - """Handle (re)connect of the Sonos player.""" - # poll all endpoints once and update attributes - self.speaker_info = await asyncio.to_thread(self.soco_device.get_speaker_info, True) - self.speaker_info_updated = time.time() - await self.check_poll() - self.update_attributes() - - # handle subscriptions to events - def subscribe(service, _callback): - queue = ProcessSonosEventQueue(_callback) - sub = service.subscribe(auto_renew=True, event_queue=queue) - self.subscriptions.append(sub) - - subscribe(self.soco_device.avTransport, self._handle_av_transport_event) - subscribe(self.soco_device.renderingControl, self._handle_rendering_control_event) - subscribe(self.soco_device.zoneGroupTopology, self._handle_zone_group_topology_event) - - def disconnect(self) -> None: - """Handle disconnect.""" - mass_player = self.sonos_prov.mass.players.get(self.player_id) - mass_player.available = False - LOGGER.debug("Unsubscribing from events for %s", mass_player.display_name) - for subscription in self.subscriptions: - subscription.unsubscribe() - self.subscriptions = [] - - async def reconnect(self, soco_device: soco.SoCo) -> None: - """Handle reconnect.""" - if self.subscriptions: - # handle reconnect - self.disconnect() - self.soco_device = soco_device - await self.connect() - - def _handle_av_transport_event(self, event: SonosEvent): - """Handle a soco.SoCo AVTransport event.""" - LOGGER.debug("Received AVTransport event for Player %s", self.soco_device.player_name) - - if "transport_state" in event.variables: - new_state = event.variables["transport_state"] - if new_state == "TRANSITIONING": - return - self.transport_info["current_transport_state"] = new_state - - if "current_track_uri" in event.variables: - self.transport_info["uri"] = event.variables["current_track_uri"] - - self.transport_info_updated = time.time() - asyncio.run_coroutine_threadsafe( - self.sonos_prov.update_player(self), self.sonos_prov.mass.loop - ) - - def _handle_rendering_control_event(self, event: SonosEvent): - """Handle a soco.SoCo RenderingControl event.""" - LOGGER.debug( - "Received RenderingControl event for Player %s", - self.soco_device.player_name, - ) - if "volume" in event.variables: - self.rendering_control_info["volume"] = event.variables["volume"]["Master"] - if "mute" in event.variables: - self.rendering_control_info["mute"] = event.variables["mute"]["Master"] == "1" - self.rendering_control_info_updated = time.time() - asyncio.run_coroutine_threadsafe( - self.sonos_prov.update_player(self), self.sonos_prov.mass.loop - ) +@dataclass +class UnjoinData: + """Class to track data necessary for unjoin coalescing.""" - def _handle_zone_group_topology_event(self, event: SonosEvent): # noqa: ARG002 - """Handle a soco.SoCo ZoneGroupTopology event.""" - LOGGER.debug( - "Received ZoneGroupTopology event for Player %s - members: %s", - self.soco_device.player_name, - "/".join([x.player_name for x in self.soco_device.group.members]), - ) - asyncio.run_coroutine_threadsafe( - self.sonos_prov.update_player(self), self.sonos_prov.mass.loop - ) + players: list[SonosPlayer] + event: asyncio.Event = field(default_factory=asyncio.Event) class SonosPlayerProvider(PlayerProvider): @@ -343,11 +145,18 @@ class SonosPlayerProvider(PlayerProvider): async def handle_setup(self) -> None: """Handle async initialization of the provider.""" - self.sonosplayers = {} + self.sonosplayers: OrderedDict[str, SonosPlayer] = OrderedDict() + self.topology_condition = asyncio.Condition() + self.discovery_known: set[str] = set() + self.boot_counts: dict[str, int] = {} + self.mdns_names: dict[str, str] = {} + self.unjoin_data: dict[str, UnjoinData] = {} self._discovery_running = False - # silence the soco logger a bit - logging.getLogger("soco").setLevel(logging.INFO) - logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) + self.hosts_in_error: dict[str, bool] = {} + self.discovery_lock = asyncio.Lock() + self.creation_lock = asyncio.Lock() + self._known_invisible: set[SoCo] = set() + self.mass.create_task(self._run_discovery()) async def unload(self) -> None: @@ -358,11 +167,9 @@ class SonosPlayerProvider(PlayerProvider): # await any in-progress discovery while self._discovery_running: await asyncio.sleep(0.5) - # cleanup players - if self.sonosplayers: - for player_id in list(self.sonosplayers): - player = self.sonosplayers.pop(player_id) - player.disconnect() + await asyncio.gather(*(player.offline() for player in self.sonosplayers.values())) + if events_asyncio.event_listener: + await events_asyncio.event_listener.async_stop() self.sonosplayers = None async def get_player_config_entries( @@ -381,7 +188,7 @@ class SonosPlayerProvider(PlayerProvider): default_value=0, range=(-10, 10), description="Set the Bass level for the Sonos player", - value=sonos_player.soco_device.bass, + value=sonos_player.soco.bass, advanced=True, ), ConfigEntry( @@ -391,7 +198,7 @@ class SonosPlayerProvider(PlayerProvider): default_value=0, range=(-10, 10), description="Set the Treble level for the Sonos player", - value=sonos_player.soco_device.treble, + value=sonos_player.soco.treble, advanced=True, ), ConfigEntry( @@ -400,7 +207,7 @@ class SonosPlayerProvider(PlayerProvider): label="Loudness compensation", default_value=True, description="Enable loudness compensation on the Sonos player", - value=sonos_player.soco_device.loudness, + value=sonos_player.soco.loudness, advanced=True, ), ) @@ -417,18 +224,18 @@ class SonosPlayerProvider(PlayerProvider): return if "values/sonos_bass" in changed_keys: self.mass.create_task( - sonos_player.soco_device.renderingControl.SetBass, + sonos_player.soco.renderingControl.SetBass, [("InstanceID", 0), ("DesiredBass", config.get_value("sonos_bass"))], ) if "values/sonos_treble" in changed_keys: self.mass.create_task( - sonos_player.soco_device.renderingControl.SetTreble, + sonos_player.soco.renderingControl.SetTreble, [("InstanceID", 0), ("DesiredTreble", config.get_value("sonos_treble"))], ) if "values/sonos_loudness" in changed_keys: loudness_value = "1" if config.get_value("sonos_loudness") else "0" self.mass.create_task( - sonos_player.soco_device.renderingControl.SetLoudness, + sonos_player.soco.renderingControl.SetLoudness, [ ("InstanceID", 0), ("Channel", "Master"), @@ -436,51 +243,53 @@ class SonosPlayerProvider(PlayerProvider): ], ) + def is_device_invisible(self, ip_address: str) -> bool: + """Check if device at provided IP is known to be invisible.""" + return any(x for x in self._known_invisible if x.ip_address == ip_address) + async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" sonos_player = self.sonosplayers[player_id] - if not sonos_player.soco_device.is_coordinator: + if sonos_player.sync_coordinator: self.logger.debug( "Ignore STOP command for %s: Player is synced to another player.", player_id, ) return - await asyncio.to_thread(sonos_player.soco_device.stop) - await asyncio.to_thread(sonos_player.soco_device.clear_queue) - sonos_player.playback_started = None + await asyncio.to_thread(sonos_player.soco.stop) async def cmd_play(self, player_id: str) -> None: """Send PLAY command to given player.""" sonos_player = self.sonosplayers[player_id] - if not sonos_player.soco_device.is_coordinator: + if sonos_player.sync_coordinator: self.logger.debug( "Ignore PLAY command for %s: Player is synced to another player.", player_id, ) return - await asyncio.to_thread(sonos_player.soco_device.play) + await asyncio.to_thread(sonos_player.soco.play) async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player.""" sonos_player = self.sonosplayers[player_id] - if not sonos_player.soco_device.is_coordinator: + if sonos_player.sync_coordinator: self.logger.debug( "Ignore PLAY command for %s: Player is synced to another player.", player_id, ) return - if sonos_player.need_elapsed_time_workaround: - # no pause allowed when radio/flow mode is active + if "Pause" not in sonos_player.soco.available_actions: + # pause not possible await self.cmd_stop(player_id) return - await asyncio.to_thread(sonos_player.soco_device.pause) + await asyncio.to_thread(sonos_player.soco.pause) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" def set_volume_level(player_id: str, volume_level: int) -> None: sonos_player = self.sonosplayers[player_id] - sonos_player.soco_device.volume = volume_level + sonos_player.soco.volume = volume_level await asyncio.to_thread(set_volume_level, player_id, volume_level) @@ -489,7 +298,7 @@ class SonosPlayerProvider(PlayerProvider): def set_volume_mute(player_id: str, muted: bool) -> None: sonos_player = self.sonosplayers[player_id] - sonos_player.soco_device.mute = muted + sonos_player.soco.mute = muted await asyncio.to_thread(set_volume_mute, player_id, muted) @@ -502,19 +311,8 @@ class SonosPlayerProvider(PlayerProvider): - target_player: player_id of the syncgroup master or group player. """ sonos_player = self.sonosplayers[player_id] - sonos_master_player = self.sonosplayers[target_player].soco_device - retries = 0 - while True: - try: - await asyncio.to_thread(sonos_player.soco_device.join, sonos_master_player) - break - except soco.exceptions.SoCoUPnPException as err: - if retries >= 3: - raise err - retries += 1 - await asyncio.sleep(1) - # optimistically update player state - # await self.update_player(sonos_player) + sonos_master_player = self.sonosplayers[target_player] + await sonos_master_player.join([sonos_player]) async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player. @@ -524,9 +322,7 @@ class SonosPlayerProvider(PlayerProvider): - player_id: player_id of the player to handle the command. """ sonos_player = self.sonosplayers[player_id] - await asyncio.to_thread(sonos_player.soco_device.unjoin) - # optimistically update player state - # await self.update_player(sonos_player) + await sonos_player.unjoin() async def play_media( self, @@ -550,50 +346,40 @@ class SonosPlayerProvider(PlayerProvider): output_codec=ContentType.FLAC, seek_position=seek_position, fade_in=fade_in, - flow_mode=False, ) sonos_player = self.sonosplayers[player_id] mass_player = self.mass.players.get(player_id) - if not sonos_player.soco_device.is_coordinator: + if sonos_player.sync_coordinator: # this should be already handled by the player manager, but just in case... raise PlayerCommandFailed( f"Player {mass_player.display_name} can not " "accept play_media command, it is synced to another player." ) metadata = create_didl_metadata(self.mass, url, queue_item) - await asyncio.to_thread(sonos_player.soco_device.play_uri, url, meta=metadata) - # optimistically set this timestamp to help figure out elapsed time later - now = time.time() - sonos_player.playback_started = now - mass_player.elapsed_time = 0 - mass_player.elapsed_time_last_updated = now + self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata) async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: """Handle PLAY STREAM on given player. This is a special feature from the Universal Group provider. """ - url = stream_job.resolve_stream_url(player_id, ContentType.MP3) + url = stream_job.resolve_stream_url(player_id, ContentType.FLAC) sonos_player = self.sonosplayers[player_id] mass_player = self.mass.players.get(player_id) - if not sonos_player.soco_device.is_coordinator: + if sonos_player.sync_coordinator: # this should be already handled by the player manager, but just in case... raise PlayerCommandFailed( f"Player {mass_player.display_name} can not " "accept play_stream command, it is synced to another player." ) metadata = create_didl_metadata(self.mass, url, None) - await asyncio.to_thread(sonos_player.soco_device.play_uri, url, meta=metadata) - # add a special 'command' item to the sonos queue - # this allows for on-player next buttons/commands to still work - await self._enqueue_item( - sonos_player, self.mass.streams.get_command_url(player_id, "next"), None - ) + # sonos players do not like our multi client stream + # add to the workaround players list + self.mass.streams.workaround_players.add(player_id) + await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata) # optimistically set this timestamp to help figure out elapsed time later - now = time.time() - sonos_player.playback_started = now mass_player.elapsed_time = 0 - mass_player.elapsed_time_last_updated = now + mass_player.elapsed_time_last_updated = time.time() async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """ @@ -617,11 +403,11 @@ class SonosPlayerProvider(PlayerProvider): ) # set crossfade according to player setting crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE) - if sonos_player.soco_device.cross_fade != crossfade: + if sonos_player.soco.cross_fade != crossfade: def set_crossfade(): with suppress(Exception): - sonos_player.soco_device.cross_fade = crossfade + sonos_player.soco.cross_fade = crossfade await asyncio.to_thread(set_crossfade) @@ -650,7 +436,7 @@ class SonosPlayerProvider(PlayerProvider): # based on when we last received info from the device await sonos_player.check_poll() # always update the attributes - await self.update_player(sonos_player, signal_update=False) + sonos_player.update_player(signal_update=False) except ConnectionResetError as err: raise PlayerUnavailableError from err @@ -658,26 +444,29 @@ class SonosPlayerProvider(PlayerProvider): """Discover Sonos players on the network.""" if self._discovery_running: return - try: - self._discovery_running = True - self.logger.debug("Sonos discovery started...") - discovered_devices: set[soco.SoCo] = await asyncio.to_thread( - soco.discover, allow_network_scan=self.config.get_value(CONF_NETWORK_SCAN) - ) - if discovered_devices is None: - discovered_devices = set() - # process new players - for device in discovered_devices: - if (existing := self.mass.players.get(device.uid)) and existing.available: - continue - try: - await self._device_discovered(device) - except Exception as err: - self.logger.exception(str(err), exc_info=err) + allow_network_scan = self.config.get_value(CONF_NETWORK_SCAN) - finally: - self._discovery_running = False + def do_discover(): + """Run discovery and add players in executor thread.""" + self._discovery_running = True + try: + self.logger.debug("Sonos discovery started...") + discovered_devices: set[SoCo] = discover(allow_network_scan=allow_network_scan) + if discovered_devices is None: + discovered_devices = set() + # process new players + for soco in discovered_devices: + try: + self._add_player(soco) + except (OSError, SoCoException, Timeout) as err: + self.logger.warning( + "Failed to add SonosPlayer using %s: %s", soco, err, exc_info=err + ) + finally: + self._discovery_running = False + + await self.mass.create_task(do_discover) def reschedule(): self._discovery_reschedule_timer = None @@ -686,50 +475,48 @@ class SonosPlayerProvider(PlayerProvider): # reschedule self once finished self._discovery_reschedule_timer = self.mass.loop.call_later(300, reschedule) - async def _device_discovered(self, soco_device: soco.SoCo) -> None: - """Handle discovered Sonos player.""" - player_id = soco_device.uid + def _add_player(self, soco: SoCo) -> None: + """Add discovered Sonos player.""" + player_id = soco.uid + if player_id in self.sonosplayers: + return # already added + if not soco.is_visible: + return enabled = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}/enabled", True) if not enabled: self.logger.debug("Ignoring disabled player: %s", player_id) return - if soco_device not in soco_device.visible_zones: + if soco not in soco.visible_zones: return - if not (sonos_player := self.sonosplayers.get(player_id)): - self.sonosplayers[player_id] = sonos_player = SonosPlayer( - self, - soco_device, - ) - - if not (mass_player := self.mass.players.get(player_id)): - mass_player = Player( - player_id=soco_device.uid, + speaker_info = soco.get_speaker_info(True, timeout=7) + if soco.uid not in self.boot_counts: + self.boot_counts[soco.uid] = soco.boot_seqnum + self.logger.debug("Adding new player: %s", speaker_info) + support_hires = speaker_info["model_name"] in HIRES_MODELS + self.sonosplayers[player_id] = sonos_player = SonosPlayer( + self, + soco=soco, + mass_player=Player( + player_id=soco.uid, provider=self.domain, type=PlayerType.PLAYER, - name=soco_device.player_name, + name=soco.player_name, available=True, powered=False, supported_features=PLAYER_FEATURES, - device_info=DeviceInfo(), - max_sample_rate=44100, - supports_24bit=False, - ) - - await sonos_player.reconnect(soco_device) - - if sonos_player.speaker_info["model_name"] in HIRES_MODELS: - mass_player.max_sample_rate = 48000 - mass_player.supports_24bit = True - - mass_player.device_info = DeviceInfo( - model=sonos_player.speaker_info["model_name"], - address=sonos_player.soco_device.ip_address, - manufacturer="SONOS", + device_info=DeviceInfo( + model=speaker_info["model_name"], + address=soco.ip_address, + manufacturer="SONOS", + ), + max_sample_rate=48000 if support_hires else 44100, + supports_24bit=support_hires, + ), ) - - self.mass.players.register_or_update(mass_player) + sonos_player.setup() + self.mass.loop.call_soon_threadsafe(self.mass.players.register, sonos_player.mass_player) async def _enqueue_item( self, @@ -740,72 +527,12 @@ class SonosPlayerProvider(PlayerProvider): """Enqueue a queue item to the Sonos player Queue.""" metadata = create_didl_metadata(self.mass, url, queue_item) await asyncio.to_thread( - sonos_player.soco_device.avTransport.SetNextAVTransportURI, + sonos_player.soco.avTransport.SetNextAVTransportURI, [("InstanceID", 0), ("NextURI", url), ("NextURIMetaData", metadata)], timeout=60, ) self.logger.debug( "Enqued next track (%s) to player %s", queue_item.name if queue_item else url, - sonos_player.soco_device.player_name, - ) - - async def update_player(self, sonos_player: SonosPlayer, signal_update: bool = True) -> None: - """Update Sonos Player.""" - mass_player = self.mass.players.get(sonos_player.player_id) - prev_url = mass_player.current_item_id - prev_state = mass_player.state - sonos_player.update_attributes() - mass_player.can_sync_with = tuple( - x for x in self.sonosplayers if x != sonos_player.player_id + sonos_player.soco.player_name, ) - current_url = mass_player.current_item_id - current_state = mass_player.state - - if (prev_url != current_url) or (prev_state != current_state): - # fetch track details on state or url change - await asyncio.to_thread( - sonos_player.update_info, - update_track_info=True, - ) - sonos_player.update_attributes() - - if signal_update: - # send update to the player manager right away only if we are triggered from an event - # when we're just updating from a manual poll, the player manager - # will detect changes to the player object itself - self.mass.players.update(mass_player.player_id) - - -def _convert_state(sonos_state: str) -> PlayerState: - """Convert Sonos state to PlayerState.""" - if sonos_state == "PLAYING": - return PlayerState.PLAYING - if sonos_state == "TRANSITIONING": - return PlayerState.PLAYING - if sonos_state == "PAUSED_PLAYBACK": - return PlayerState.PAUSED - return PlayerState.IDLE - - -def _timespan_secs(timespan): - """Parse a time-span into number of seconds.""" - if timespan in ("", "NOT_IMPLEMENTED", None): - return None - return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))) - - -class ProcessSonosEventQueue: - """Queue like object for dispatching sonos events.""" - - def __init__( - self, - callback_handler: callable[[dict], None], - ) -> None: - """Initialize Sonos event queue.""" - self._callback_handler = callback_handler - - def put(self, info: Any, block=True, timeout=None) -> None: # noqa: ARG002 - """Process event.""" - # noqa: ARG001 - self._callback_handler(info) diff --git a/music_assistant/server/providers/sonos/helpers.py b/music_assistant/server/providers/sonos/helpers.py new file mode 100644 index 00000000..e2895835 --- /dev/null +++ b/music_assistant/server/providers/sonos/helpers.py @@ -0,0 +1,111 @@ +"""Helper methods for common tasks.""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, overload + +from requests.exceptions import Timeout +from soco import SoCo +from soco.exceptions import SoCoException, SoCoUPnPException + +from music_assistant.common.models.errors import PlayerCommandFailed + +if TYPE_CHECKING: + from . import SonosPlayer + from .media import SonosMedia + +UID_PREFIX = "RINCON_" +UID_POSTFIX = "01400" +SONOS_SPEAKER_ACTIVITY = "sonos_speaker_activity" + +_LOGGER = logging.getLogger(__name__) + +_T = TypeVar("_T", bound="SonosPlayer | SonosMedia") +_R = TypeVar("_R") +_P = ParamSpec("_P") + +_FuncType = Callable[Concatenate[_T, _P], _R] +_ReturnFuncType = Callable[Concatenate[_T, _P], _R | None] + + +@overload +def soco_error( + errorcodes: None = ..., +) -> Callable[[_FuncType[_T, _P, _R]], _FuncType[_T, _P, _R]]: ... + + +@overload +def soco_error( + errorcodes: list[str], +) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]: ... + + +def soco_error( + errorcodes: list[str] | None = None, +) -> Callable[[_FuncType[_T, _P, _R]], _ReturnFuncType[_T, _P, _R]]: + """Filter out specified UPnP errors and raise exceptions for service calls.""" + + def decorator(funct: _FuncType[_T, _P, _R]) -> _ReturnFuncType[_T, _P, _R]: + """Decorate functions.""" + + def wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) -> _R | None: + """Wrap for all soco UPnP exception.""" + args_soco = next((arg for arg in args if isinstance(arg, SoCo)), None) + try: + result = funct(self, *args, **kwargs) + except (OSError, SoCoException, SoCoUPnPException, Timeout) as err: + error_code = getattr(err, "error_code", None) + function = funct.__qualname__ + if errorcodes and error_code in errorcodes: + _LOGGER.debug("Error code %s ignored in call to %s", error_code, function) + return None + + if (target := _find_target_identifier(self, args_soco)) is None: + raise RuntimeError("Unexpected use of soco_error") from err + + message = f"Error calling {function} on {target}: {err}" + raise PlayerCommandFailed(message) from err + + return result + + return wrapper + + return decorator + + +def _find_target_identifier(instance: Any, fallback_soco: SoCo | None) -> str | None: + """Extract the best available target identifier from the provided instance object.""" + if entity_id := getattr(instance, "entity_id", None): + # SonosEntity instance + return entity_id + if zone_name := getattr(instance, "zone_name", None): + # SonosSpeaker instance + return zone_name + if speaker := getattr(instance, "speaker", None): + # Holds a SonosSpeaker instance attribute + return speaker.zone_name + if soco := getattr(instance, "soco", fallback_soco): + # Holds a SoCo instance attribute + # Only use attributes with no I/O + return soco._player_name or soco.ip_address # pylint: disable=protected-access + return None + + +def hostname_to_uid(hostname: str) -> str: + """Convert a Sonos hostname to a uid.""" + if hostname.startswith("Sonos-"): + baseuid = hostname.removeprefix("Sonos-").replace(".local.", "") + elif hostname.startswith("sonos"): + baseuid = hostname.removeprefix("sonos").replace(".local.", "") + else: + raise ValueError(f"{hostname} is not a sonos device.") + return f"{UID_PREFIX}{baseuid}{UID_POSTFIX}" + + +def sync_get_visible_zones(soco: SoCo) -> set[SoCo]: + """Ensure I/O attributes are cached and return visible zones.""" + _ = soco.household_id + _ = soco.uid + return soco.visible_zones diff --git a/music_assistant/server/providers/sonos/manifest.json b/music_assistant/server/providers/sonos/manifest.json index 0e3ce826..493c7ea9 100644 --- a/music_assistant/server/providers/sonos/manifest.json +++ b/music_assistant/server/providers/sonos/manifest.json @@ -3,8 +3,14 @@ "domain": "sonos", "name": "SONOS", "description": "SONOS Playerprovider for Music Assistant.", - "codeowners": ["@music-assistant"], - "requirements": ["soco==0.30.2"], + "codeowners": [ + "@music-assistant" + ], + "requirements": [ + "soco==0.30.2", + "sonos-websocket==0.1.3", + "defusedxml==0.7.1" + ], "documentation": "https://github.com/orgs/music-assistant/discussions/1171", "multi_instance": false, "builtin": false, diff --git a/music_assistant/server/providers/sonos/player.py b/music_assistant/server/providers/sonos/player.py new file mode 100644 index 00000000..dba6733c --- /dev/null +++ b/music_assistant/server/providers/sonos/player.py @@ -0,0 +1,828 @@ +""" +Sonos Player provider for Music Assistant: SonosPlayer object/model. + +Note that large parts of this code are copied over from the Home Assistant +integration for Sonos. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import datetime +import logging +import time +from collections.abc import Callable, Coroutine +from typing import TYPE_CHECKING, Any + +import defusedxml.ElementTree as ET # noqa: N817 +from soco import SoCoException +from soco.core import ( + MUSIC_SRC_AIRPLAY, + MUSIC_SRC_LINE_IN, + MUSIC_SRC_RADIO, + MUSIC_SRC_SPOTIFY_CONNECT, + MUSIC_SRC_TV, + SoCo, +) +from soco.data_structures import DidlAudioBroadcast, DidlPlaylistContainer +from soco.events_base import Event as SonosEvent +from soco.events_base import SubscriptionBase +from sonos_websocket import SonosWebsocket + +from music_assistant.common.helpers.datetime import utc +from music_assistant.common.models.enums import PlayerFeature, PlayerState +from music_assistant.common.models.errors import PlayerCommandFailed +from music_assistant.common.models.player import Player +from music_assistant.server.providers.sonos.helpers import soco_error + +if TYPE_CHECKING: + from . import SonosPlayerProvider + +CALLBACK_TYPE = Callable[[], None] +LOGGER = logging.getLogger(__name__) + +PLAYER_FEATURES = ( + PlayerFeature.SYNC, + PlayerFeature.VOLUME_MUTE, + PlayerFeature.VOLUME_SET, + PlayerFeature.ENQUEUE_NEXT, +) +DURATION_SECONDS = "duration_in_s" +POSITION_SECONDS = "position_in_s" +SUBSCRIPTION_TIMEOUT = 1200 +ZGS_SUBSCRIPTION_TIMEOUT = 2 +AVAILABILITY_CHECK_INTERVAL = datetime.timedelta(minutes=1) +AVAILABILITY_TIMEOUT = AVAILABILITY_CHECK_INTERVAL.total_seconds() * 4.5 +SONOS_STATE_PLAYING = "PLAYING" +SONOS_STATE_TRANSITIONING = "TRANSITIONING" +NEVER_TIME = -1200.0 +RESUB_COOLDOWN_SECONDS = 10.0 +SUBSCRIPTION_SERVICES = { + # "alarmClock", + "avTransport", + # "contentDirectory", + "deviceProperties", + "renderingControl", + "zoneGroupTopology", +} +SUPPORTED_VANISH_REASONS = ("powered off", "sleeping", "switch to bluetooth", "upgrade") +UNUSED_DEVICE_KEYS = ["SPID", "TargetRoomName"] +LINEIN_SOURCES = (MUSIC_SRC_TV, MUSIC_SRC_LINE_IN) +SOURCE_AIRPLAY = "AirPlay" +SOURCE_LINEIN = "Line-in" +SOURCE_SPOTIFY_CONNECT = "Spotify Connect" +SOURCE_TV = "TV" +SOURCE_MAPPING = { + MUSIC_SRC_AIRPLAY: SOURCE_AIRPLAY, + MUSIC_SRC_TV: SOURCE_TV, + MUSIC_SRC_LINE_IN: SOURCE_LINEIN, + MUSIC_SRC_SPOTIFY_CONNECT: SOURCE_SPOTIFY_CONNECT, +} + +HIRES_MODELS = ( + "Sonos Roam", + "Sonos Arc", + "Sonos Beam", + "Sonos Five", + "Sonos Move", + "Sonos One SL", + "Sonos Port", + "Sonos Amp", + "SYMFONISK Bookshelf", + "SYMFONISK Table Lamp", + "Sonos Era 100", + "Sonos Era 300", +) + + +class SonosSubscriptionsFailed(PlayerCommandFailed): + """Subscription creation failed.""" + + +class SonosUpdateError(PlayerCommandFailed): + """Update failed.""" + + +class SonosPlayer: + """Wrapper around Sonos/SoCo with some additional attributes.""" + + def __init__( + self, + sonos_prov: SonosPlayerProvider, + soco: SoCo, + mass_player: Player, + ) -> None: + """Initialize SonosPlayer instance.""" + self.sonos_prov = sonos_prov + self.mass = sonos_prov.mass + self.player_id = soco.uid + self.soco = soco + self.logger = sonos_prov.logger.getChild(soco.uid) + self.household_id: str = soco.household_id + self.subscriptions: list[SubscriptionBase] = [] + self.websocket: SonosWebsocket | None = None + self.mass_player: Player = mass_player + self.available: bool = True + # cached attributes + self.play_mode: str | None = None + self.playback_status: str | None = None + self.channel: str | None = None + self.duration: float | None = None + self.image_url: str | None = None + self.source_name: str | None = None + self.title: str | None = None + self.uri: str | None = None + self.position: int | None = None + self.position_updated_at: datetime.datetime | None = None + # Subscriptions and events + self._subscriptions: list[SubscriptionBase] = [] + self._subscription_lock: asyncio.Lock | None = None + self._last_activity: float = NEVER_TIME + self._resub_cooldown_expires_at: float | None = None + self._needs_poll: bool = False + # Grouping + self.sync_coordinator: SonosPlayer | None = None + self.group_members: list[SonosPlayer] = [self] + self.group_members_ids: list[str] = [] + self._group_members_missing: set[str] = set() + + def __hash__(self) -> int: + """Return a hash of self.""" + return hash(self.player_id) + + @property + def zone_name(self) -> str: + """Return zone name.""" + if self.mass_player: + return self.mass_player.display_name + return self.soco.speaker_info["zone_name"] + + @property + def subscription_address(self) -> str: + """Return the current subscription callback address.""" + assert len(self._subscriptions) > 0 + addr, port = self._subscriptions[0].event_listener.address + return ":".join([addr, str(port)]) + + @property + def missing_subscriptions(self) -> set[str]: + """Return a list of missing service subscriptions.""" + subscribed_services = {sub.service.service_type for sub in self._subscriptions} + return SUBSCRIPTION_SERVICES - subscribed_services + + def setup(self) -> None: + """Run initial setup of the speaker (NOT async friendly).""" + # update volume + self.mass_player.volume_level = self.soco.volume + self.mass_player.volume_muted = self.soco.mute + self.update_groups() + if not self.sync_coordinator: + self.poll_media() + + async def do_async_setup() -> None: + """Complete setup in async context.""" + self.websocket = SonosWebsocket( + self.soco.ip_address, + player_id=self.soco.uid, + session=self.mass.http_session, + ) + + future = asyncio.run_coroutine_threadsafe(do_async_setup(), self.mass.loop) + future.result(timeout=10) + asyncio.run_coroutine_threadsafe(self.subscribe(), self.mass.loop) + + async def offline(self) -> None: + """Handle removal of speaker when unavailable.""" + if not self.available: + return + + if self._resub_cooldown_expires_at is None and not self.mass.closing: + self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS + self.logger.debug("Starting resubscription cooldown for %s", self.zone_name) + + self.available = False + self.mass.players.update(self.player_id) + self._share_link_plugin = None + + if self._poll_timer: + self._poll_timer() + self._poll_timer = None + + await self.unsubscribe() + self.sonos_prov.discovery_known.discard(self.soco.uid) + + def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None: + """Log a message if a subscription action (create/renew/stop) results in an exception.""" + if not isinstance(result, Exception): + return + + if isinstance(result, asyncio.exceptions.TimeoutError): + message = "Request timed out" + exc_info = None + else: + message = str(result) + exc_info = result if not str(result) else None + + self.logger.log( + level, + "%s failed for %s: %s", + event, + self.zone_name, + message, + exc_info=exc_info, + ) + + async def subscribe(self) -> None: + """Initiate event subscriptions under an async lock.""" + if not self._subscription_lock: + self._subscription_lock = asyncio.Lock() + + async with self._subscription_lock: + try: + # Create event subscriptions. + subscriptions = [ + self._subscribe_target(getattr(self.soco, service), self._handle_event) + for service in self.missing_subscriptions + ] + if not subscriptions: + return + self.logger.debug("Creating subscriptions for %s", self.zone_name) + results = await asyncio.gather(*subscriptions, return_exceptions=True) + for result in results: + self.log_subscription_result(result, "Creating subscription", logging.WARNING) + if any(isinstance(result, Exception) for result in results): + raise SonosSubscriptionsFailed + except SonosSubscriptionsFailed: + self.logger.warning("Creating subscriptions failed for %s", self.zone_name) + assert self._subscription_lock is not None + async with self._subscription_lock: + await self.offline() + + async def unsubscribe(self) -> None: + """Cancel all subscriptions.""" + if not self._subscriptions: + return + self.logger.debug("Unsubscribing from events for %s", self.zone_name) + results = await asyncio.gather( + *(subscription.unsubscribe() for subscription in self._subscriptions), + return_exceptions=True, + ) + for result in results: + self.log_subscription_result(result, "Unsubscribe") + self._subscriptions = [] + + async def check_poll(self) -> None: + """Validate availability of the speaker based on recent activity.""" + if not (self._needs_poll or (time.monotonic() - self._last_activity) > 600): + return + try: + await self.mass.create_task(self.ping) + except SonosUpdateError: + self.logger.warning( + "No recent activity and cannot reach %s, marking unavailable", + self.zone_name, + ) + self._needs_poll = True + await self.offline() + + @soco_error() + def ping(self) -> None: + """Test device availability. Failure will raise SonosUpdateError.""" + self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1) + + async def join( + self, + members: list[SonosPlayer], + ) -> None: + """Sync given players/speakers with this player.""" + async with self.sonos_prov.topology_condition: + group: list[SonosPlayer] = await self.mass.create_task(self._join, members) + await self.wait_for_groups([group]) + + async def unjoin(self) -> None: + """Unjoin player from all/any groups.""" + async with self.sonos_prov.topology_condition: + await self.mass.create_task(self._unjoin) + await self.wait_for_groups([[self]]) + + def update_player(self, signal_update: bool = True) -> None: + """Update Sonos Player.""" + self._update_attributes() + if signal_update: + # send update to the player manager right away only if we are triggered from an event + # when we're just updating from a manual poll, the player manager + # will detect changes to the player object itself + self.sonos_prov.mass.players.update(self.player_id) + + @soco_error() + def poll_track_info(self) -> dict[str, Any]: + """Poll the speaker for current track info. + + Add converted position values (NOT async fiendly). + """ + track_info: dict[str, Any] = self.soco.get_current_track_info() + track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration")) + track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position")) + return track_info + + @soco_error() + def poll_media(self) -> None: + """Poll information about currently playing media.""" + transport_info = self.soco.get_current_transport_info() + new_status = transport_info["current_transport_state"] + + if new_status == SONOS_STATE_TRANSITIONING: + return + + update_position = new_status != self.playback_status + self.playback_status = new_status + self.play_mode = self.soco.play_mode + self._set_basic_track_info(update_position=update_position) + self.update_player() + + async def _subscribe_target(self, target: SubscriptionBase, sub_callback: Callable) -> None: + """Create a Sonos subscription for given target.""" + subscription = await target.subscribe( + auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT + ) + + def on_renew_failed(exception: Exception) -> None: + """Handle a failed subscription renewal callback.""" + self.mass.create_task(self._renew_failed(exception)) + + subscription.callback = sub_callback + subscription.auto_renew_fail = on_renew_failed + self._subscriptions.append(subscription) + + async def _renew_failed(self, exception: Exception) -> None: + """Mark the speaker as offline after a subscription renewal failure. + + This is to reset the state to allow a future clean subscription attempt. + """ + if not self.available: + return + + self.log_subscription_result(exception, "Subscription renewal", logging.WARNING) + await self.offline() + + def _handle_event(self, event: SonosEvent) -> None: + """Handle SonosEvent callback.""" + if self._needs_poll: + self.logger.debug("Received event, cancelling poll timer for %s", self.zone_name) + self._needs_poll = False + + service_type: str = event.service.service_type + self._speaker_activity(f"{service_type} subscription") + + if service_type == "DeviceProperties": + self.update_player() + return + if service_type == "AVTransport": + self._handle_avtransport_event(event) + return + if service_type == "RenderingControl": + self._handle_rendering_control_event(event) + return + if service_type == "ZoneGroupTopology": + self._handle_zone_group_topology_event(event) + return + + def _handle_avtransport_event(self, event: SonosEvent) -> None: + """Update information about currently playing media from an event.""" + # NOTE: The new coordinator can be provided in a media update event but + # before the ZoneGroupState updates. If this happens the playback + # state will be incorrect and should be ignored. Switching to the + # new coordinator will use its media. The regrouping process will + # be completed during the next ZoneGroupState update. + av_transport_uri = event.variables.get("av_transport_uri", "") + current_track_uri = event.variables.get("current_track_uri", "") + if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"): + new_coordinator_uid = av_transport_uri.split(":")[-1] + if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid): + self.logger.debug( + "Media update coordinator (%s) received for %s", + new_coordinator_speaker.zone_name, + self.zone_name, + ) + self.sync_coordinator = new_coordinator_speaker + else: + self.logger.debug( + "Media update coordinator (%s) for %s not yet available", + new_coordinator_uid, + self.zone_name, + ) + return + + if crossfade := event.variables.get("current_crossfade_mode"): + self.logger.debug("crossfade changed to %s", crossfade) + + # Missing transport_state indicates a transient error + if (new_status := event.variables.get("transport_state")) is None: + return + + # Ignore transitions, we should get the target state soon + if new_status == SONOS_STATE_TRANSITIONING: + return + + evars = event.variables + new_status = evars["transport_state"] + state_changed = new_status != self.playback_status + + self.play_mode = evars["current_play_mode"] + self.playback_status = new_status + + track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"] + audio_source = self.soco.music_source_from_uri(track_uri) + + self._set_basic_track_info(update_position=state_changed) + + if (ct_md := evars["current_track_meta_data"]) and not self.image_url: # noqa: SIM102 + if album_art_uri := getattr(ct_md, "album_art_uri", None): + # TODO: handle library mess here + self.image_url = album_art_uri + + et_uri_md = evars["enqueued_transport_uri_meta_data"] + if isinstance(et_uri_md, DidlPlaylistContainer): + self.playlist_name = et_uri_md.title + + if queue_size := evars.get("number_of_tracks", 0): + self.queue_size = int(queue_size) + + if audio_source == MUSIC_SRC_RADIO: + if et_uri_md: + self.channel = et_uri_md.title + + # Extra guards for S1 compatibility + if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show: + radio_show = ct_md.radio_show.split(",")[0] + self.channel = " • ".join(filter(None, [self.channel, radio_show])) + + if isinstance(et_uri_md, DidlAudioBroadcast): + self.title = self.title or self.channel + + self.update_player() + + def _handle_rendering_control_event(self, event: SonosEvent) -> None: + """Update information about currently volume settings.""" + variables = event.variables + + if "volume" in variables: + volume = variables["volume"] + self.mass_player.volume_level = int(volume["Master"]) + + if mute := variables.get("mute"): + self.mass_player.volume_muted = mute["Master"] == "1" + + if loudness := variables.get("loudness"): + # TODO: handle this is a better way + self.mass.config.set_raw_player_config_value( + self.player_id, "sonos_loudness", loudness["Master"] == "1" + ) + + for int_var in ( + "bass", + "treble", + ): + if int_var in variables: + # TODO: handle this is a better way + self.mass.config.set_raw_player_config_value( + self.player_id, f"sonos_{int_var}", variables[int_var] + ) + + self.update_player() + + def _handle_zone_group_topology_event(self, event: SonosEvent) -> None: + """Handle callback for topology change event.""" + if xml := event.variables.get("zone_group_state"): + zgs = ET.fromstring(xml) + for vanished_device in zgs.find("VanishedDevices") or []: + if (reason := vanished_device.get("Reason")) not in SUPPORTED_VANISH_REASONS: + self.logger.debug( + "Ignoring %s marked %s as vanished with reason: %s", + self.zone_name, + vanished_device.get("ZoneName"), + reason, + ) + continue + self.mass.create_task(self._vanished(reason)) + + if "zone_player_uui_ds_in_group" not in event.variables: + return + asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop) + + async def _vanished(self, reason: str) -> None: + """Handle removal of speaker when marked as vanished.""" + if not self.available: + return + self.logger.debug("%s has vanished (%s), marking unavailable", self.zone_name, reason) + await self.offline() + + async def _rebooted(self) -> None: + """Handle a detected speaker reboot.""" + self.logger.debug("%s rebooted, reconnecting", self.zone_name) + await self.offline() + self._speaker_activity("reboot") + + def update_groups(self) -> None: + """Update group topology when polling.""" + asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop) + + def update_group_for_uid(self, uid: str) -> None: + """Update group topology if uid is missing.""" + if uid not in self._group_members_missing: + return + missing_zone = self.sonos_prov.sonosplayers[uid].zone_name + self.logger.debug("%s was missing, adding to %s group", missing_zone, self.zone_name) + self.update_groups() + + def create_update_groups_coro(self, event: SonosEvent | None = None) -> Coroutine: + """Handle callback for topology change event.""" + + def _get_soco_group() -> list[str]: + """Ask SoCo cache for existing topology.""" + coordinator_uid = self.soco.uid + joined_uids = [] + with contextlib.suppress(OSError, SoCoException): + if self.soco.group and self.soco.group.coordinator: + coordinator_uid = self.soco.group.coordinator.uid + joined_uids = [ + p.uid + for p in self.soco.group.members + if p.uid != coordinator_uid and p.is_visible + ] + + return [coordinator_uid] + joined_uids + + async def _extract_group(event: SonosEvent | None) -> list[str]: + """Extract group layout from a topology event.""" + group = event and event.zone_player_uui_ds_in_group + if group: + assert isinstance(group, str) + return group.split(",") + return await self.mass.create_task(_get_soco_group) + + def _regroup(group: list[str]) -> None: + """Rebuild internal group layout (async safe).""" + if group == [self.soco.uid] and self.group_members == [self] and self.group_members_ids: + # Skip updating existing single speakers in polling mode + return + + group_members = [] + group_members_ids = [] + + for uid in group: + speaker = self.sonos_prov.sonosplayers.get(uid) + if speaker: + self._group_members_missing.discard(uid) + group_members.append(speaker) + group_members_ids.append(uid) + else: + self._group_members_missing.add(uid) + self.logger.debug( + "%s group member unavailable (%s), will try again", + self.zone_name, + uid, + ) + return + + if self.group_members_ids == group_members_ids: + # Useful in polling mode for speakers with stereo pairs or surrounds + # as those "invisible" speakers will bypass the single speaker check + return + + self.sync_coordinator = None + self.group_members = group_members + self.group_members_ids = group_members_ids + self.mass.players.update(self.player_id) + + for joined_uid in group[1:]: + joined_speaker: SonosPlayer = self.sonos_prov.sonosplayers.get(joined_uid) + if joined_speaker: + joined_speaker.sync_coordinator = self + joined_speaker.group_members = group_members + joined_speaker.group_members_ids = group_members_ids + joined_speaker.update_player() + + self.logger.debug("Regrouped %s: %s", self.zone_name, self.group_members_ids) + self.update_player() + + async def _handle_group_event(event: SonosEvent | None) -> None: + """Get async lock and handle event.""" + async with self.sonos_prov.topology_condition: + group = await _extract_group(event) + if self.soco.uid == group[0]: + _regroup(group) + self.sonos_prov.topology_condition.notify_all() + + return _handle_group_event(event) + + async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None: + """Wait until all groups are present, or timeout.""" + + def _test_groups(groups: list[list[SonosPlayer]]) -> bool: + """Return whether all groups exist now.""" + for group in groups: + coordinator = group[0] + + # Test that coordinator is coordinating + current_group = coordinator.group_members + if coordinator != current_group[0]: + return False + + # Test that joined members match + if set(group[1:]) != set(current_group[1:]): + return False + + return True + + try: + async with asyncio.timeout(5): + while not _test_groups(groups): + await self.sonos_prov.topology_condition.wait() + except asyncio.TimeoutError: + self.logger.warning("Timeout waiting for target groups %s", groups) + + any_speaker = next(iter(self.sonos_prov.sonosplayers.values())) + any_speaker.soco.zone_group_state.clear_cache() + + def _update_attributes(self): + """Update attributes of the MA Player from SoCo state.""" + # generic attributes (player_info) + self.mass_player.available = self.available + + # transport info (playback state) + self.mass_player.state = current_state = _convert_state(self.playback_status) + + # power 'on' player if we detect its playing + if not self.mass_player.powered and ( + current_state == PlayerState.PLAYING + or ( + self.sync_coordinator + and self.sync_coordinator.mass_player.state == PlayerState.PLAYING + ) + ): + self.mass_player.powered = True + + # media info (track info) + self.mass_player.current_item_id = self.uri + if self.uri and self.player_id in self.uri: + self.mass_player.active_source = self.player_id + else: + self.mass_player.active_source = self.source_name + if self.position is not None and self.position_updated_at is not None: + self.mass_player.elapsed_time = self.position + self.mass_player.elapsed_time_last_updated = self.position_updated_at.timestamp() + + # zone topology (syncing/grouping) details + self.mass_player.can_sync_with = tuple( + x.player_id for x in self.sonos_prov.sonosplayers.values() if x.sync_coordinator is None + ) + if self.sync_coordinator: + # player is syned to another player + self.mass_player.synced_to = self.sync_coordinator.player_id + self.mass_player.group_childs = set() + self.mass_player.active_source = self.sync_coordinator.mass_player.active_source + elif len(self.group_members_ids) > 1: + # this player is the sync leader in a group + self.mass_player.synced_to = None + self.mass_player.group_childs = set(self.group_members_ids) + else: + # standalone player, not synced + self.mass_player.synced_to = None + self.mass_player.group_childs = set() + + def _set_basic_track_info(self, update_position: bool = False) -> None: + """Query the speaker to update media metadata and position info.""" + self.channel = None + self.duration = None + self.image_url = None + self.source_name = None + self.title = None + self.uri = None + + track_info = self.poll_track_info() + if not track_info["uri"]: + return + self.uri = track_info["uri"] + + audio_source = self.soco.music_source_from_uri(self.uri) + if source := SOURCE_MAPPING.get(audio_source): + self.source_name = source + if audio_source in LINEIN_SOURCES: + self.position = None + self.position_updated_at = None + self.title = source + return + + self.artist = track_info.get("artist") + self.album_name = track_info.get("album") + self.title = track_info.get("title") + self.image_url = track_info.get("album_art") + + playlist_position = int(track_info.get("playlist_position", -1)) + if playlist_position > 0: + self.queue_position = playlist_position + + self._update_media_position(track_info, force_update=update_position) + + def _update_media_position( + self, position_info: dict[str, int], force_update: bool = False + ) -> None: + """Update state when playing music tracks.""" + duration = position_info.get(DURATION_SECONDS) + current_position = position_info.get(POSITION_SECONDS) + + if not (duration or current_position): + self.position = None + self.position_updated_at = None + return + + should_update = force_update + self.duration = duration + + # player started reporting position? + if current_position is not None and self.position is None: + should_update = True + + # position jumped? + if current_position is not None and self.position is not None: + if self.playback_status == SONOS_STATE_PLAYING: + assert self.position_updated_at is not None + time_delta = utc() - self.position_updated_at + time_diff = time_delta.total_seconds() + else: + time_diff = 0 + + calculated_position = self.position + time_diff + + if abs(calculated_position - current_position) > 1.5: + should_update = True + + if current_position is None: + self.position = None + self.position_updated_at = None + elif should_update: + self.position = current_position + self.position_updated_at = utc() + + def _speaker_activity(self, source: str) -> None: + """Track the last activity on this speaker, set availability and resubscribe.""" + if self._resub_cooldown_expires_at: + if time.monotonic() < self._resub_cooldown_expires_at: + self.logger.debug( + "Activity on %s from %s while in cooldown, ignoring", + self.zone_name, + source, + ) + return + self._resub_cooldown_expires_at = None + + self.logger.debug("Activity on %s from %s", self.zone_name, source) + self._last_activity = time.monotonic() + was_available = self.available + self.available = True + if not was_available: + self.mass.players.update(self.player_id) + self.mass.create_task(self.subscribe()) + + @soco_error() + def _join(self, members: list[SonosPlayer]) -> list[SonosPlayer]: + if self.sync_coordinator: + self.unjoin() + group = [self] + else: + group = self.group_members.copy() + + for player in members: + if player.soco.uid != self.soco.uid and player not in group: + player.soco.join(self.soco) + player.sync_coordinator = self + group.append(player) + + return group + + @soco_error() + def _unjoin(self) -> None: + if self.group_members == [self]: + return + self.soco.unjoin() + self.sync_coordinator = None + + +def _convert_state(sonos_state: str) -> PlayerState: + """Convert Sonos state to PlayerState.""" + if sonos_state == "PLAYING": + return PlayerState.PLAYING + if sonos_state == "TRANSITIONING": + return PlayerState.PLAYING + if sonos_state == "PAUSED_PLAYBACK": + return PlayerState.PAUSED + return PlayerState.IDLE + + +def _timespan_secs(timespan): + """Parse a time-span into number of seconds.""" + if timespan in ("", "NOT_IMPLEMENTED", None): + return None + return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))) diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 5f2f3dda..0e1efdf0 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -185,7 +185,6 @@ class UniversalGroupProvider(PlayerProvider): if member is None: continue tg.create_task(player_prov.play_stream(member.player_id, stream_job)) - stream_job.start() async def poll_player(self, player_id: str) -> None: """Poll player for state updates.""" diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index a0b1b309..25479632 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -342,6 +342,21 @@ class MusicAssistant: task.add_done_callback(task_done_callback) return task + def call_later( + self, + delay: float, + target: Coroutine | Awaitable | Callable | asyncio.Future, + *args: Any, + task_id: str | None = None, + **kwargs: Any, + ) -> asyncio.Task | asyncio.Future: + """Run callable/awaitable after given delay.""" + + def _create_task(): + self.create_task(target, *args, task_id=task_id, **kwargs) + + self.loop.call_later(delay, _create_task) + def get_task(self, task_id: str) -> asyncio.Task | asyncio.Future: """Get existing scheduled task.""" if existing := self._tracked_tasks.get(task_id): diff --git a/requirements_all.txt b/requirements_all.txt index 9bb10bd6..d1eb847e 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -11,6 +11,7 @@ async-upnp-client==0.38.1 asyncio-throttle==1.0.2 colorlog==6.8.0 cryptography==41.0.7 +defusedxml==0.7.1 faust-cchardet>=2.1.18 git+https://github.com/MarvinSchenkel/pytube.git git+https://github.com/music-assistant/deezer-python-async@v0.1.2 @@ -30,6 +31,7 @@ radios==0.3.0 shortuuid==1.0.11 snapcast-mod==2.4.3 soco==0.30.2 +sonos-websocket==0.1.3 tidalapi==0.7.3 unidecode==1.3.8 uvloop==0.19.0