def create_sample_rates_config_entry(
- max_sample_rate: int,
- max_bit_depth: int,
+ supported_sample_rates: list[int] | None = None,
+ supported_bit_depths: list[int] | None = None,
+ hidden: bool = False,
+ max_sample_rate: int | None = None,
+ max_bit_depth: int | None = None,
safe_max_sample_rate: int = 48000,
safe_max_bit_depth: int = 16,
- hidden: bool = False,
- supported_sample_rates: list[int] | None = None,
) -> ConfigEntry:
"""Create sample rates config entry based on player specific helpers."""
assert CONF_ENTRY_SAMPLE_RATES.options
+ if supported_sample_rates is None:
+ supported_sample_rates = []
+ if supported_bit_depths is None:
+ supported_bit_depths = []
conf_entry = ConfigEntry.from_dict(CONF_ENTRY_SAMPLE_RATES.to_dict())
conf_entry.hidden = hidden
options: list[ConfigValueOption] = []
default_value: list[str] = []
+
for option in CONF_ENTRY_SAMPLE_RATES.options:
option_value = cast(str, option.value)
sample_rate_str, bit_depth_str = option_value.split(MULTI_VALUE_SPLITTER, 1)
sample_rate = int(sample_rate_str)
bit_depth = int(bit_depth_str)
- if supported_sample_rates and sample_rate not in supported_sample_rates:
+ # if no supported sample rates are defined, we accept all within max_sample_rate
+ if not supported_sample_rates and max_sample_rate and sample_rate <= max_sample_rate:
+ supported_sample_rates.append(sample_rate)
+ if not supported_bit_depths and max_bit_depth and bit_depth <= max_bit_depth:
+ supported_bit_depths.append(bit_depth)
+
+ if sample_rate not in supported_sample_rates:
+ continue
+ if bit_depth not in supported_bit_depths:
continue
- if sample_rate <= max_sample_rate and bit_depth <= max_bit_depth:
- options.append(option)
+ options.append(option)
if sample_rate <= safe_max_sample_rate and bit_depth <= safe_max_bit_depth:
default_value.append(option_value)
conf_entry.options = options
from __future__ import annotations
import asyncio
+import logging
+import os
import time
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, TypedDict, cast
from hass_client.exceptions import FailedCommand
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant_models.enums import ConfigEntryType, PlayerFeature, PlayerState, PlayerType
-from music_assistant_models.errors import SetupFailedError
+from music_assistant_models.errors import InvalidDataError, LoginFailed, SetupFailedError
from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import (
CONF_ENTRY_CROSSFADE_DURATION,
CONF_ENTRY_ENABLE_ICY_METADATA,
CONF_ENTRY_ENFORCE_MP3_DEFAULT_ENABLED,
+ CONF_ENTRY_ENFORCE_MP3_HIDDEN,
CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_HTTP_PROFILE,
CONF_ENTRY_HTTP_PROFILE_FORCED_2,
CONF_ENTRY_ENABLE_ICY_METADATA,
CONF_ENTRY_FLOW_MODE_ENFORCED,
)
-ESPHOME_V2_MODELS = (
- # The Home Assistant Voice PE introduces a new ESPHome mediaplayer
- # that supports FLAC 48khz/16 bits and has some other optimizations
- # this player is also used in some other (voice) ESPHome projects
- # so until the new media player component is merged into ESPHome
- # we keep a list here of model names that use the new player
- "Home Assistant Voice PE",
- "Koala Satellite",
- "Respeaker Lite Satellite",
- "Satellite1",
-)
-ESPHOME_V2_MODELS_PLAYER_CONFIG_ENTRIES = (
- # New ESPHome mediaplayer (used in Voice PE) uses FLAC 48khz/16 bits
- CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_FLOW_MODE_ENFORCED,
- CONF_ENTRY_HTTP_PROFILE_FORCED_2,
- create_sample_rates_config_entry(48000, 16, hidden=True, supported_sample_rates=[48000]),
- # although the Voice PE supports announcements, it does not support volume for announcements
- *HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
-)
async def _get_hass_media_players(
yield state
+class ESPHomeSupportedAudioFormat(TypedDict):
+ """ESPHome Supported Audio Format."""
+
+ format: str # flac or mp3
+ sample_rate: int # e.g. 48000
+ num_channels: int # 1 for announcements, 2 for media
+ purpose: int # 0 for media, 1 for announcements
+ sample_bytes: int # 1 for 8 bit, 2 for 16 bit, 4 for 32 bit
+
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
base_entries = await super().get_player_config_entries(player_id)
player = self.mass.players.get(player_id)
- if player and player.device_info.model in ESPHOME_V2_MODELS:
+ if player and player.extra_data.get("esphome_supported_audio_formats"):
# optimized config for new ESPHome mediaplayer
- return base_entries + ESPHOME_V2_MODELS_PLAYER_CONFIG_ENTRIES
+ supported_sample_rates: list[int] = []
+ supported_bit_depths: list[int] = []
+ supports_flac: bool = False
+ for supported_format in player.extra_data["esphome_supported_audio_formats"]:
+ if supported_format["purpose"] != 0:
+ continue
+ if supported_format["format"] == "flac":
+ supports_flac = True
+ if supported_format["sample_rate"] not in supported_sample_rates:
+ supported_sample_rates.append(supported_format["sample_rate"])
+ bit_depth = supported_format["sample_bytes"] * 8
+ if bit_depth not in supported_bit_depths:
+ supported_bit_depths.append(bit_depth)
+ if not supports_flac:
+ base_entries = (*base_entries, CONF_ENTRY_ENFORCE_MP3_HIDDEN)
+ return (
+ *base_entries,
+ # New ESPHome mediaplayer (used in Voice PE) uses FLAC 48khz/16 bits
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ CONF_ENTRY_HTTP_PROFILE_FORCED_2,
+ create_sample_rates_config_entry(
+ supported_sample_rates=supported_sample_rates,
+ supported_bit_depths=supported_bit_depths,
+ hidden=True,
+ ),
+ # although the Voice PE supports announcements,
+ # it does not support volume for announcements
+ *HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
+ )
+
return base_entries + DEFAULT_PLAYER_CONFIG_ENTRIES
async def cmd_stop(self, player_id: str) -> None:
async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
- is_esphome_v2 = self.mass.players.get(player_id).device_info.model in ESPHOME_V2_MODELS
- if self.mass.config.get_raw_player_config_value(
- player_id, CONF_ENFORCE_MP3, not is_esphome_v2
+ if self.mass.config.get_player_config_value(
+ player_id,
+ CONF_ENFORCE_MP3,
):
media.uri = media.uri.replace(".flac", ".mp3")
player = self.mass.players.get(player_id, True)
"""Handle setup of a Player from an hass entity."""
hass_device: HassDevice | None = None
hass_domain: str | None = None
+ extra_player_data: dict[str, Any] = {}
if entity_registry_entry := entity_registry.get(state["entity_id"]):
hass_device = device_registry.get(entity_registry_entry["device_id"])
hass_domain = entity_registry_entry["platform"]
+ extra_player_data["entity_registry_id"] = entity_registry_entry["id"]
+ extra_player_data["hass_domain"] = hass_domain
+ extra_player_data["hass_device_id"] = hass_device["id"] if hass_device else None
+ if hass_domain == "esphome":
+ # if the player is an ESPHome player, we need to check if it is a V2 player
+ # as the V2 player has different capabilities and needs different config entries
+ # The new media player component publishes its supported sample rates but that info
+ # is not exposed directly by HA, so we fetch it from the diagnostics.
+ esphome_supported_audio_formats = await self._get_esphome_supported_audio_formats(
+ entity_registry_entry["config_entry_id"]
+ )
+ extra_player_data["esphome_supported_audio_formats"] = (
+ esphome_supported_audio_formats
+ )
dev_info: dict[str, Any] = {}
- if hass_device and (model := hass_device.get("model")):
- dev_info["model"] = model
- if hass_device and (manufacturer := hass_device.get("manufacturer")):
- dev_info["manufacturer"] = manufacturer
- if hass_device and (model_id := hass_device.get("model_id")):
- dev_info["model_id"] = model_id
- if hass_device and (sw_version := hass_device.get("sw_version")):
- dev_info["software_version"] = sw_version
- if hass_device and (connections := hass_device.get("connections")):
- for key, value in connections:
- if key == "mac":
- dev_info["mac_address"] = value
+ if hass_device:
+ extra_player_data["hass_device_id"] = hass_device["id"]
+ if model := hass_device.get("model"):
+ dev_info["model"] = model
+ if manufacturer := hass_device.get("manufacturer"):
+ dev_info["manufacturer"] = manufacturer
+ if model_id := hass_device.get("model_id"):
+ dev_info["model_id"] = model_id
+ if sw_version := hass_device.get("sw_version"):
+ dev_info["software_version"] = sw_version
+ if connections := hass_device.get("connections"):
+ for key, value in connections:
+ if key == "mac":
+ dev_info["mac_address"] = value
player = Player(
player_id=state["entity_id"],
available=state["state"] not in UNAVAILABLE_STATES,
device_info=DeviceInfo.from_dict(dev_info),
state=StateMap.get(state["state"], PlayerState.IDLE),
- extra_data={
- "hass_domain": hass_domain,
- "hass_device_id": hass_device["id"] if hass_device else None,
- },
+ extra_data=extra_player_data,
)
# work out supported features
hass_supported_features = MediaPlayerEntityFeature(
if state["entity_id"] != entity_id:
continue
await self._setup_player(state, entity_registry, device_registry)
+
+ async def _get_esphome_supported_audio_formats(
+ self, conf_entry_id: str
+ ) -> list[ESPHomeSupportedAudioFormat]:
+ """Get supported audio formats for an ESPHome device."""
+ result: list[ESPHomeSupportedAudioFormat] = []
+ try:
+ # TODO: expose this in the hass client lib instead of hacking around private vars
+ ws_url = self.hass_prov.hass._websocket_url or "ws://supervisor/core/websocket"
+ hass_url = ws_url.replace("ws://", "http://").replace("wss://", "https://")
+ hass_url = hass_url.replace("/api/websocket", "").replace("/websocket", "")
+ api_token = self.hass_prov.hass._token or os.environ.get("HASSIO_TOKEN")
+ url = f"{hass_url}/api/diagnostics/config_entry/{conf_entry_id}"
+ headers = {
+ "Authorization": f"Bearer {api_token}",
+ "content-type": "application/json",
+ }
+ async with self.mass.http_session.get(url, headers=headers) as response:
+ if response.status != 200:
+ raise LoginFailed("Unable to contact Home Assistant to retrieve diagnostics")
+ data = await response.json()
+ if "data" not in data or "storage_data" not in data["data"]:
+ return result
+ if "media_player" not in data["data"]["storage_data"]:
+ raise InvalidDataError("Media player info not found in ESPHome diagnostics")
+ for media_player_obj in data["data"]["storage_data"]["media_player"]:
+ if "supported_formats" not in media_player_obj:
+ continue
+ for supported_format_obj in media_player_obj["supported_formats"]:
+ result.append(cast(ESPHomeSupportedAudioFormat, supported_format_obj))
+ except Exception as exc:
+ self.logger.warning(
+ "Failed to fetch diagnostics for ESPHome player: %s",
+ str(exc),
+ exc_info=exc if self.logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ return result