From: Marcel van der Veldt Date: Tue, 8 Sep 2020 06:15:33 +0000 (+0200) Subject: bring back sonos support X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=fc75ea16719e8aa8b25fa086ac4f8c0ed92d95af;p=music-assistant-server.git bring back sonos support --- diff --git a/music_assistant/providers/sonos/__init__.py b/music_assistant/providers/sonos/__init__.py new file mode 100644 index 00000000..f06b3f0a --- /dev/null +++ b/music_assistant/providers/sonos/__init__.py @@ -0,0 +1,9 @@ +"""Player provider for Sonos speakers.""" + +from .sonos import SonosProvider + + +async def async_setup(mass): + """Perform async setup of this Plugin/Provider.""" + prov = SonosProvider() + await mass.async_register_provider(prov) \ No newline at end of file diff --git a/music_assistant/providers/sonos/sonos.py b/music_assistant/providers/sonos/sonos.py new file mode 100644 index 00000000..15e936e7 --- /dev/null +++ b/music_assistant/providers/sonos/sonos.py @@ -0,0 +1,407 @@ +"""Player provider for Sonos speakers.""" + +import asyncio +import logging +import time +from typing import List + +import soco +from music_assistant.models.config_entry import ConfigEntry +from music_assistant.models.player import DeviceInfo, Player, PlayerFeature, PlayerState +from music_assistant.models.player_queue import QueueItem +from music_assistant.models.playerprovider import PlayerProvider +from music_assistant.utils import run_periodic + +PROV_ID = "sonos" +PROV_NAME = "Sonos" +LOGGER = logging.getLogger(PROV_ID) + +CONFIG_ENTRIES = [] # we don't have any provider config entries (for now) +PLAYER_FEATURES = [PlayerFeature.QUEUE, PlayerFeature.CROSSFADE, PlayerFeature.GAPLESS] +PLAYER_CONFIG_ENTRIES = [] # we don't have any player config entries (for now) + + +class SonosProvider(PlayerProvider): + """Support for Sonos speakers""" + + _discovery_running = False + _tasks = [] + _players = {} + _report_progress_tasks = [] + + @property + def id(self) -> str: + """Return provider ID for this provider.""" + return PROV_ID + + @property + def name(self) -> str: + """Return provider Name for this provider.""" + return PROV_NAME + + @property + def config_entries(self) -> List[ConfigEntry]: + """Return Config Entries for this provider.""" + return CONFIG_ENTRIES + + async def async_on_start(self) -> bool: + """Called on startup. Handle initialization of the provider.""" + self._tasks.append(self.mass.add_job(self.__async_periodic_discovery())) + + async def async_on_stop(self): + """Called on shutdown. Handle correct close/cleanup of the provider on exit.""" + for task in self._tasks: + task.cancel() + + async def async_cmd_play_uri(self, player_id: str, uri: str): + """ + Play the specified uri/url on the goven player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.play_uri, uri) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_stop(self, player_id: str): + """ + Send STOP command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.stop) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_play(self, player_id: str): + """ + Send STOP command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.play) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_pause(self, player_id: str): + """ + Send PAUSE command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.pause) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_next(self, player_id: str): + """ + Send NEXT TRACK command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.next) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_previous(self, player_id: str): + """ + Send PREVIOUS TRACK command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.previous) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_power_on(self, player_id: str): + """ + Send POWER ON command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + # power is not supported so abuse mute instead + player.soco.mute = False + player.powered = True + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_power_off(self, player_id: str): + """ + Send POWER OFF command to given player. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + # power is not supported so abuse mute instead + self.mass.add_job(player.soco.stop) + player.soco.mute = True + player.powered = False + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_volume_set(self, player_id: str, volume_level: int): + """ + Send volume level command to given player. + :param player_id: player_id of the player to handle the command. + :param volume_level: volume level to set (0..100). + """ + player = self._players.get(player_id) + if player: + player.soco.volume = volume_level + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_volume_mute(self, player_id: str, is_muted=False): + """ + Send volume MUTE command to given player. + :param player_id: player_id of the player to handle the command. + :param is_muted: bool with new mute state. + """ + player = self._players.get(player_id) + if player: + player.soco.mute = is_muted + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_queue_play_index(self, player_id: str, index: int): + """ + Play item at index X on player's queue + :param player_id: player_id of the player to handle the command. + :param index: (int) index of the queue item that should start playing + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.play_from_queue, index) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_queue_load(self, player_id: str, queue_items: List[QueueItem]): + """ + Load/overwrite given items in the player's queue implementation + :param player_id: player_id of the player to handle the command. + :param queue_items: a list of QueueItems + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.clear_queue) + for pos, item in enumerate(queue_items): + self.mass.add_job(player.soco.add_uri_to_queue, item.uri, pos) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_queue_insert( + self, player_id: str, queue_items: List[QueueItem], insert_at_index: int + ): + """ + Insert new items at position X into existing queue. + If insert_at_index 0 or None, will start playing newly added item(s) + :param player_id: player_id of the player to handle the command. + :param queue_items: a list of QueueItems + :param insert_at_index: queue position to insert new items + """ + player = self._players.get(player_id) + if player: + for pos, item in enumerate(queue_items): + self.mass.add_job(player.soco.add_uri_to_queue, item.uri, insert_at_index + pos) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_queue_append(self, player_id: str, queue_items: List[QueueItem]): + """ + Append new items at the end of the queue. + :param player_id: player_id of the player to handle the command. + :param queue_items: a list of QueueItems + """ + player_queue = self.mass.player_manager.get_player_queue(player_id) + if player_queue: + return await self.async_cmd_queue_insert( + player_id, queue_items, len(player_queue.items) + ) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + async def async_cmd_queue_clear(self, player_id: str): + """ + Clear the player's queue. + :param player_id: player_id of the player to handle the command. + """ + player = self._players.get(player_id) + if player: + self.mass.add_job(player.soco.clear_queue) + else: + LOGGER.warning("Received command for unavailable player: %s", player_id) + + @run_periodic(1800) + async def __async_periodic_discovery(self): + """Run Sonos discovery at interval.""" + self._tasks.append(self.mass.add_job(None, self.__run_discovery)) + + def __run_discovery(self): + """Background Sonos discovery and handler, runs in executor thread.""" + if self._discovery_running: + return + self._discovery_running = True + LOGGER.debug("Sonos discovery started...") + discovered_devices = soco.discover() + if discovered_devices is None: + discovered_devices = [] + new_device_ids = [item.uid for item in discovered_devices] + cur_player_ids = [item.player_id for item in self._players.values()] + # remove any disconnected players... + for player in list(self._players.values()): + if not player.is_group and not player.soco.uid in new_device_ids: + self.mass.add_job(self.mass.player_manager.async_remove_player(player.player_id)) + for sub in player.subscriptions: + sub.unsubscribe() + self._players.pop(player, None) + # process new players + for device in discovered_devices: + if device.uid not in cur_player_ids and device.is_visible: + self.__device_discovered(device) + # handle groups + if len(discovered_devices) > 0: + self.__process_groups(discovered_devices[0].all_groups) + else: + self.__process_groups([]) + + def __device_discovered(self, soco_device: soco.SoCo): + """Handle discovered Sonos player.""" + speaker_info = soco_device.get_speaker_info(True) + player = Player( + player_id=soco_device.uid, + provider_id=PROV_ID, + name=soco_device.player_name, + features=PLAYER_FEATURES, + config_entries=PLAYER_CONFIG_ENTRIES, + device_info=DeviceInfo( + model=speaker_info["model_name"], + address=speaker_info["mac_address"], + manufacturer=PROV_NAME, + ), + ) + # store soco object on player + player.soco = soco_device + player.media_position_updated_at = 0 + # handle subscriptions to events + player.subscriptions = [] + + def subscribe(service, _callback): + queue = ProcessSonosEventQueue(soco_device.uid, _callback) + sub = service.subscribe(auto_renew=True, event_queue=queue) + player.subscriptions.append(sub) + + subscribe(soco_device.avTransport, self.__player_event) + subscribe(soco_device.renderingControl, self.__player_event) + subscribe(soco_device.zoneGroupTopology, self.__topology_changed) + self.mass.run_task(self.mass.player_manager.async_add_player(player)) + return player + + def __player_event(self, player_id: str, event): + """Handle a SoCo player event.""" + player = self._players[player_id] + if event: + variables = event.variables + if "volume" in variables: + player.volume_level = int(variables["volume"]["Master"]) + if "mute" in variables: + player.muted = variables["mute"]["Master"] == "1" + else: + player.volume_level = player.soco.volume + player.muted = player.soco.mute + transport_info = player.soco.get_current_transport_info() + current_transport_state = transport_info.get("current_transport_state") + if current_transport_state == "TRANSITIONING": + return + if player.soco.is_playing_tv or player.soco.is_playing_line_in: + player.powered = False + else: + new_state = __convert_state(current_transport_state) + player.state = new_state + track_info = player.soco.get_current_track_info() + player.current_uri = track_info["uri"] + position_info = player.soco.avTransport.GetPositionInfo( + [("InstanceID", 0), ("Channel", "Master")] + ) + rel_time = __timespan_secs(position_info.get("RelTime")) + player.elapsed_time = rel_time + if player.state == PlayerState.Playing: + self.mass.add_job(self.__async_report_progress(player_id)) + self.mass.add_job(self.mass.player_manager.async_update_player(player)) + + def __process_groups(self, sonos_groups): + """Process all sonos groups.""" + all_group_ids = [] + for group in sonos_groups: + all_group_ids.append(group.uid) + if group.uid not in self._players: + # new group player + group_player = self.__device_discovered(group.coordinator) + else: + group_player = self._players[group.uid] + # check members + group_player.is_group_player = True + group_player.name = group.label + group_player.group_childs = [item.uid for item in group.members] + self.mass.run_task(self.mass.player_manager.async_update_player(group_player)) + + async def __topology_changed(self, player_id, event=None): + """ + Received topology changed event + from one of the sonos players. + Schedule discovery to work out the changes. + """ + self.mass.add_job(self.__run_discovery) + + async def __async_report_progress(self, player_id: str): + """Report current progress while playing.""" + if player_id in self._report_progress_tasks: + return # already running + # sonos does not send instant updates of the player's progress (elapsed time) + # so we need to send it in periodically + player = self._players[player_id] + player.should_poll = True + while player and player.state == PlayerState.Playing: + time_diff = time.time() - player.media_position_updated_at + adjusted_current_time = player.elapsed_time + time_diff + player.elapsed_time = adjusted_current_time + await asyncio.sleep(1) + player.should_poll = False + self._report_progress_tasks.pop(player_id, None) + + +def __convert_state(sonos_state: str) -> PlayerState: + """Convert Sonos state to PlayerState.""" + if sonos_state == "PLAYING": + return PlayerState.Playing + if sonos_state == "TRANSITIONING": + return PlayerState.Playing + if sonos_state == "PAUSED_PLAYBACK": + return PlayerState.Paused + return PlayerState.Stopped + + +def __timespan_secs(timespan): + """Parse a time-span into number of seconds.""" + if timespan in ("", "NOT_IMPLEMENTED", None): + return None + return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))) + + +class ProcessSonosEventQueue: + """Queue like object for dispatching sonos events.""" + + def __init__(self, player_id, callback_handler): + """Initialize Sonos event queue.""" + self._callback_handler = callback_handler + self._player_id = player_id + + def put(self, item, block=True, timeout=None): + """Process event.""" + self._callback_handler(self._player_id, item) diff --git a/music_assistant/providers/sonos/todo.py b/music_assistant/providers/sonos/todo.py deleted file mode 100644 index b60dc57d..00000000 --- a/music_assistant/providers/sonos/todo.py +++ /dev/null @@ -1,267 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding:utf-8 -*- - -import asyncio -import logging -import time -import types -from typing import List - -import aiohttp -from music_assistant.constants import CONF_ENABLED, CONF_HOSTNAME, CONF_PORT -from music_assistant.models.player import Player, PlayerState -from music_assistant.models.player_queue import PlayerQueue, QueueItem -from music_assistant.models.playerprovider import PlayerProvider -from music_assistant.utils import LOGGER, run_periodic, try_parse_int - -PROV_ID = "sonos" -PROV_NAME = "Sonos" -PROV_CLASS = "SonosProvider" - -CONFIG_ENTRIES = [(CONF_ENABLED, True, CONF_ENABLED)] - - -class SonosPlayer(Player): - """Sonos player object""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.__sonos_report_progress_task = None - - def __del__(self): - if self.__sonos_report_progress_task: - self.__sonos_report_progress_task.cancel() - - async def async_cmd_stop(self): - """Send stop command to player.""" - self.soco.stop() - - async def async_cmd_play(self): - """Send play command to player.""" - self.soco.play() - - async def async_cmd_pause(self): - """Send pause command to player.""" - self.soco.pause() - - async def async_cmd_next(self): - """Send next track command to player.""" - self.soco.next() - - async def async_cmd_previous(self): - """Send previous track command to player.""" - self.soco.previous() - - async def async_cmd_power_on(self): - """Send power ON command to player.""" - self.powered = True - - async def async_cmd_power_off(self): - """Send power OFF command to player.""" - self.powered = False - # power is not supported so send stop instead - self.soco.stop() - - async def async_cmd_volume_set(self, volume_level): - """Send new volume level command to player.""" - self.soco.volume = volume_level - - async def async_cmd_volume_mute(self, is_muted=False): - """Send mute command to player.""" - self.soco.mute = is_muted - - async def async_cmd_play_uri(self, uri: str): - """Play single uri on player.""" - self.soco.play_uri(uri) - - async def async_cmd_queue_play_index(self, index: int): - """ - play item at index X on player's queue - :attrib index: (int) index of the queue item that should start playing - """ - self.soco.play_from_queue(index) - - async def async_cmd_queue_load(self, queue_items: List[QueueItem]): - """load (overwrite) queue with new items""" - self.soco.clear_queue() - for pos, item in enumerate(queue_items): - self.soco.add_uri_to_queue(item.uri, pos) - - async def async_cmd_queue_insert(self, queue_items: List[QueueItem], insert_at_index): - for pos, item in enumerate(queue_items): - self.soco.add_uri_to_queue(item.uri, insert_at_index + pos) - - async def async_cmd_queue_append(self, queue_items: List[QueueItem]): - """ - append new items at the end of the queue - """ - last_index = len(self.queue.items) - for pos, item in enumerate(queue_items): - self.soco.add_uri_to_queue(item.uri, last_index + pos) - - async def __async_report_progress(self): - """report current progress while playing""" - # sonos does not send instant updates of the player's progress (cur_time) - # so we need to send it in periodically - while self._state == PlayerState.Playing: - time_diff = time.time() - self.media_position_updated_at - adjusted_current_time = self._cur_time + time_diff - self.cur_time = adjusted_current_time - await asyncio.sleep(1) - self.__sonos_report_progress_task = None - - async def async_update_state(self, event=None): - """update state, triggerer by event""" - if event: - variables = event.variables - if "volume" in variables: - self.volume_level = int(variables["volume"]["Master"]) - if "mute" in variables: - self.muted = variables["mute"]["Master"] == "1" - else: - self.volume_level = self.soco.volume - self.muted = self.soco.mute - transport_info = self.soco.get_current_transport_info() - current_transport_state = transport_info.get("current_transport_state") - if current_transport_state == "TRANSITIONING": - return - if self.soco.is_playing_tv or self.soco.is_playing_line_in: - self.powered = False - return - new_state = self.__convert_state(current_transport_state) - self.state = new_state - track_info = self.soco.get_current_track_info() - self.current_uri = track_info["uri"] - position_info = self.soco.avTransport.GetPositionInfo( - [("InstanceID", 0), ("Channel", "Master")] - ) - rel_time = self.__timespan_secs(position_info.get("RelTime")) - self.cur_time = rel_time - if ( - self._state == PlayerState.Playing - and self.__sonos_report_progress_task == None - ): - self.__sonos_report_progress_task = self.mass.add_job( - self.__report_progress() - ) - - @staticmethod - def __convert_state(sonos_state): - """convert sonos state to internal state""" - if sonos_state == "PLAYING": - return PlayerState.Playing - elif sonos_state == "PAUSED_PLAYBACK": - return PlayerState.Paused - else: - return PlayerState.Stopped - - @staticmethod - def __timespan_secs(timespan): - """Parse a time-span into number of seconds.""" - if timespan in ("", "NOT_IMPLEMENTED", None): - return None - return sum( - 60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))) - ) - - -class SonosProvider(PlayerProvider): - """support for Sonos speakers""" - - _discovery_running = False - - async def async_setup(self, conf): - """perform async setup""" - self.mass.add_job(self.__periodic_discovery()) - - @run_periodic(1800) - async def __async_periodic_discovery(self): - """run sonos discovery on interval""" - self.mass.loop.run_in_executor(None, self.run_discovery) - - def run_discovery(self): - """background sonos discovery and handler""" - if self._discovery_running: - return - self._discovery_running = True - LOGGER.debug("Sonos discovery started...") - import soco - - discovered_devices = soco.discover() - if discovered_devices == None: - discovered_devices = [] - new_device_ids = [item.uid for item in discovered_devices] - cur_player_ids = [item.player_id for item in self.players] - # remove any disconnected players... - for player in self.players: - if not player.is_group and not player.soco.uid in new_device_ids: - self.mass.run_task(self.remove_player(player.player_id)) - # process new players - for device in discovered_devices: - if device.uid not in cur_player_ids and device.is_visible: - self.__device_discovered(device) - # handle groups - if len(discovered_devices) > 0: - self.__process_groups(discovered_devices[0].all_groups) - else: - self.__process_groups([]) - - def __device_discovered(self, soco_device): - """handle new sonos player.""" - player = SonosPlayer(self.mass, soco_device.uid, self.prov_id) - player.soco = soco_device - player.name = soco_device.player_name - self.supports_queue = True - self.supports_gapless = True - self.supports_crossfade = True - player._subscriptions = [] - player._media_position_updated_at = None - # handle subscriptions to events - def subscribe(service, action): - queue = _ProcessSonosEventQueue(self.mass, action) - sub = service.subscribe(auto_renew=True, event_queue=queue) - player._subscriptions.append(sub) - - subscribe(soco_device.avTransport, player.update_state) - subscribe(soco_device.renderingControl, player.update_state) - subscribe(soco_device.zoneGroupTopology, self.__topology_changed) - self.mass.run_task(self.add_player(player)) - return player - - def __process_groups(self, sonos_groups): - """process all sonos groups""" - all_group_ids = [] - for group in sonos_groups: - all_group_ids.append(group.uid) - if group.uid not in self.mass.player_manager._players: - # new group player - group_player = self.__device_discovered(group.coordinator) - else: - group_player = self.mass.player_manager.get_player_sync(group.uid) - # check members - group_player.name = group.label - group_player.group_childs = [item.uid for item in group.members] - - async def __async_topology_changed(self, event=None): - """ - received topology changed event - from one of the sonos players - schedule discovery to work out the changes - """ - self.mass.loop.run_in_executor(None, self.run_discovery) - - -class _ProcessSonosEventQueue: - """Queue like object for dispatching sonos events.""" - - def __init__(self, mass, handler): - """Initialize Sonos event queue.""" - self._handler = handler - self.mass = mass - - def put(self, item, block=True, timeout=None): - """Process event.""" - try: - self.mass.run_task(self._handler(item), wait_for_result=True) - except Exception as ex: - LOGGER.warning("Error calling %s: %s", self._handler, ex) diff --git a/music_assistant/providers/squeezebox/__init__.py b/music_assistant/providers/squeezebox/__init__.py index 9873261f..ec646b2f 100644 --- a/music_assistant/providers/squeezebox/__init__.py +++ b/music_assistant/providers/squeezebox/__init__.py @@ -305,7 +305,7 @@ class PySqueezeProvider(PlayerProvider): await self.mass.player_manager.async_update_player(socket_client) elif event == Event.EVENT_DISCONNECTED: await self.mass.player_manager.async_remove_player(socket_client.player_id) - self._socket_clients.pop(socket_client.player_id) + self._socket_clients.pop(socket_client.player_id, None) del socket_client elif event == Event.EVENT_DECODER_READY: # player is ready for the next track (if any)