from typing import TYPE_CHECKING
from uuid import UUID
-from pychromecast import (
- APP_BUBBLEUPNP,
- APP_MEDIA_RECEIVER,
- Chromecast,
- get_chromecast_from_cast_info,
-)
+import pychromecast
+from pychromecast.controllers.bubbleupnp import BubbleUPNPController
from pychromecast.controllers.media import STREAM_TYPE_BUFFERED, STREAM_TYPE_LIVE
from pychromecast.controllers.multizone import MultizoneController, MultizoneManager
from pychromecast.discovery import CastBrowser, SimpleCastListener
from pychromecast.models import CastInfo
from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueOption
from music_assistant.common.models.enums import (
+ ConfigEntryType,
ContentType,
MediaType,
PlayerFeature,
from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_PLAYERS, MASS_LOGO_ONLINE
-from music_assistant.server.helpers.compare import compare_strings
+from music_assistant.constants import CONF_HIDE_GROUP_CHILDS, CONF_PLAYERS, MASS_LOGO_ONLINE
from music_assistant.server.models.player_provider import PlayerProvider
-from music_assistant.server.providers.chromecast.helpers import CastStatusListener, ChromecastInfo
+
+from .helpers import CastStatusListener, ChromecastInfo
if TYPE_CHECKING:
from pychromecast.controllers.media import MediaStatus
from pychromecast.controllers.receiver import CastStatus
from pychromecast.socket_client import ConnectionStatus
-
-PLAYER_CONFIG_ENTRIES = tuple()
+CONF_ALT_APP = "alt_app"
@dataclass
player_id: str
cast_info: ChromecastInfo
- cc: Chromecast
+ cc: pychromecast.Chromecast
player: Player
logger: Logger
- is_stereo_pair: bool = False
status_listener: CastStatusListener | None = None
mz_controller: MultizoneController | None = None
next_item: str | None = None
flow_mode_active: bool = False
+ active_group: str | None = None
class ChromecastProvider(PlayerProvider):
for castplayer in list(self.castplayers.values()):
await self._disconnect_chromecast(castplayer)
+ def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ cast_player = self.castplayers.get(player_id)
+ entries = (
+ ConfigEntry(
+ key=CONF_ALT_APP,
+ type=ConfigEntryType.BOOLEAN,
+ label="Use alternate Media app",
+ default_value=cast_player
+ and not cast_player.cast_info.is_audio_group
+ and cast_player.cast_info.manufacturer == "Google Inc.",
+ description="Using the BubbleUPNP Media controller for playback improves "
+ "the playback experience but may not work on non-Google hardware.",
+ advanced=True,
+ ),
+ )
+ if (
+ cast_player
+ and cast_player.cast_info.is_audio_group
+ and not cast_player.cast_info.is_multichannel_group
+ ):
+ entries = entries + (
+ ConfigEntry(
+ key=CONF_HIDE_GROUP_CHILDS,
+ type=ConfigEntryType.STRING,
+ options=[
+ ConfigValueOption("Always", "always"),
+ ConfigValueOption("Only if the group is active/powered", "active"),
+ ConfigValueOption("Never", "never"),
+ ],
+ default_value="active",
+ label="Hide playergroup members in UI",
+ description="Hide the individual player entry for the members of this group "
+ "in the user interface.",
+ advanced=True,
+ ),
+ )
+ return entries
+
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
castplayer = self.castplayers[player_id]
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
castplayer = self.castplayers[player_id]
+ # handle player that is hidden by active group player, use mute as power
+ if castplayer.active_group:
+ await self.cmd_volume_mute(player_id, not powered)
+ return
if powered:
await self._launch_app(castplayer)
else:
# only update status of media controller if player is on
if not castplayer.player.powered:
return
-
+ if not castplayer.cc.media_controller.is_active:
+ return
try:
await asyncio.to_thread(castplayer.cc.media_controller.update_status)
except ConnectionResetError as err:
self.logger.debug("Discovered new or updated chromecast %s", disc_info)
castplayer = self.castplayers.get(player_id)
- if not castplayer:
- cast_info = ChromecastInfo.from_cast_info(disc_info)
- cast_info.fill_out_missing_chromecast_info(self.mass.zeroconf)
- if cast_info.is_dynamic_group:
- self.logger.warning("Discovered a dynamic cast group which will be ignored.")
- return
-
- # Instantiate chromecast object
- castplayer = CastPlayer(
- player_id,
- cast_info=cast_info,
- cc=get_chromecast_from_cast_info(
- disc_info,
- self.mass.zeroconf,
- ),
- player=Player(
- player_id=player_id,
- provider=self.domain,
- type=PlayerType.GROUP if cast_info.is_audio_group else PlayerType.PLAYER,
- name=cast_info.friendly_name,
- available=False,
- powered=False,
- device_info=DeviceInfo(
- model=cast_info.model_name,
- address=cast_info.host,
- manufacturer=cast_info.manufacturer,
- ),
- supported_features=(
- PlayerFeature.POWER,
- PlayerFeature.VOLUME_MUTE,
- PlayerFeature.VOLUME_SET,
- ),
- max_sample_rate=96000,
- ),
- logger=self.logger.getChild(cast_info.friendly_name),
+ if castplayer:
+ # if player was already added, the player will take care of reconnects itself.
+ castplayer.cast_info.update(disc_info)
+ self.mass.loop.call_soon_threadsafe(self.mass.players.update, player_id)
+ return
+ # new player discovered
+ cast_info = ChromecastInfo.from_cast_info(disc_info)
+ cast_info.fill_out_missing_chromecast_info(self.mass.zeroconf)
+ if cast_info.is_dynamic_group:
+ self.logger.debug("Discovered a dynamic cast group which will be ignored.")
+ return
+ if cast_info.is_multichannel_child:
+ self.logger.debug(
+ "Discovered a passive (multichannel) endpoint which will be ignored."
)
- self.castplayers[player_id] = castplayer
+ return
- castplayer.status_listener = CastStatusListener(self, castplayer, self.mz_mgr)
- if cast_info.is_audio_group:
- mz_controller = MultizoneController(cast_info.uuid)
- castplayer.cc.register_handler(mz_controller)
- castplayer.mz_controller = mz_controller
- castplayer.cc.start()
+ # Instantiate chromecast object
+ castplayer = CastPlayer(
+ player_id,
+ cast_info=cast_info,
+ cc=pychromecast.get_chromecast_from_cast_info(
+ disc_info,
+ self.mass.zeroconf,
+ ),
+ player=Player(
+ player_id=player_id,
+ provider=self.domain,
+ type=PlayerType.GROUP if cast_info.is_audio_group else PlayerType.PLAYER,
+ name=cast_info.friendly_name,
+ available=False,
+ powered=False,
+ device_info=DeviceInfo(
+ model=cast_info.model_name,
+ address=f"{cast_info.host}:{cast_info.port}",
+ manufacturer=cast_info.manufacturer,
+ ),
+ supported_features=(
+ PlayerFeature.POWER,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.VOLUME_SET,
+ ),
+ max_sample_rate=96000,
+ ),
+ logger=self.logger.getChild(cast_info.friendly_name),
+ )
+ self.castplayers[player_id] = castplayer
- self.mass.loop.call_soon_threadsafe(
- self.mass.players.register_or_update, castplayer.player
- )
+ castplayer.status_listener = CastStatusListener(self, castplayer, self.mz_mgr)
+ if cast_info.is_audio_group:
+ mz_controller = MultizoneController(cast_info.uuid)
+ castplayer.cc.register_handler(mz_controller)
+ castplayer.mz_controller = mz_controller
- # if player was already added, the player will take care of reconnects itself.
- castplayer.cast_info.update(disc_info)
- self.mass.loop.call_soon_threadsafe(self.mass.players.update, player_id)
+ castplayer.cc.start()
+ self.mass.loop.call_soon_threadsafe(
+ self.mass.players.register_or_update, castplayer.player
+ )
def _on_chromecast_removed(self, uuid, service, cast_info): # noqa: ARG002
"""Handle zeroconf discovery of a removed Chromecast."""
status.volume_level,
)
castplayer.player.name = castplayer.cast_info.friendly_name
- castplayer.player.powered = status.app_id in (
- "705D30C6",
- APP_MEDIA_RECEIVER,
- APP_BUBBLEUPNP,
- )
- castplayer.is_stereo_pair = (
- castplayer.cast_info.is_audio_group
- and castplayer.mz_controller
- and castplayer.mz_controller.members
- and compare_strings(castplayer.mz_controller.members[0], castplayer.player_id)
- )
+ if castplayer.active_group:
+ # use mute as power when group is active
+ castplayer.player.powered = not status.volume_muted
+ else:
+ castplayer.player.powered = (
+ castplayer.cc.app_id is not None
+ and castplayer.cc.app_id != pychromecast.IDLE_APP_ID
+ )
castplayer.player.volume_level = int(status.volume_level * 100)
castplayer.player.volume_muted = status.volume_muted
- if castplayer.is_stereo_pair:
- castplayer.player.type = PlayerType.PLAYER
+
+ # handle stereo pairs
+ if castplayer.cast_info.is_multichannel_group:
+ castplayer.player.type = PlayerType.STEREO_PAIR
+ castplayer.player.group_childs = []
+ # handle cast groups
+ elif castplayer.cast_info.is_audio_group:
+ castplayer.player.type = PlayerType.GROUP
+ castplayer.player.group_childs = [
+ str(UUID(x)) for x in castplayer.mz_controller.members
+ ]
+ castplayer.player.supported_features = (
+ PlayerFeature.POWER,
+ PlayerFeature.VOLUME_SET,
+ )
+ # send update to player manager
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
def on_new_media_status(self, castplayer: CastPlayer, status: MediaStatus):
castplayer.player.available = new_available
castplayer.player.device_info = DeviceInfo(
model=castplayer.cast_info.model_name,
- address=castplayer.cast_info.host,
+ address=f"{castplayer.cast_info.host}:{castplayer.cast_info.port}",
manufacturer=castplayer.cast_info.manufacturer,
)
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
group_media_controller = self.mz_mgr.get_multizone_mediacontroller(group_uuid)
if not group_media_controller:
continue
- self.on_multizone_new_media_status(
- castplayer, group_uuid, group_media_controller.status
- )
-
- def on_multizone_new_media_status(
- self, castplayer: CastPlayer, group_uuid: UUID, media_status: MediaStatus # noqa: ARG002
- ):
- """Handle updates of audio group media status."""
- castplayer.logger.debug("Received multizone media status update")
- # self.mz_media_status[group_uuid] = media_status
- # self.mz_media_status_received[group_uuid] = dt_util.utcnow()
- # self.schedule_update_ha_state()
### Helpers / utils
async def _launch_app(self, castplayer: CastPlayer) -> None:
"""Launch the default Media Receiver App on a Chromecast."""
event = asyncio.Event()
+ if use_alt_app := self.mass.config.get_player_config_value(
+ castplayer.player_id, CONF_ALT_APP
+ ).value:
+ app_id = pychromecast.config.APP_BUBBLEUPNP
+ else:
+ app_id = pychromecast.config.APP_MEDIA_RECEIVER
+
+ if castplayer.cc.app_id == app_id:
+ return # already active
def launched_callback():
self.mass.loop.call_soon_threadsafe(event.set)
def launch():
- # controller = BubbleUPNPController()
- # castplayer.cc.register_handler(controller)
- # controller.launch(launched_callback)
- castplayer.cc.media_controller.launch(launched_callback)
+ # Quit the previous app before starting splash screen or media player
+ if castplayer.cc.app_id is not None:
+ castplayer.cc.quit_app()
+ # Use BubbleUPNP media receiver app if configured
+ # which enables a more rich display but does not work on all players
+ # so its configurable to turn it on/off
+ if use_alt_app:
+ castplayer.logger.debug(
+ "Launching BubbleUPNPController (%s) as active app.", app_id
+ )
+ controller = BubbleUPNPController()
+ castplayer.cc.register_handler(controller)
+ controller.launch(launched_callback)
+ else:
+ castplayer.logger.debug(
+ "Launching Default Media Receiver (%s) as active app.", app_id
+ )
+ castplayer.cc.media_controller.launch(launched_callback)
- castplayer.logger.debug("Launching BubbleUPNPController as active app.")
await self.mass.loop.run_in_executor(None, launch)
await event.wait()
"""Helpers to deal with Cast devices."""
from __future__ import annotations
+import urllib.error
from dataclasses import dataclass
from typing import TYPE_CHECKING, Self
from uuid import UUID
from pychromecast import dial
from pychromecast.const import CAST_TYPE_GROUP
+from zeroconf import ServiceInfo
+
+from music_assistant.constants import CONF_HIDE_GROUP_CHILDS
if TYPE_CHECKING:
from pychromecast.controllers.media import MediaStatus
cast_type: str | None = None
manufacturer: str | None = None
is_dynamic_group: bool | None = None
+ is_multichannel_group: bool = False # group created for e.g. stereo pair
+ is_multichannel_child: bool = False # speaker that is part of multichannel setup
@property
def is_audio_group(self) -> bool:
self.cast_type = cast_info.cast_type
self.manufacturer = cast_info.manufacturer
- if not self.is_audio_group or self.is_dynamic_group is not None:
- # We have all information, no need to check HTTP API.
- return
-
# Fill out missing group information via HTTP API.
- http_group_status = dial.get_multizone_status(
+ dynamic_groups, multichannel_groups = get_multizone_info(self.services, zconf)
+ self.is_dynamic_group = self.uuid in dynamic_groups
+ if self.uuid in multichannel_groups:
+ self.is_multichannel_group = True
+ elif multichannel_groups:
+ self.is_multichannel_child = True
+
+
+def get_multizone_info(services: list[ServiceInfo], zconf: Zeroconf, timeout=30):
+ """Get multizone info from eureka endpoint."""
+ dynamic_groups: set[str] = set()
+ multichannel_groups: set[str] = set()
+ try:
+ _, status = dial._get_status(
+ services,
+ zconf,
+ "/setup/eureka_info?params=multizone",
+ True,
+ timeout,
None,
- services=self.services,
- zconf=zconf,
)
- if http_group_status is not None:
- self.is_dynamic_group = any(
- g.uuid == self.uuid for g in http_group_status.dynamic_groups
- )
+ if "multizone" in status and "dynamic_groups" in status["multizone"]:
+ for group in status["multizone"]["dynamic_groups"]:
+ if udn := group.get("uuid"):
+ uuid = UUID(udn.replace("-", ""))
+ dynamic_groups.add(uuid)
+
+ if "multizone" in status and "groups" in status["multizone"]:
+ for group in status["multizone"]["groups"]:
+ if group["multichannel_group"] and (udn := group.get("uuid")):
+ uuid = UUID(udn.replace("-", ""))
+ multichannel_groups.add(uuid)
+ except (urllib.error.HTTPError, urllib.error.URLError, OSError, ValueError):
+ pass
+ return (dynamic_groups, multichannel_groups)
class CastStatusListener:
def new_cast_status(self, status: CastStatus) -> None:
"""Handle updated CastStatus."""
- if self._valid:
- self.prov.on_new_cast_status(self.castplayer, status)
+ if not self._valid:
+ return
+ self.prov.on_new_cast_status(self.castplayer, status)
def new_media_status(self, status: MediaStatus) -> None:
"""Handle updated MediaStatus."""
- if self._valid:
- self.prov.on_new_media_status(self.castplayer, status)
+ if not self._valid:
+ return
+ self.prov.on_new_media_status(self.castplayer, status)
def new_connection_status(self, status: ConnectionStatus) -> None:
"""Handle updated ConnectionStatus."""
- if self._valid:
- self.prov.on_new_connection_status(self.castplayer, status)
+ if not self._valid:
+ return
+ self.prov.on_new_connection_status(self.castplayer, status)
- @staticmethod
- def added_to_multizone(group_uuid):
+ def added_to_multizone(self, group_uuid):
"""Handle the cast added to a group."""
- print("##### added_to_multizone: %s" % group_uuid)
+ self.prov.logger.debug(
+ "%s is added to multizone: %s", self.castplayer.player.display_name, group_uuid
+ )
+ if group_player := self.prov.castplayers.get(group_uuid):
+ hide_group_childs = self.prov.mass.config.get_player_config_value(
+ group_player.player_id, CONF_HIDE_GROUP_CHILDS
+ ).value
+ if hide_group_childs == "always":
+ self.castplayer.player.hidden_by.add(group_uuid)
def removed_from_multizone(self, group_uuid):
"""Handle the cast removed from a group."""
- if self._valid:
- # self._cast_device.multizone_new_media_status(group_uuid, None)
- print("##### removed_from_multizone: %s" % group_uuid)
+ if not self._valid:
+ return
+ if group_uuid in self.castplayer.player.hidden_by:
+ self.castplayer.player.hidden_by.remove(group_uuid)
+ self.prov.logger.debug(
+ "%s is removed from multizone: %s", self.castplayer.player.display_name, group_uuid
+ )
def multizone_new_cast_status(self, group_uuid, cast_status): # noqa: ARG002
"""Handle reception of a new CastStatus for a group."""
- print("##### multizone_new_cast_status: %s" % group_uuid)
+ if group_player := self.prov.castplayers.get(group_uuid):
+ hide_group_childs = self.prov.mass.config.get_player_config_value(
+ group_uuid, CONF_HIDE_GROUP_CHILDS
+ ).value
+ if hide_group_childs == "always":
+ self.castplayer.player.hidden_by.add(group_uuid)
+ if group_player.cc.media_controller.is_active:
+ self.castplayer.active_group = group_uuid
+ if hide_group_childs == "active":
+ self.castplayer.player.hidden_by.add(group_uuid)
+ elif group_uuid == self.castplayer.active_group:
+ self.castplayer.active_group = None
+ if hide_group_childs != "always" and group_uuid in self.castplayer.player.hidden_by:
+ self.castplayer.player.hidden_by.remove(group_uuid)
+ self.prov.logger.debug(
+ "%s got new cast status for group: %s", self.castplayer.player.display_name, group_uuid
+ )
def multizone_new_media_status(self, group_uuid, media_status): # noqa: ARG002
"""Handle reception of a new MediaStatus for a group."""
- if self._valid:
- # self._cast_device.multizone_new_media_status(group_uuid, media_status)
- print("##### multizone_new_media_status: %s" % group_uuid)
+ if not self._valid:
+ return
+ self.prov.logger.debug(
+ "%s got new media_status for group: %s", self.castplayer.player.display_name, group_uuid
+ )
def invalidate(self):
"""