assert isinstance(provider, PlayerProvider)
provider.on_player_config_removed(player_id)
- def create_default_player_config(self, player_id: str, provider: str, name: str) -> None:
+ def create_default_player_config(
+ self, player_id: str, provider: str, name: str, enabled: bool
+ ) -> None:
"""
Create default/empty PlayerConfig.
# config does not yet exist, create a default one
conf_key = f"{CONF_PLAYERS}/{player_id}"
default_conf = PlayerConfig(
- values={}, provider=provider, player_id=player_id, default_name=name
+ values={}, provider=provider, player_id=player_id, enabled=enabled, default_name=name
)
self.set(
conf_key,
raise AlreadyRegisteredError(f"Player {player_id} is already registered")
# make sure a default config exists
- self.mass.config.create_default_player_config(player_id, player.provider, player.name)
+ self.mass.config.create_default_player_config(
+ player_id, player.provider, player.name, player.enabled_by_default
+ )
player.enabled = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}/enabled", True)
if queue_item is None:
return (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
+ f'<item id="flowmode" parentID="0" restricted="1">'
"<dc:title>Music Assistant</dc:title>"
f"<upnp:albumArtURI>{escape_string(MASS_LOGO_ONLINE)}</upnp:albumArtURI>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
import os
import platform
import xml.etree.ElementTree as ET # noqa: N817
+from contextlib import suppress
from typing import TYPE_CHECKING
import aiofiles
slimproto_prov = self.mass.get_provider("slimproto")
await slimproto_prov.cmd_unsync(player_id)
- def _handle_player_register_callback(self, player: Player) -> None:
+ async def _handle_player_register_callback(self, player: Player) -> None:
"""Handle player register callback from slimproto source player."""
# TODO: Can we get better device info from mDNS ?
player.provider = self.domain
)
player.supports_24bit = False
+ # extend info from the discovery xml
+ async with aiofiles.open(self._config_file, "r") as _file:
+ xml_data = await _file.read()
+ with suppress(ET.ParseError):
+ xml_root = ET.XML(xml_data)
+ for device_elem in xml_root.findall("device"):
+ player_id = device_elem.find("mac").text
+ if player_id != player.player_id:
+ continue
+ # prefer name from UDN because default name is often wrong
+ udn = device_elem.find("udn").text
+ udn_name = udn.split("@")[1].split("._")[0]
+ player.name = udn_name
+ # disable sonos by default
+ if "sonos" in (device_elem.find("friendly_name").text or "").lower():
+ player.enabled_by_default = False
+ # TODO: query more info directly from the device
+ player.device_info = DeviceInfo(
+ model="Airplay device",
+ address=player.device_info.address,
+ manufacturer="SONOS",
+ )
+ break
+
def _handle_player_update_callback(self, player: Player) -> None:
"""Handle player update callback from slimproto source player."""
# we could override anything on the player object here
)
return
+ # Disable TV's by default
+ # (can be enabled manually by the user)
+ enabled_by_default = True
+ for exclude in ("tv", "/12", "PUS", "OLED"):
+ if exclude.lower() in cast_info.friendly_name.lower():
+ enabled_by_default = False
+
# Instantiate chromecast object
castplayer = CastPlayer(
player_id,
PlayerFeature.VOLUME_SET,
),
max_sample_rate=96000,
+ enabled_by_default=enabled_by_default,
),
logger=self.logger.getChild(cast_info.friendly_name),
)
await self.cmd_stop(player_id)
didl_metadata = create_didl_metadata(self.mass, url, queue_item)
- await dlna_player.device.async_set_transport_uri(url, queue_item.name, didl_metadata)
+ title = queue_item.name if queue_item else "Music Assistant"
+ await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
# Play it
await dlna_player.device.async_wait_for_can_play(10)
await dlna_player.device.async_play()
import statistics
import time
from collections import deque
-from collections.abc import Callable, Generator
+from collections.abc import Callable, Coroutine, Generator
from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
_socket_servers: list[asyncio.Server | asyncio.BaseTransport]
_socket_clients: dict[str, SlimClient]
_sync_playpoints: dict[str, deque[SyncPlayPoint]]
- _virtual_providers: dict[str, tuple[Callable, Callable]]
+ _virtual_providers: dict[str, tuple[Coroutine, Callable]]
_do_not_resync_before: dict[str, float]
_cli: LmsCli
port: int = DEFAULT_SLIMPROTO_PORT
return
# forward player update to MA player controller
- self._handle_player_update(client)
+ self.mass.create_task(self._handle_player_update(client))
# construct SlimClient from socket client
SlimClient(reader, writer, client_callback)
def register_virtual_provider(
self,
player_model: str,
- register_callback: Callable,
+ register_callback: Coroutine,
update_callback: Callable,
) -> None:
"""Register a virtual provider based on slimproto, such as the airplay bridge."""
"""Unregister a virtual provider."""
self._virtual_providers.pop(player_model, None)
- def _handle_player_update(self, client: SlimClient) -> None:
+ async def _handle_player_update(self, client: SlimClient) -> None:
"""Process SlimClient update/add to Player controller."""
player_id = client.player_id
virtual_provider_info = self._virtual_providers.get(client.device_model)
)
if virtual_provider_info:
# if this player is part of a virtual provider run the callback
- virtual_provider_info[0](player)
+ await virtual_provider_info[0](player)
self.mass.players.register_or_update(player)
# update player state on player events
self._socket_clients[player_id] = client
# update all attributes
- self._handle_player_update(client)
+ await self._handle_player_update(client)
# update existing players so they can update their `can_sync_with` field
for item in self._socket_clients.values():
if item.player_id == player_id:
continue
- self._handle_player_update(item)
+ await self._handle_player_update(item)
# restore volume and power state
if last_state := await self.mass.cache.get(f"{CACHE_KEY_PREV_STATE}.{player_id}"):
init_power = last_state[0]
# enqueue next item if needed
if sonos_player.player.state == PlayerState.PLAYING and (
- sonos_player.next_url or sonos_player.next_url == sonos_player.player.current_url
+ sonos_player.next_url is None
+ or sonos_player.next_url == sonos_player.player.current_url
):
self.mass.create_task(self._enqueue_next_track(sonos_player))