"""Logic to play music from MusicProviders to supported players."""
from __future__ import annotations
-from typing import TYPE_CHECKING, Dict, Tuple, Union
+from typing import TYPE_CHECKING, Dict, Tuple
from music_assistant.models.enums import EventType
from music_assistant.models.errors import AlreadyRegisteredError
from music_assistant.models.event import MassEvent
-from music_assistant.models.player import Player, PlayerGroup
+from music_assistant.models.player import Player
from music_assistant.models.player_queue import PlayerQueue
if TYPE_CHECKING:
from music_assistant.mass import MusicAssistant
-PlayerType = Union[Player, PlayerGroup]
DB_TABLE = "queue_settings"
"""Initialize class."""
self.mass = mass
self.logger = mass.logger.getChild("players")
- self._players: Dict[str, PlayerType] = {}
+ self._players: Dict[str, Player] = {}
self._player_queues: Dict[str, PlayerQueue] = {}
async def setup(self) -> None:
self._player_queues.pop(queue_id)
@property
- def players(self) -> Tuple[PlayerType]:
+ def players(self) -> Tuple[Player]:
"""Return all available players."""
return tuple(x for x in self._players.values() if x.available)
def get_player(
self, player_id: str, include_unavailable: bool = False
- ) -> PlayerType | None:
+ ) -> Player | None:
"""Return Player by player_id or None if not found/unavailable."""
if player := self._players.get(player_id):
if player.available or include_unavailable:
"""Return PlayerQueue by id or None if not found/unavailable."""
return self._player_queues.get(queue_id)
- def get_player_by_name(self, name: str) -> PlayerType | None:
+ def get_player_by_name(self, name: str) -> Player | None:
"""Return Player by name or None if no match is found."""
return next((x for x in self._players.values() if x.name == name), None)
- async def register_player(self, player: PlayerType) -> None:
+ async def register_player(self, player: Player) -> None:
"""Register a new player on the controller."""
if self.mass.closed:
return
"""Models and helpers for a player."""
from __future__ import annotations
-import asyncio
from abc import ABC
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
}
-class PlayerGroup(Player):
- """Convenience Model for a player group with some additional helper methods."""
+### Some convenience help functions below
- _attr_is_group: bool = True
- _attr_group_childs: List[str] = []
-
- @property
- def volume_level(self) -> int:
- """Return current volume level of player (scale 0..100)."""
- if not self.available:
- return 0
- # calculate group volume from powered players for convenience
- # may be overridden if implementation provides this natively
- group_volume = 0
- active_players = 0
- for child_player in self._get_child_players(True):
- group_volume += child_player.volume_level
- active_players += 1
- if active_players:
- group_volume = group_volume / active_players
- return int(group_volume)
- @property
- def elapsed_time(self) -> float:
- """Return the corrected/precise elsapsed time of the grouped player."""
- if not self.use_multi_stream:
- return super().elapsed_time
- # calculate from group childs
- for child_player in self._get_child_players(True):
- if not child_player.current_url:
- continue
- if self.player_id not in child_player.current_url:
- continue
- if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
- continue
- return child_player.elapsed_time
+def get_group_volume(group_player: Player) -> int:
+ """Calculate volume level of group player's childs."""
+ if not group_player.available:
return 0
-
- @property
- def state(self) -> PlayerState:
- """Return the state of the grouped player."""
- if not self.use_multi_stream:
- return super().state
- # calculate from group childs
- for child_player in self._get_child_players(True):
- if not child_player.current_url:
- continue
- if self.player_id not in child_player.current_url:
+ group_volume = 0
+ active_players = 0
+ for child_player in get_child_players(group_player, True):
+ group_volume += child_player.volume_level
+ active_players += 1
+ if active_players:
+ group_volume = group_volume / active_players
+ return int(group_volume)
+
+
+def get_child_players(
+ group_player: Player, only_powered: bool = False, only_playing: bool = False
+) -> List[Player]:
+ """Get players attached to a grouped player."""
+ if not group_player.mass:
+ return []
+ child_players = []
+ for child_id in group_player.group_childs:
+ if child_player := group_player.mass.players.get_player(child_id):
+ if not (not only_powered or child_player.powered):
continue
- if child_player.state not in [PlayerState.PLAYING, PlayerState.PAUSED]:
+ if not (not only_playing or child_player.state == PlayerState.PLAYING):
continue
- return child_player.state
- return super().state
-
- @property
- def current_url(self) -> str:
- """Return the current_url of the grouped player."""
- if not self.use_multi_stream:
- return super().current_url
- # calculate from group childs
- for child_player in self._get_child_players(True):
- if not child_player.current_url:
- continue
- if self.player_id not in child_player.current_url:
- continue
- return child_player.current_url
- return super().current_url
-
- @property
- def supported_content_types(self) -> Tuple[ContentType]:
- """Return the content types this player supports."""
- # return contenttypes that are supported by all child players
- return tuple(
- content_type
- for content_type in ContentType
- if all(
- (
- content_type in child_player.supported_content_types
- for child_player in self._get_child_players(False, False)
- )
- )
- )
-
- @property
- def supported_sample_rates(self) -> Tuple[int]:
- """Return the sample rates this player supports."""
- return tuple(
- sample_rate
- for sample_rate in DEFAULT_SUPPORTED_SAMPLE_RATES
- if all(
- (
- sample_rate in child_player.supported_sample_rates
- for child_player in self._get_child_players(False, False)
- )
- )
- )
-
- async def stop(self) -> None:
- """Send STOP command to player."""
- if not self.use_multi_stream:
- return await super().stop()
- # redirect command to all child players
- await asyncio.gather(*[x.stop() for x in self._get_child_players(True)])
-
- async def play(self) -> None:
- """Send PLAY/UNPAUSE command to player."""
- if not self.use_multi_stream:
- return await super().play()
- # redirect command to all child players
- await asyncio.gather(*[x.play() for x in self._get_child_players(True)])
-
- async def pause(self) -> None:
- """Send PAUSE command to player."""
- if not self.use_multi_stream:
- return await super().pause()
- # redirect command to all child players
- await asyncio.gather(*[x.pause() for x in self._get_child_players(True)])
-
- async def power(self, powered: bool) -> None:
- """Send POWER command to player."""
- if self.use_multi_stream:
- # redirect command to all child players
- await asyncio.gather(
- *[x.power(powered) for x in self._get_child_players(True)]
- )
- else:
- return await super().power(powered)
-
- async def volume_set(self, volume_level: int) -> None:
- """Send volume level (0..100) command to player."""
- # handle group volume by only applying the valume to powered childs
- # may be overridden if implementation provides this natively
- cur_volume = self.volume_level
- new_volume = volume_level
- volume_dif = new_volume - cur_volume
- if cur_volume == 0:
- volume_dif_percent = 1 + (new_volume / 100)
- else:
- volume_dif_percent = volume_dif / cur_volume
- for child_player in self._get_child_players(True):
- cur_child_volume = child_player.volume_level
- new_child_volume = cur_child_volume + (
- cur_child_volume * volume_dif_percent
- )
- await child_player.volume_set(new_child_volume)
-
- def _get_child_players(
- self, only_powered: bool = False, only_playing: bool = False
- ) -> List[Player]:
- """Get players attached to this group."""
- if not self.mass:
- return []
- child_players = []
- for child_id in self.group_childs:
- if child_player := self.mass.players.get_player(child_id):
- if not (not only_powered or child_player.powered):
- continue
- if not (not only_playing or child_player.state == PlayerState.PLAYING):
- continue
- child_players.append(child_player)
- return child_players
-
- def on_child_update(self, player_id: str, changed_keys: set) -> None:
- """Call when one of the child players of a playergroup updates."""
- self.update_state(True)
-
- # convenience helper:
- # power off group player if last child player turns off
- if "powered" not in changed_keys or not self.active_queue.active:
- return
- powered_childs = set()
- for child_id in self._attr_group_childs:
- if player := self.mass.players.get_player(child_id):
- if player.powered:
- powered_childs.add(child_id)
- if self.powered and len(powered_childs) == 0:
-
- async def auto_turn_off_group():
- await self.active_queue.stop()
- await self.power(False)
-
- self.mass.create_task(auto_turn_off_group())
+ child_players.append(child_player)
+ return child_players
+
+
+async def set_group_volume(group_player: Player, volume_level: int) -> None:
+ """Send volume level (0..100) command to groupplayer's child."""
+ # handle group volume by only applying the valume to powered childs
+ cur_volume = group_player.volume_level
+ new_volume = volume_level
+ volume_dif = new_volume - cur_volume
+ if cur_volume == 0:
+ volume_dif_percent = 1 + (new_volume / 100)
+ else:
+ volume_dif_percent = volume_dif / cur_volume
+ for child_player in get_child_players(group_player, True):
+ cur_child_volume = child_player.volume_level
+ new_child_volume = cur_child_volume + (cur_child_volume * volume_dif_percent)
+ await child_player.volume_set(new_child_volume)
from music_assistant.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.models.event import MassEvent
-from .player import Player, PlayerGroup, PlayerState
+from .player import Player, PlayerState, get_child_players
from .queue_item import QueueItem
from .queue_settings import QueueSettings
return self._settings
@property
- def player(self) -> Player | PlayerGroup:
+ def player(self) -> Player:
"""Return the player attached to this queue."""
return self.mass.players.get_player(self.queue_id, include_unavailable=True)
self, start_index: int, seek_position: int, fade_in: bool, passive: bool = False
) -> None:
"""Start the queue stream runner."""
- players: List[Player] = []
output_format = self._settings.stream_type
- # if multi stream is enabled, all child players should receive the same audio stream
if self.player.use_multi_stream:
- for child_id in self.player.group_childs:
- child_player = self.mass.players.get_player(child_id)
- if not child_player or not child_player.powered:
- continue
- players.append(child_player)
+ # if multi stream is enabled, all child players should receive the same audio stream
+ expected_clients = len(get_child_players(self.player, True))
else:
- # regular (single player) request
- players.append(self.player)
+ expected_clients = 1
self._current_item_elapsed_time = 0
self._current_index = start_index
# start the queue stream background task
stream = await self.mass.streams.start_queue_stream(
queue=self,
- expected_clients=len(players),
+ expected_clients=expected_clients,
start_index=start_index,
seek_position=seek_position,
fade_in=fade_in,
self._stream_id = stream.stream_id
# execute the play command on the player(s)
if not passive:
- await asyncio.gather(*[x.play_url(stream.url) for x in players])
+ await self.player.play_url(stream.url)
def get_next_index(self, cur_index: Optional[int]) -> int:
"""Return the next index for the queue, accounting for repeat settings."""