media=media,
)
- async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
- """Handle enqueuing of a next media item on the player."""
- player = self.get(player_id, raise_unavailable=True)
- if PlayerFeature.ENQUEUE not in player.supported_features:
- raise UnsupportedFeaturedException(
- f"Player {player.display_name} does not support enqueueing"
- )
- player_prov = self.mass.get_provider(player.provider)
- async with self._player_throttlers[player_id]:
- await player_prov.enqueue_next_media(player_id=player_id, media=media)
-
+ @api_command("players/cmd/select_source")
async def select_source(self, player_id: str, source: str) -> None:
"""
Handle SELECT SOURCE command on given player.
provider = self.mass.get_provider(player.provider)
await provider.select_source(player_id, source)
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of a next media item on the player."""
+ player = self.get(player_id, raise_unavailable=True)
+ if PlayerFeature.ENQUEUE not in player.supported_features:
+ raise UnsupportedFeaturedException(
+ f"Player {player.display_name} does not support enqueueing"
+ )
+ player_prov = self.mass.get_provider(player.provider)
+ async with self._player_throttlers[player_id]:
+ await player_prov.enqueue_next_media(player_id=player_id, media=media)
+
@api_command("players/cmd/group")
@handle_player_command
async def cmd_group(self, player_id: str, target_player: str) -> None:
from aiosonos.api.models import PlayBackState as SonosPlayBackState
from music_assistant_models.enums import PlayerFeature, PlayerState
+from music_assistant_models.player import PlayerSource
PLAYBACK_STATE_MAP = {
SonosPlayBackState.PLAYBACK_STATE_BUFFERING: PlayerState.PLAYING,
PlayerFeature.SET_MEMBERS,
PlayerFeature.PAUSE,
PlayerFeature.ENQUEUE,
+ PlayerFeature.NEXT_PREVIOUS,
+ PlayerFeature.SEEK,
+ PlayerFeature.SELECT_SOURCE,
}
SOURCE_LINE_IN = "line_in"
SOURCE_AIRPLAY = "airplay"
SOURCE_SPOTIFY = "spotify"
SOURCE_UNKNOWN = "unknown"
+SOURCE_TV = "tv"
SOURCE_RADIO = "radio"
CONF_AIRPLAY_MODE = "airplay_mode"
+
+PLAYER_SOURCE_MAP = {
+ SOURCE_LINE_IN: PlayerSource(
+ id=SOURCE_LINE_IN,
+ name="Line-in",
+ passive=False,
+ can_play_pause=False,
+ can_next_previous=False,
+ can_seek=False,
+ ),
+ SOURCE_TV: PlayerSource(
+ id=SOURCE_TV,
+ name="TV",
+ passive=False,
+ can_play_pause=False,
+ can_next_previous=False,
+ can_seek=False,
+ ),
+ SOURCE_AIRPLAY: PlayerSource(
+ id=SOURCE_AIRPLAY,
+ name="Spotify",
+ passive=True,
+ can_play_pause=True,
+ can_next_previous=True,
+ can_seek=True,
+ ),
+ SOURCE_SPOTIFY: PlayerSource(
+ id=SOURCE_SPOTIFY,
+ name="Spotify",
+ passive=True,
+ can_play_pause=True,
+ can_next_previous=True,
+ can_seek=True,
+ ),
+ SOURCE_RADIO: PlayerSource(
+ id=SOURCE_RADIO,
+ name="Spotify",
+ passive=True,
+ can_play_pause=True,
+ can_next_previous=True,
+ can_seek=True,
+ ),
+}
CONF_AIRPLAY_MODE,
PLAYBACK_STATE_MAP,
PLAYER_FEATURES_BASE,
+ PLAYER_SOURCE_MAP,
SOURCE_AIRPLAY,
SOURCE_LINE_IN,
SOURCE_RADIO,
SOURCE_SPOTIFY,
+ SOURCE_TV,
)
if TYPE_CHECKING:
# but for now we assume we only have one
can_group_with={self.prov.lookup_key},
)
+ if SonosCapability.LINE_IN in self.discovery_info["device"]["capabilities"]:
+ mass_player.source_list.append(PLAYER_SOURCE_MAP[SOURCE_LINE_IN])
+ if SonosCapability.HT_PLAYBACK in self.discovery_info["device"]["capabilities"]:
+ mass_player.source_list.append(PLAYER_SOURCE_MAP[SOURCE_TV])
+ if SonosCapability.AIRPLAY in self.discovery_info["device"]["capabilities"]:
+ mass_player.source_list.append(PLAYER_SOURCE_MAP[SOURCE_AIRPLAY])
+
self.update_attributes()
await self.mass.players.register_or_update(mass_player)
if player_provider := self.mass.get_provider(airplay.provider):
await player_provider.cmd_stop(airplay.player_id)
return
- try:
- await self.client.player.group.stop()
- except FailedCommand as err:
- if "ERROR_PLAYBACK_NO_CONTENT" not in str(err):
- raise
+ await self.client.player.group.stop()
async def cmd_play(self) -> None:
"""Send PLAY command to given player."""
if player_provider := self.mass.get_provider(airplay.provider):
await player_provider.cmd_pause(airplay.player_id)
return
+ if not self.client.player.group.playback_actions.can_pause:
+ await self.cmd_stop()
+ return
await self.client.player.group.pause()
+ async def cmd_seek(self, position: int) -> None:
+ """Handle SEEK command for given player.
+
+ - position: position in seconds to seek to in the current playing item.
+ """
+ if self.client.player.is_passive:
+ self.logger.debug("Ignore STOP command: Player is synced to another player.")
+ return
+ await self.client.player.group.seek(position)
+
async def cmd_volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
await self.client.player.set_volume(volume_level)
"""Send VOLUME MUTE command to given player."""
await self.client.player.set_volume(muted=muted)
+ async def select_source(self, source: str) -> None:
+ """Handle SELECT SOURCE command on given player."""
+ if source == SOURCE_LINE_IN:
+ await self.client.player.group.load_line_in(play_on_completion=True)
+ elif source == SOURCE_TV:
+ await self.client.player.load_home_theater_playback()
+ else:
+ # unsupported source - try to clear the queue/player
+ await self.cmd_stop()
+
def update_attributes(self) -> None: # noqa: PLR0915
"""Update the player attributes."""
if not self.mass_player:
container = active_group.playback_metadata.get("container")
if container_type == ContainerType.LINEIN:
self.mass_player.active_source = SOURCE_LINE_IN
+ elif container_type in (ContainerType.HOME_THEATER_HDMI, ContainerType.HOME_THEATER_SPDIF):
+ self.mass_player.active_source = SOURCE_TV
elif container_type == ContainerType.AIRPLAY:
# check if the MA airplay player is active
if airplay_player and airplay_player.state in (
self.mass_player.active_source = SOURCE_AIRPLAY
elif container_type == ContainerType.STATION:
self.mass_player.active_source = SOURCE_RADIO
+ # add radio to source list if not yet there
+ if SOURCE_RADIO not in [x.id for x in self.mass_player.source_list]:
+ self.mass_player.source_list.append(PLAYER_SOURCE_MAP[SOURCE_RADIO])
elif active_service == MusicService.SPOTIFY:
self.mass_player.active_source = SOURCE_SPOTIFY
+ # add spotify to source list if not yet there
+ if SOURCE_SPOTIFY not in [x.id for x in self.mass_player.source_list]:
+ self.mass_player.source_list.append(PLAYER_SOURCE_MAP[SOURCE_SPOTIFY])
elif active_service == MusicService.MUSIC_ASSISTANT:
if self.client.player.is_coordinator:
self.mass_player.active_source = self.mass_player.player_id
self.mass_player.active_source = object_id.split(":")[-1]
else:
self.mass_player.active_source = None
- else:
- # its playing some service we did not yet map
+ # its playing some service we did not yet map
+ elif container and container.get("service", {}).get("name"):
+ self.mass_player.active_source = container["service"]["name"]
+ elif container and container.get("name"):
+ self.mass_player.active_source = container["name"]
+ elif active_service:
self.mass_player.active_source = active_service
-
- # sonos has this weirdness that it maps idle to paused
- # which is annoying to figure out if we want to resume or let
- # MA back in control again. So for now, we just map it to idle here.
- if (
- self.mass_player.state == PlayerState.PAUSED
- and active_service != MusicService.MUSIC_ASSISTANT
- ):
- self.mass_player.state = PlayerState.IDLE
+ elif container_type:
+ self.mass_player.active_source = container_type
+ else:
+ # the player has nothing loaded at all (empty queue and no service active)
+ self.mass_player.active_source = None
# parse current media
self.mass_player.elapsed_time = self.client.player.group.position
if sonos_player := self.sonos_players[player_id]:
await sonos_player.cmd_pause()
+ async def cmd_seek(self, player_id: str, position: int) -> None:
+ """Handle SEEK command for given player.
+
+ - player_id: player_id of the player to handle the command.
+ - position: position in seconds to seek to in the current playing item.
+ """
+ if sonos_player := self.sonos_players[player_id]:
+ await sonos_player.cmd_seek(position)
+
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
if sonos_player := self.sonos_players[player_id]:
duration = media_info.duration or 10
await asyncio.sleep(duration)
+ async def select_source(self, player_id: str, source: str) -> None:
+ """Handle SELECT SOURCE command on given player."""
+ if sonos_player := self.sonos_players[player_id]:
+ await sonos_player.select_source(source)
+
async def _setup_player(self, player_id: str, name: str, info: AsyncServiceInfo) -> None:
"""Handle setup of a new player that is discovered using mdns."""
assert player_id not in self.sonos_players