"""Models and helpers for a player."""
from __future__ import annotations
+import asyncio
from abc import ABC
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from .player_queue import PlayerQueue
-DEFAULT_SUPPORTED_CONTENT_TYPES = (
- # if a player does not report/set its supported content types, we use a pretty safe default
- ContentType.FLAC,
- ContentType.MP3,
- ContentType.WAV,
- ContentType.PCM_S16LE,
- ContentType.PCM_S24LE,
-)
-
-DEFAULT_SUPPORTED_SAMPLE_RATES = (
- # if a player does not report/set its supported sample rates, we use a pretty safe default
- 44100,
- 48000,
- 88200,
- 96000,
-)
-
-
@dataclass(frozen=True)
class DeviceInfo(DataClassDictMixin):
"""Model for a player's deviceinfo."""
"""Model for a music player."""
player_id: str
- _attr_is_group: bool = False
- _attr_group_childs: List[str] = []
+ _attr_group_members: List[str] = []
_attr_name: str = ""
_attr_powered: bool = False
_attr_elapsed_time: float = 0
_attr_volume_level: int = 100
_attr_volume_muted: bool = False
_attr_device_info: DeviceInfo = DeviceInfo()
- _attr_supported_content_types: Tuple[ContentType] = DEFAULT_SUPPORTED_CONTENT_TYPES
- _attr_supported_sample_rates: Tuple[int] = DEFAULT_SUPPORTED_SAMPLE_RATES
- _attr_active_queue_id: str = ""
- _attr_use_multi_stream: bool = False
+ _attr_default_sample_rates: Tuple[int] = (44100, 48000, 88200, 96000)
+ _attr_default_stream_type: ContentType = ContentType.FLAC
# below objects will be set by playermanager at register/update
mass: MusicAssistant = None # type: ignore[assignment]
- _attr_group_parents: List[str] = [] # will be set by player manager
_prev_state: dict = {}
@property
"""Return player name."""
return self._attr_name or self.player_id
- @property
- def is_group(self) -> bool:
- """Return bool if this player is a grouped player (playergroup)."""
- return self._attr_is_group
-
- @property
- def group_childs(self) -> List[str]:
- """Return list of child player id's of PlayerGroup (if player is group)."""
- return self._attr_group_childs
-
- @property
- def group_parents(self) -> List[str]:
- """Return all/any group player id's this player belongs to."""
- return self._attr_group_parents
-
@property
def powered(self) -> bool:
"""Return current power state of player."""
"""Return basic device/provider info for this player."""
return self._attr_device_info
- @property
- def supported_sample_rates(self) -> Tuple[int]:
- """Return the sample rates this player supports."""
- return self._attr_supported_sample_rates
-
- @property
- def supported_content_types(self) -> Tuple[ContentType]:
- """Return the content types this player supports."""
- return self._attr_supported_content_types
-
- @property
- def active_queue(self) -> PlayerQueue:
- """
- Return the currently active queue for this player.
-
- If the player is a group child this will return its parent when that is playing,
- otherwise it will return the player's own queue.
- """
- return self.mass.players.get_player_queue(self._attr_active_queue_id)
-
- @property
- def use_multi_stream(self) -> bool:
- """
- Return bool if this player needs multistream approach.
-
- This is used for groupplayers that do not distribute the audio streams over players.
- Instead this can be used as convenience service where each client receives the same audio
- at more or less the same time. The player's implementation will be responsible for
- synchronization of audio on child players (if possible), Music Assistant will only
- coordinate the start and makes sure that every child received the same audio chunk
- within the same timespan.
- """
- return self._attr_use_multi_stream
-
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
raise NotImplementedError
"""Send volume level (0..100) command to player."""
raise NotImplementedError
- # SOME CONVENIENCE METHODS (may be overridden if needed)
+ # DEFAULT PLAYER SETTINGS
@property
- def stream_type(self) -> ContentType:
- """Return supported/preferred stream type for playerqueue. Read only."""
- # determine default stream type from player capabilities
- return next(
- x
- for x in (
- ContentType.FLAC,
- ContentType.WAV,
- ContentType.PCM_S16LE,
- ContentType.MP3,
- ContentType.MPEG,
+ def default_sample_rates(self) -> Tuple[int]:
+ """Return the default supported sample rates."""
+ # if a player does not report/set its supported sample rates, we use a pretty safe default
+ return self._attr_default_sample_rates
+
+ @property
+ def default_stream_type(self) -> ContentType:
+ """Return the default content type to use for streaming."""
+ return self._attr_default_stream_type
+
+ # GROUP PLAYER ATTRIBUTES AND METHODS (may be overridden if needed)
+ # a player can optionally be a group leader (e.g. Sonos)
+ # or be a group player itself (e.g. Cast)
+ # support both scenarios here
+
+ @property
+ def is_group(self) -> bool:
+ """Return if this player represents a playergroup or is grouped with other players."""
+ return len(self.group_members) > 1
+
+ @property
+ def group_members(self) -> List[str]:
+ """
+ Return list of grouped players.
+
+ If this player is a dedicated group player (e.g. cast), returns the grouped child id's.
+ If this is a player grouped with other players within the same platform (e.g. Sonos),
+ this will return the players that are currently grouped together.
+ The first child id should represent the group leader.
+ """
+ return self._attr_group_members
+
+ @property
+ def group_leader(self) -> str:
+ """Return the leader's player_id of this playergroup."""
+ if group_members := self.group_members:
+ return group_members[0]
+ # fallback to own player id if player does not belong to a group
+ return self.player_id
+
+ @property
+ def is_group_leader(self) -> bool:
+ """Return if this player is the leader in a playergroup."""
+ return self.group_leader == self.player_id
+
+ @property
+ def is_passive(self) -> bool:
+ """
+ Return if this player may not accept any playback related commands.
+
+ Usually this means the player is part of a playergroup but not the leader.
+ """
+ return not self.is_group_leader
+
+ @property
+ def group_name(self) -> str:
+ """Return name of this grouped player."""
+ if not self.is_group:
+ return self.name
+ # default to name of groupleader and number of childs
+ num_childs = len([x for x in self.group_members if x != self.player_id])
+ return f"{self.name} +{num_childs}"
+
+ @property
+ def group_powered(self) -> bool:
+ """Calculate a group power state from the grouped members."""
+ if not self.available or not self.is_group:
+ return self.powered
+ for _ in self.get_child_players(True):
+ return True
+ return False
+
+ @property
+ def group_volume_level(self) -> int:
+ """Calculate a group volume from the grouped members."""
+ if not self.available or not self.is_group:
+ return self.volume_level
+ group_volume = 0
+ active_players = 0
+ for child_player in self.get_child_players(True):
+ group_volume += child_player.volume_level
+ active_players += 1
+ if active_players:
+ group_volume = group_volume / active_players
+ return int(group_volume)
+
+ async def set_group_volume(self, volume_level: int) -> None:
+ """Send volume level (0..100) command to groupplayer's member(s)."""
+ # handle group volume by only applying the volume to powered members
+ cur_volume = self.group_volume_level
+ new_volume = volume_level
+ volume_dif = new_volume - cur_volume
+ if cur_volume == 0:
+ volume_dif_percent = 1 + (new_volume / 100)
+ else:
+ volume_dif_percent = volume_dif / cur_volume
+ coros = []
+ for child_player in self.get_child_players(True):
+ cur_child_volume = child_player.volume_level
+ new_child_volume = cur_child_volume + (
+ cur_child_volume * volume_dif_percent
)
- if x in self.supported_content_types
- )
+ coros.append(child_player.volume_set(new_child_volume))
+ await asyncio.gather(*coros)
+
+ async def set_group_power(self, powered: bool) -> None:
+ """Send power command to groupplayer's member(s)."""
+ coros = [
+ player.power(powered) for player in self.get_child_players(not powered)
+ ]
+ await asyncio.gather(*coros)
+
+ # SOME CONVENIENCE METHODS (may be overridden if needed)
async def volume_mute(self, muted: bool) -> None:
"""Send volume mute command to player."""
# DO NOT OVERRIDE BELOW
+ @property
+ def active_queue(self) -> PlayerQueue:
+ """Return the queue that is currently active on/for this player."""
+ for queue in self.mass.players.player_queues:
+ if queue.stream and queue.stream.url == self.current_url:
+ return queue
+ return self.mass.players.get_player_queue(self.player_id)
+
def update_state(self, skip_forward: bool = False) -> None:
"""Update current player state in the player manager."""
if self.mass is None or self.mass.closed:
# guard
return
- self._attr_group_parents = self._get_attr_group_parents()
- # determine active queue for player
- self._attr_active_queue_id = self._get_active_queue_id()
# basic throttle: do not send state changed events if player did not change
cur_state = self.to_dict()
changed_keys = get_changed_keys(self._prev_state, cur_state)
if skip_forward:
return
if self.is_group:
- # update group player childs when parent updates
- for child_player_id in self.group_childs:
+ # update group player members when parent updates
+ for child_player_id in self.group_members:
if player := self.mass.players.get_player(child_player_id):
self.mass.create_task(
player.on_parent_update, self.player_id, changed_keys
)
return
- # update group player when child updates
- for group_player_id in self._attr_group_parents:
- if player := self.mass.players.get_player(group_player_id):
- self.mass.create_task(
- player.on_child_update, self.player_id, changed_keys
- )
-
- def _get_attr_group_parents(self) -> List[str]:
+ # update group player(s) when child updates
+ for group_player in self.get_group_parents():
+ self.mass.create_task(
+ group_player.on_child_update, self.player_id, changed_keys
+ )
+
+ def get_group_parents(self) -> List[Player]:
"""Get any/all group player id's this player belongs to."""
return [
- x.player_id
+ x
for x in self.mass.players
- if x.is_group and self.player_id in x.group_childs
+ if x.is_group and self.player_id in x.group_members and x != self
]
- def _get_active_queue_id(self) -> str:
- """Return the currently active queue for this (grouped) player."""
- for player_id in self._attr_group_parents:
- player = self.mass.players.get_player(player_id)
- if not player or not player.powered:
- continue
- queue = self.mass.players.get_player_queue(player_id)
- if not queue or not queue.active:
- continue
- if queue.stream.stream_id in player.current_url:
- # match found!
- return queue.queue_id
- return self.player_id
-
def to_dict(self) -> Dict[str, Any]:
"""Export object to dict."""
return {
"elapsed_time": int(self.elapsed_time),
"state": self.state.value,
"available": self.available,
- "is_group": self.is_group,
- "group_childs": self.group_childs,
- "group_parents": self.group_parents,
"volume_level": int(self.volume_level),
+ "is_group": self.is_group,
+ "group_members": self.group_members,
+ "is_passive": self.is_passive,
+ "group_name": self.group_name,
+ "group_powered": self.group_powered,
+ "group_volume_level": int(self.group_volume_level),
"device_info": self.device_info.to_dict(),
- "active_queue": self.active_queue.queue_id,
+ "active_queue": self.active_queue.queue_id
+ if self.active_queue
+ else self.player_id,
}
-
-### Some convenience help functions below
-
-
-def get_group_volume(group_player: Player) -> int:
- """Calculate volume level of group player's childs."""
- if not group_player.available:
- return 0
- group_volume = 0
- active_players = 0
- for child_player in get_child_players(group_player, True):
- group_volume += child_player.volume_level
- active_players += 1
- if active_players:
- group_volume = group_volume / active_players
- return int(group_volume)
-
-
-def get_child_players(
- group_player: Player, only_powered: bool = False, only_playing: bool = False
-) -> List[Player]:
- """Get players attached to a grouped player."""
- if not group_player.mass:
- return []
- child_players = []
- for child_id in group_player.group_childs:
- if child_player := group_player.mass.players.get_player(child_id):
- if not (not only_powered or child_player.powered):
- continue
- if not (not only_playing or child_player.state == PlayerState.PLAYING):
- continue
- child_players.append(child_player)
- return child_players
-
-
-async def set_group_volume(group_player: Player, volume_level: int) -> None:
- """Send volume level (0..100) command to groupplayer's child."""
- # handle group volume by only applying the valume to powered childs
- cur_volume = group_player.volume_level
- new_volume = volume_level
- volume_dif = new_volume - cur_volume
- if cur_volume == 0:
- volume_dif_percent = 1 + (new_volume / 100)
- else:
- volume_dif_percent = volume_dif / cur_volume
- for child_player in get_child_players(group_player, True):
- cur_child_volume = child_player.volume_level
- new_child_volume = cur_child_volume + (cur_child_volume * volume_dif_percent)
- await child_player.volume_set(new_child_volume)
+ def get_child_players(
+ self,
+ only_powered: bool = False,
+ only_playing: bool = False,
+ skip_passive: bool = False,
+ ) -> List[Player]:
+ """Get players attached to a grouped player."""
+ if not self.mass:
+ return []
+ child_players = []
+ for child_id in self.group_members:
+ if child_player := self.mass.players.get_player(child_id):
+ if not (not only_powered or child_player.powered):
+ continue
+ if not (not only_playing or child_player.state == PlayerState.PLAYING):
+ continue
+ if skip_passive and child_player.is_passive:
+ continue
+ child_players.append(child_player)
+ return child_players
import asyncio
import random
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from .enums import ContentType, CrossFadeMode, RepeatMode
self._crossfade_duration: int = 6
self._volume_normalization_enabled: bool = True
self._volume_normalization_target: int = -14
+ self._stream_type: Optional[ContentType] = None
+ self._sample_rates: Optional[Tuple[int]] = None
@property
def repeat_mode(self) -> RepeatMode:
@property
def stream_type(self) -> ContentType:
- """Return supported/preferred stream type for playerqueue. Read only."""
- return self._queue.player.stream_type
+ """Return supported/preferred stream type for this playerqueue."""
+ if self._stream_type is None:
+ # return player's default
+ return self._queue.player.default_stream_type
+ return self._stream_type
+
+ @stream_type.setter
+ def stream_type(self, value: ContentType) -> None:
+ """Set supported/preferred stream type for this playerqueue."""
+ if self._stream_type != value:
+ self._stream_type = value
+ self._on_update("stream_type")
+
+ @property
+ def sample_rates(self) -> Tuple[int]:
+ """Return supported/preferred sample rate(s) for this playerqueue."""
+ if self._sample_rates is None:
+ # return player's default
+ return self._queue.player.default_sample_rates
+ return self._sample_rates
+
+ @sample_rates.setter
+ def sample_rates(self, value: ContentType) -> None:
+ """Set supported/preferred sample rate(s) for this playerqueue."""
+ if self._stream_type != value:
+ self._stream_type = value
+ self._on_update("sample_rates")
+
+ @property
+ def max_sample_rate(self) -> int:
+ """Return the maximum samplerate supported by this playerqueue."""
+ return max(self.sample_rates)
def to_dict(self) -> Dict[str, Any]:
"""Return dict from settings."""
"crossfade_duration": self.crossfade_duration,
"volume_normalization_enabled": self.volume_normalization_enabled,
"volume_normalization_target": self.volume_normalization_target,
+ "stream_type": self.stream_type.value,
+ "sample_rates": self.sample_rates,
}
async def restore(self) -> None:
("crossfade_duration", int),
("volume_normalization_enabled", bool),
("volume_normalization_target", float),
+ ("stream_type", ContentType),
+ ("sample_rates", tuple),
):
db_key = f"{self._queue.queue_id}_{key}"
if db_value := await self.mass.database.get_setting(db_key, db=_db):