import asyncio
import logging
import uuid
-from typing import List, Optional
+from typing import Callable, Dict, List, Optional, Set
import pychromecast
from music_assistant.helpers.compare import compare_strings
async def cmd_play_uri(self, uri: str) -> None:
"""Play single uri on player."""
- player_queue = self.mass.players.get_player_queue(self.player_id)
- if player_queue.use_queue_stream:
- # create (fake) CC queue so that skip and previous will work
- queue_item = QueueItem(
- item_id=uri, provider="mass", name="Music Assistant", stream_url=uri
- )
- await self.cmd_queue_load([queue_item, queue_item])
- else:
- await self.chromecast_command(
- self._chromecast.play_media, uri, "audio/flac"
- )
+ # create (fake) CC queue so that skip and previous will work
+ queue_item = QueueItem(
+ item_id=uri, provider="mass", name="Music Assistant", stream_url=uri
+ )
+ await self.cmd_queue_load([queue_item, queue_item], True)
- async def cmd_queue_load(self, queue_items: List[QueueItem]) -> None:
+ async def cmd_queue_load(
+ self, queue_items: List[QueueItem], repeat: bool = False
+ ) -> None:
"""Load (overwrite) queue with new items."""
- player_queue = self.mass.players.get_player_queue(self.player_id)
cc_queue_items = self.__create_queue_items(queue_items[:25])
- repeat_enabled = player_queue.use_queue_stream or player_queue.repeat_enabled
queuedata = {
"type": "QUEUE_LOAD",
- "repeatMode": "REPEAT_ALL" if repeat_enabled else "REPEAT_OFF",
+ "repeatMode": "REPEAT_ALL" if repeat else "REPEAT_OFF",
"shuffle": False, # handled by our queue controller
"queueType": "PLAYLIST",
"startIndex": 0, # Item index to play after this request or keep same item if undefined
)
return
await self.mass.loop.run_in_executor(None, func, *args)
+
+
+class ChromecastDynamicGroupChild(ChromecastPlayer):
+ """Chromecast Group player object."""
+
+ def __init__(
+ self, mass: MusicAssistant, cast_info: ChromecastInfo, update_callback: Callable
+ ) -> None:
+ """Initialize the cast device."""
+ super().__init__(mass, cast_info)
+ self._update_callback = update_callback
+
+ def update_state(self) -> None:
+ """Call when state updates."""
+ return self._update_callback()
+
+ @property
+ def child_count(self) -> int:
+ """Return number of child players."""
+ return len(self.group_childs)
+
+
+class ChromecastDynamicGroupPlayer(Player):
+ """Representation of a Chromecast group player.
+
+ Tries to workaround the limitation of not being able to manage group members
+ by dynamically selecting the right underlying cast player.
+ Kind of a hack at the moment, work in progress.
+ """
+
+ def __init__(self, mass: MusicAssistant, player_id: str, name: str) -> None:
+ """Initialize the cast device."""
+ super().__init__()
+ self.mass = mass
+ self._player_id = player_id
+ self._name = name
+ self._cast_players: Dict[str, ChromecastDynamicGroupChild] = {}
+ self._active_player: Optional[ChromecastDynamicGroupChild] = None
+
+ @property
+ def player_id(self) -> str:
+ """Return player id of this player."""
+ return self._player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return name of this player."""
+ return self._name
+
+ @property
+ def powered(self) -> bool:
+ """Return power state of this player."""
+ return self._active_player is not None
+
+ @property
+ def should_poll(self) -> bool:
+ """Return bool if this player needs to be polled for state changes."""
+ return self.state == PlayerState.PLAYING
+
+ @property
+ def state(self) -> PlayerState:
+ """Return the state of the player."""
+ active_player = self._active_player
+ if active_player is not None:
+ return active_player.state
+ return PlayerState.IDLE
+
+ @property
+ def elapsed_time(self) -> int:
+ """Return position of current playing media in seconds."""
+ active_player = self._active_player
+ if active_player is not None:
+ return active_player.elapsed_time
+ return 0
+
+ @property
+ def available(self) -> bool:
+ """Return availablity state of this player."""
+ for player in self._cast_players.values():
+ if player.available:
+ return True
+ return False
+
+ @property
+ def current_uri(self) -> str:
+ """Return current_uri of this player."""
+ active_player = self._active_player
+ if active_player is not None:
+ return active_player.current_uri
+ return None
+
+ @property
+ def group_childs(self) -> Set[str]:
+ """Return group_childs."""
+ all_childs = set()
+ for player in self._cast_players.values():
+ for child in player.group_childs:
+ all_childs.add(child)
+ return all_childs
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return deviceinfo."""
+ active_player = self._active_player
+ if active_player is not None:
+ return active_player.device_info
+ return DeviceInfo(
+ model="Cast Group",
+ address="",
+ manufacturer="Google",
+ )
+
+ @property
+ def volume_level(self) -> int:
+ """Return volume_level of this player."""
+ active_player = self._active_player
+ if active_player is not None:
+ return active_player.volume_level
+ return 0
+
+ @property
+ def muted(self) -> bool:
+ """Return mute state of this player."""
+ active_player = self._active_player
+ if active_player is not None:
+ return active_player.muted
+ return False
+
+ @property
+ def is_group_player(self) -> bool:
+ """Return if this player is a group player."""
+ return True
+
+ @property
+ def features(self) -> List[PlayerFeature]:
+ """Return list of features this player supports."""
+ return PLAYER_FEATURES
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return player specific config entries (if any)."""
+ return PLAYER_CONFIG_ENTRIES
+
+ def set_cast_info(self, cast_info: ChromecastInfo) -> None:
+ """Set (or update) the cast discovery info."""
+ if cast_info.uuid not in self._cast_players:
+ player = self._cast_players[cast_info.uuid] = ChromecastDynamicGroupChild(
+ self.mass, cast_info, self.on_child_update
+ )
+ create_task(player.on_add())
+ self._cast_players[cast_info.uuid].set_cast_info(cast_info)
+ self._active_player = self.get_active_player()
+
+ def get_active_player(self) -> Optional[ChromecastDynamicGroupChild]:
+ """Return the currently active group player."""
+ for player in self._cast_players.values():
+ if player.powered:
+ return player
+ return None
+
+ def select_active_player(self) -> Optional[ChromecastDynamicGroupChild]:
+ """Select the most suitable player at this time to handle playback."""
+ required_players = set()
+ for player_id in self.group_childs:
+ player = self.mass.players.get_player(player_id)
+ if player and player.calculated_state.powered:
+ required_players.add(player_id)
+ if not required_players:
+ return None
+ # look for a group which has all required players
+ for group_player in sorted(
+ self._cast_players.values(), key=lambda x: x.child_count
+ ):
+ if required_players.issubset(group_player.group_childs):
+ return group_player
+ return None
+
+ def on_child_update(self) -> None:
+ """Call when child group player updates."""
+ create_task(self.on_poll())
+
+ async def on_remove(self) -> None:
+ """Call when player is removed from the player manager."""
+ for player in self._cast_players.values():
+ await player.on_remove()
+
+ async def cmd_stop(self) -> None:
+ """Send stop command to player."""
+ if self._active_player:
+ await self._active_player.cmd_stop()
+
+ async def cmd_play(self) -> None:
+ """Send play command to player."""
+ if self._active_player:
+ await self._active_player.cmd_play()
+
+ async def cmd_pause(self) -> None:
+ """Send pause command to player."""
+ if self._active_player:
+ await self._active_player.cmd_pause()
+
+ async def cmd_next(self) -> None:
+ """Send next track command to player."""
+ if self._active_player:
+ await self._active_player.cmd_next()
+
+ async def cmd_previous(self) -> None:
+ """Send previous track command to player."""
+ if self._active_player:
+ await self._active_player.cmd_next()
+
+ async def cmd_power_on(self) -> None:
+ """Send power ON command to player."""
+ self._active_player = self.select_active_player()
+ await self._active_player.cmd_power_on()
+
+ async def cmd_power_off(self) -> None:
+ """Send power OFF command to player."""
+ if self._active_player:
+ await self._active_player.cmd_power_off()
+
+ async def cmd_play_uri(self, uri: str) -> None:
+ """Play single uri on player."""
+ self._active_player = self.select_active_player()
+ await self._active_player.cmd_play_uri(uri)
+
+ async def cmd_queue_load(
+ self, queue_items: List[QueueItem], repeat: bool = False
+ ) -> None:
+ """Load (overwrite) queue with new items."""
+ self._active_player = self.select_active_player()
+ await self._active_player.cmd_queue_load(queue_items, repeat)
+
+ async def cmd_queue_append(self, queue_items: List[QueueItem]) -> None:
+ """Append new items at the end of the queue."""
+ self._active_player = self.select_active_player()
+ await self._active_player.cmd_queue_append(queue_items)
+
+ async def on_poll(self):
+ """Call when player is polled by player manager."""
+ required_player = self.select_active_player()
+ if self._active_player != required_player and required_player:
+ # active player changed, we need to transfer the queue
+ if self._active_player and self._active_player.state != PlayerState.IDLE:
+ player_queue = self.mass.players.get_player_queue(self.player_id)
+ await player_queue.stop()
+ self._active_player = required_player
+ await player_queue.resume()
+ self._active_player = required_player
+ await super().on_poll()