import asyncio
import logging
import uuid
-from typing import Callable, Dict, List, Optional, Set
+from typing import List, Optional
import pychromecast
from music_assistant.helpers.compare import compare_strings
self._available = False
self._status_listener: Optional[CastStatusListener] = None
self._is_speaker_group = False
+ self._command_busy = False
@property
def player_id(self) -> str:
self._chromecast.media_controller.is_active
and self.cast_status.app_id == pychromecast.APP_MEDIA_RECEIVER
)
- # return self._chromecast is not None and self._chromecast.is_idle
- # return (
- # self._chromecast.media_controller.app_id
- # == pychromecast.config.APP_MEDIA_RECEIVER
- # )
# Chromecast does not support power so we (ab)use mute instead
if self._chromecast.media_controller.is_active:
"Player %s is not available, command can't be executed", self.name
)
return
- await self.mass.loop.run_in_executor(None, func, *args)
-
-
-class ChromecastDynamicGroupChild(ChromecastPlayer):
- """Chromecast Group player object."""
-
- def __init__(
- self, mass: MusicAssistant, cast_info: ChromecastInfo, update_callback: Callable
- ) -> None:
- """Initialize the cast device."""
- super().__init__(mass, cast_info)
- self._update_callback = update_callback
-
- def update_state(self) -> None:
- """Call when state updates."""
- return self._update_callback()
-
- @property
- def child_count(self) -> int:
- """Return number of child players."""
- return len(self.group_childs)
-
-
-class ChromecastDynamicGroupPlayer(Player):
- """Representation of a Chromecast group player.
-
- Tries to workaround the limitation of not being able to manage group members
- by dynamically selecting the right underlying cast player.
- Kind of a hack at the moment, work in progress.
- """
-
- def __init__(self, mass: MusicAssistant, player_id: str, name: str) -> None:
- """Initialize the cast device."""
- super().__init__()
- self.mass = mass
- self._player_id = player_id
- self._name = name
- self._cast_players: Dict[str, ChromecastDynamicGroupChild] = {}
- self._active_player: Optional[ChromecastDynamicGroupChild] = None
-
- @property
- def player_id(self) -> str:
- """Return player id of this player."""
- return self._player_id
-
- @property
- def provider_id(self) -> str:
- """Return provider id of this player."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return name of this player."""
- return self._name
-
- @property
- def powered(self) -> bool:
- """Return power state of this player."""
- return self._active_player is not None
-
- @property
- def should_poll(self) -> bool:
- """Return bool if this player needs to be polled for state changes."""
- return self.state == PlayerState.PLAYING
-
- @property
- def state(self) -> PlayerState:
- """Return the state of the player."""
- active_player = self._active_player
- if active_player is not None:
- return active_player.state
- return PlayerState.IDLE
-
- @property
- def elapsed_time(self) -> int:
- """Return position of current playing media in seconds."""
- active_player = self._active_player
- if active_player is not None:
- return active_player.elapsed_time
- return 0
-
- @property
- def available(self) -> bool:
- """Return availablity state of this player."""
- for player in self._cast_players.values():
- if player.available:
- return True
- return False
-
- @property
- def current_uri(self) -> str:
- """Return current_uri of this player."""
- active_player = self._active_player
- if active_player is not None:
- return active_player.current_uri
- return None
-
- @property
- def group_childs(self) -> Set[str]:
- """Return group_childs."""
- all_childs = set()
- for player in self._cast_players.values():
- for child in player.group_childs:
- all_childs.add(child)
- return all_childs
-
- @property
- def device_info(self) -> DeviceInfo:
- """Return deviceinfo."""
- active_player = self._active_player
- if active_player is not None:
- return active_player.device_info
- return DeviceInfo(
- model="Cast Group",
- address="",
- manufacturer="Google",
- )
-
- @property
- def volume_level(self) -> int:
- """Return volume_level of this player."""
- active_player = self._active_player
- if active_player is not None:
- return active_player.volume_level
- return 0
-
- @property
- def muted(self) -> bool:
- """Return mute state of this player."""
- active_player = self._active_player
- if active_player is not None:
- return active_player.muted
- return False
-
- @property
- def is_group_player(self) -> bool:
- """Return if this player is a group player."""
- return True
-
- @property
- def features(self) -> List[PlayerFeature]:
- """Return list of features this player supports."""
- return PLAYER_FEATURES
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return player specific config entries (if any)."""
- return PLAYER_CONFIG_ENTRIES
-
- def set_cast_info(self, cast_info: ChromecastInfo) -> None:
- """Set (or update) the cast discovery info."""
- if cast_info.uuid not in self._cast_players:
- player = self._cast_players[cast_info.uuid] = ChromecastDynamicGroupChild(
- self.mass, cast_info, self.on_child_update
- )
- create_task(player.on_add())
- self._cast_players[cast_info.uuid].set_cast_info(cast_info)
- self._active_player = self.get_active_player()
-
- def get_active_player(self) -> Optional[ChromecastDynamicGroupChild]:
- """Return the currently active group player."""
- for player in self._cast_players.values():
- if player.powered:
- return player
- return None
-
- def select_active_player(self) -> Optional[ChromecastDynamicGroupChild]:
- """Select the most suitable player at this time to handle playback."""
- required_players = set()
- for player_id in self.group_childs:
- player = self.mass.players.get_player(player_id)
- if player and player.calculated_state.powered:
- required_players.add(player_id)
- if not required_players:
- return None
- # look for a group which has all required players
- for group_player in sorted(
- self._cast_players.values(), key=lambda x: x.child_count
- ):
- if required_players.issubset(group_player.group_childs):
- return group_player
- return None
-
- def on_child_update(self) -> None:
- """Call when child group player updates."""
- create_task(self.on_poll())
-
- async def on_remove(self) -> None:
- """Call when player is removed from the player manager."""
- for player in self._cast_players.values():
- await player.on_remove()
-
- async def cmd_stop(self) -> None:
- """Send stop command to player."""
- if self._active_player:
- await self._active_player.cmd_stop()
-
- async def cmd_play(self) -> None:
- """Send play command to player."""
- if self._active_player:
- await self._active_player.cmd_play()
-
- async def cmd_pause(self) -> None:
- """Send pause command to player."""
- if self._active_player:
- await self._active_player.cmd_pause()
-
- async def cmd_next(self) -> None:
- """Send next track command to player."""
- if self._active_player:
- await self._active_player.cmd_next()
-
- async def cmd_previous(self) -> None:
- """Send previous track command to player."""
- if self._active_player:
- await self._active_player.cmd_next()
-
- async def cmd_power_on(self) -> None:
- """Send power ON command to player."""
- self._active_player = self.select_active_player()
- await self._active_player.cmd_power_on()
-
- async def cmd_power_off(self) -> None:
- """Send power OFF command to player."""
- if self._active_player:
- await self._active_player.cmd_power_off()
-
- async def cmd_play_uri(self, uri: str) -> None:
- """Play single uri on player."""
- self._active_player = self.select_active_player()
- await self._active_player.cmd_play_uri(uri)
-
- async def cmd_queue_load(
- self, queue_items: List[QueueItem], repeat: bool = False
- ) -> None:
- """Load (overwrite) queue with new items."""
- self._active_player = self.select_active_player()
- await self._active_player.cmd_queue_load(queue_items, repeat)
-
- async def cmd_queue_append(self, queue_items: List[QueueItem]) -> None:
- """Append new items at the end of the queue."""
- self._active_player = self.select_active_player()
- await self._active_player.cmd_queue_append(queue_items)
-
- async def on_poll(self):
- """Call when player is polled by player manager."""
- self._active_player = self.get_active_player()
- if self._active_player is not None:
- required_player = self.select_active_player()
- if required_player and self._active_player != required_player:
- # active player changed, we need to transfer the queue
- if (
- self._active_player
- and self._active_player.state != PlayerState.IDLE
- ):
- player_queue = self.mass.players.get_player_queue(self.player_id)
- await player_queue.stop()
- self._active_player = required_player
- await player_queue.resume()
- self._active_player = required_player
- await super().on_poll()
+ while self._command_busy:
+ await asyncio.sleep(0.05)
+ try:
+ # Sending multiple commands at the same time to the cast socket
+ # will make things unstable, make sure to throttle it.
+ self._command_busy = True
+ await self.mass.loop.run_in_executor(None, func, *args)
+ finally:
+ self._command_busy = False