PAUSE = "pause"
SYNC = "sync"
SEEK = "seek"
+ NEXT_PREVIOUS = "next_previous"
PLAY_ANNOUNCEMENT = "play_announcement"
UNKNOWN = "unknown"
HTTP = "http" # regular http stream
ENCRYPTED_HTTP = "encrypted_http" # encrypted http stream
HLS = "hls" # http HLS stream
+ ENCRYPTED_HLS = "encrypted_hls" # encrypted HLS stream
ICY = "icy" # http stream with icy metadata
LOCAL_FILE = "local_file"
CUSTOM = "custom"
RepeatMode,
)
from music_assistant.common.models.errors import (
+ InvalidCommand,
MediaNotFoundError,
MusicAssistantError,
PlayerUnavailableError,
- queue_id: queue_id of the playerqueue to handle the command.
"""
- if queue := self.get(queue_id):
+ queue_player: Player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
+ self.logger.warning("Ignore queue command: An announcement is in progress")
+ return
+ if (queue := self.get(queue_id)) and queue.active:
queue.resume_pos = queue.corrected_elapsed_time
queue.stream_finished = None
queue.end_of_track_reached = None
- # forward the actual command to the player controller
- await self.mass.players.cmd_stop(queue_id, skip_redirect=True)
+ # forward the actual command to the player provider
+ if player_provider := self.mass.players.get_player_provider(queue.queue_id):
+ await player_provider.cmd_stop(queue_id)
@api_command("player_queues/play")
async def play(self, queue_id: str) -> None:
if (
(queue := self._queues.get(queue_id))
and queue.active
- and queue_player.powered
- and queue.state == PlayerState.PAUSED
+ and queue_player.state == PlayerState.PAUSED
):
- # forward the actual command to the player controller
- await self.mass.players.cmd_play(queue_id, skip_redirect=True)
- else:
- await self.resume(queue_id)
+ # forward the actual play/unpause command to the player provider
+ if player_provider := self.mass.players.get_player_provider(queue.queue_id):
+ await player_provider.cmd_play(queue_id)
+ return
+ # player is not paused, perform resume instead
+ await self.resume(queue_id)
@api_command("player_queues/pause")
async def pause(self, queue_id: str) -> None:
if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
- if (queue := self.get(queue_id)) is None or not queue.active:
- # TODO: forward to underlying player if not active
+ if not (queue := self.get(queue_id)):
return
- assert queue.current_item, "No item loaded"
- assert queue.current_item.media_item.media_type == MediaType.TRACK
- assert queue.current_item.duration
- assert position < queue.current_item.duration
- await self.play_index(queue_id, queue.current_index, position)
+ if not queue.current_item:
+ raise InvalidCommand(f"Queue {queue_player.display_name} has no item(s) loaded.")
+ if (
+ queue.current_item.media_item.media_type != MediaType.TRACK
+ or not queue.current_item.duration
+ ):
+ raise InvalidCommand("Can not seek on non track items.")
+ position = max(0, int(position))
+ if position > queue.current_item.duration:
+ raise InvalidCommand("Can not seek outside of duration range.")
+ await self.play_index(queue_id, queue.current_index, seek_position=position)
@api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
@api_command("players/cmd/stop")
@handle_player_command
- async def cmd_stop(self, player_id: str, skip_redirect: bool = False) -> None:
+ async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player.
- player_id: player_id of the player to handle the command.
"""
- player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
- # Redirect to queue controller if active (as it also handles some other logic)
- # Note that skip_redirect will be set by the queue controller
- # to prevent an endless loop.
- if not skip_redirect and player.active_source == player_id:
- await self.mass.player_queues.stop(player_id)
+ player = self._get_player_with_redirect(player_id)
+ # Redirect to queue controller if it is active
+ if active_queue := self.mass.player_queues.get(player.active_source):
+ await self.mass.player_queues.stop(active_queue.queue_id)
return
- if player_provider := self.get_player_provider(player_id):
- await player_provider.cmd_stop(player_id)
+ # send to player provider
+ async with self._player_throttlers[player_id]:
+ if player_provider := self.get_player_provider(player_id):
+ await player_provider.cmd_stop(player_id)
@api_command("players/cmd/play")
@handle_player_command
- async def cmd_play(self, player_id: str, skip_redirect: bool = False) -> None:
+ async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
- player_id: player_id of the player to handle the command.
"""
- player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
- if player.announcement_in_progress:
- self.logger.warning("Ignore queue command: An announcement is in progress")
- return
- # Redirect to queue controller if active (as it also handles some other logic)
- # Note that skip_redirect will be set by the queue controller
- # to prevent an endless loop.
- if not skip_redirect and player.active_source == player_id:
- await self.mass.player_queues.play(player_id)
+ player = self._get_player_with_redirect(player_id)
+ # Redirect to queue controller if it is active
+ active_source = player.active_source or player.player_id
+ if (active_queue := self.mass.player_queues.get(active_source)) and active_queue.items:
+ await self.mass.player_queues.play(active_queue.queue_id)
return
+ # send to player provider
player_provider = self.get_player_provider(player_id)
async with self._player_throttlers[player_id]:
await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
@handle_player_command
- async def cmd_pause(self, player_id: str, skip_redirect: bool = False) -> None:
+ async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
- player_id: player_id of the player to handle the command.
"""
- player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
+ player = self._get_player_with_redirect(player_id)
if player.announcement_in_progress:
self.logger.warning("Ignore command: An announcement is in progress")
return
if PlayerFeature.PAUSE not in player.supported_features:
# if player does not support pause, we need to send stop
+ self.logger.info(
+ "Player %s does not support pause, using STOP instead", player.display_name
+ )
await self.cmd_stop(player_id)
return
player_provider = self.get_player_provider(player_id)
await self.cmd_stop(_player_id)
# we auto stop a player from paused when its paused for 30 seconds
- self.mass.create_task(_watch_pause(player_id))
+ if not player.announcement_in_progress:
+ self.mass.create_task(_watch_pause(player_id))
@api_command("players/cmd/play_pause")
async def cmd_play_pause(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
- player = self._get_player_with_redirect(player_id, skip_redirect=False)
+ player = self._get_player_with_redirect(player_id)
if player.state == PlayerState.PLAYING:
await self.cmd_pause(player_id)
else:
await self.cmd_play(player_id)
+ @api_command("players/cmd/seek")
+ async def cmd_seek(self, player_id: str, position: int) -> None:
+ """Handle SEEK command for given player.
+
+ - player_id: player_id of the player to handle the command.
+ - position: position in seconds to seek to in the current playing item.
+ """
+ player = self._get_player_with_redirect(player_id)
+ # Redirect to queue controller if it is active
+ active_source = player.active_source or player.player_id
+ if active_queue := self.mass.player_queues.get(active_source):
+ await self.mass.player_queues.seek(active_queue.queue_id, position)
+ return
+ if PlayerFeature.SEEK not in player.supported_features:
+ msg = f"Player {player.display_name} does not support seeking"
+ raise UnsupportedFeaturedException(msg)
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.cmd_seek(player_id, position)
+
+ @api_command("players/cmd/next")
+ async def cmd_next_track(self, player_id: str) -> None:
+ """Handle NEXT TRACK command for given player."""
+ player = self._get_player_with_redirect(player_id)
+ # Redirect to queue controller if it is active
+ active_source = player.active_source or player.player_id
+ if active_queue := self.mass.player_queues.get(active_source):
+ await self.mass.player_queues.next(active_queue.queue_id)
+ return
+ if PlayerFeature.NEXT_PREVIOUS not in player.supported_features:
+ msg = f"Player {player.display_name} does not support skipping to the next track."
+ raise UnsupportedFeaturedException(msg)
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.cmd_next(player_id)
+
+ @api_command("players/cmd/previous")
+ async def cmd_previous_track(self, player_id: str) -> None:
+ """Handle PREVIOUS TRACK command for given player."""
+ player = self._get_player_with_redirect(player_id)
+ # Redirect to queue controller if it is active
+ active_source = player.active_source or player.player_id
+ if active_queue := self.mass.player_queues.get(active_source):
+ await self.mass.player_queues.previous(active_queue.queue_id)
+ return
+ if PlayerFeature.NEXT_PREVIOUS not in player.supported_features:
+ msg = f"Player {player.display_name} does not support skipping to the previous track."
+ raise UnsupportedFeaturedException(msg)
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.cmd_previous(player_id)
+
@api_command("players/cmd/power")
@handle_player_command
- async def cmd_power(
- self, player_id: str, powered: bool, skip_redirect: bool = False, skip_update: bool = False
- ) -> None:
+ async def cmd_power(self, player_id: str, powered: bool, skip_update: bool = False) -> None:
"""Send POWER command to given player.
- player_id: player_id of the player to handle the command.
if player.powered == powered:
return # nothing to do
- if player.active_group and not powered and not skip_redirect:
+ if player.active_group and not powered:
# this is simply not possible (well, not without major headaches)
# the player is part of a permanent (sync)group and the user tries to power off
# one child player... we can't allow this, as it would break the group so we
async with self._player_throttlers[player_id]:
await player_provider.cmd_volume_mute(player_id, muted)
- @api_command("players/cmd/seek")
- async def cmd_seek(self, player_id: str, position: int) -> None:
- """Handle SEEK command for given player (directly).
-
- - player_id: player_id of the player to handle the command.
- - position: position in seconds to seek to in the current playing item.
- """
- player = self._get_player_with_redirect(player_id)
- if PlayerFeature.SEEK not in player.supported_features:
- msg = f"Player {player.display_name} does not support seeking"
- raise UnsupportedFeaturedException(msg)
- player_prov = self.mass.players.get_player_provider(player_id)
- await player_prov.cmd_seek(player_id, position)
-
@api_command("players/cmd/play_announcement")
async def play_announcement(
self,
player.announcement_in_progress = False
@handle_player_command
- async def play_media(
- self, player_id: str, media: PlayerMedia, skip_redirect: bool = False
- ) -> None:
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player.
- player_id: player_id of the player to handle the command.
- media: The Media that needs to be played on the player.
"""
- player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
+ player = self._get_player_with_redirect(player_id)
# power on the player if needed
if not player.powered:
await self.cmd_power(player_id, True)
self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
def update(
- self, player_id: str, skip_redirect: bool = False, force_update: bool = False
+ self, player_id: str, skip_forward: bool = False, force_update: bool = False
) -> None:
"""Update player state."""
if self.mass.closing:
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
- if skip_redirect:
+ if skip_forward:
return
# update/signal group player(s) child's when group updates
if player.type == PlayerType.GROUP:
for child_player in self.iter_group_members(player, exclude_self=True):
- self.update(child_player.player_id, skip_redirect=True)
+ self.update(child_player.player_id, skip_forward=True)
# update/signal group player(s) when child updates
for group_player in self._get_player_groups(player, powered_only=False):
if player_prov := self.mass.get_provider(group_player.provider):
# ensure the result is an integer
return None if volume_level is None else int(volume_level)
- def _get_player_with_redirect(self, player_id: str, skip_redirect: bool = False) -> Player:
+ def _get_player_with_redirect(self, player_id: str) -> Player:
"""Get player with check if playback related command should be redirected."""
player = self.get(player_id, True)
- if skip_redirect:
- return player
if player.synced_to and (sync_leader := self.get(player.synced_to)):
self.logger.info(
"Player %s is synced to %s and can not accept "
)
from .ffmpeg import FFMpeg, get_ffmpeg_stream
-from .playlists import HLS_CONTENT_TYPES, IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
+from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
from .tags import parse_tags
from .throttle_retry import BYPASS_THROTTLER
streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
and streamdetails.media_type == MediaType.RADIO
):
- resolved_url, is_icy, is_hls = await resolve_radio_stream(mass, streamdetails.path)
+ resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
streamdetails.path = resolved_url
- if is_hls:
- streamdetails.stream_type = StreamType.HLS
- elif is_icy:
- streamdetails.stream_type = StreamType.ICY
+ streamdetails.stream_type = stream_type
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_item.queue_id
# handle skip/fade_in details
return file.getvalue()
-async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool, bool]:
+async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, StreamType]:
"""
Resolve a streaming radio URL.
Returns tuple;
- unfolded URL as string
- - bool if the URL represents a ICY (radio) stream.
- - bool uf the URL represents a HLS stream/playlist.
+ - StreamType to determine ICY (radio) or HLS stream.
"""
- cache_base_key = "resolved_radio"
+ cache_base_key = "resolved_radio_info"
if cache := await mass.cache.get(url, base_key=cache_base_key):
return cache
- is_hls = False
- is_icy = False
+ stream_type = StreamType.HTTP
resolved_url = url
timeout = ClientTimeout(total=0, connect=10, sock_read=5)
try:
resp.raise_for_status()
if not resp.headers:
raise InvalidDataError("no headers found")
- is_icy = headers.get("icy-metaint") is not None
- is_hls = headers.get("content-type") in HLS_CONTENT_TYPES
+ if headers.get("icy-metaint") is not None:
+ stream_type = StreamType.ICY
if (
url.endswith((".m3u", ".m3u8", ".pls"))
or ".m3u?" in url
# unfold first url of playlist
return await resolve_radio_stream(mass, line.path)
raise InvalidDataError("No content found in playlist")
- except IsHLSPlaylist:
- is_hls = True
+ except IsHLSPlaylist as err:
+ stream_type = StreamType.ENCRYPTED_HLS if err.encrypted else StreamType.HLS
except Exception as err:
LOGGER.warning("Error while parsing radio URL %s: %s", url, err)
- return (url, is_icy, is_hls)
+ return (url, stream_type)
- result = (resolved_url, is_icy, is_hls)
+ result = (resolved_url, stream_type)
cache_expiration = 3600 * 3
await mass.cache.set(url, result, expiration=cache_expiration, base_key=cache_base_key)
return result
async def get_hls_substream(
mass: MusicAssistant,
url: str,
+ allow_encrypted: bool = False,
) -> PlaylistItem:
"""Select the (highest quality) HLS substream for given HLS playlist/URL."""
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
raw_data = await resp.read()
encoding = resp.charset or await detect_charset(raw_data)
master_m3u_data = raw_data.decode(encoding)
+ if not allow_encrypted and "EXT-X-KEY:METHOD=AES-128" in master_m3u_data:
+ # for now we don't support encrypted HLS streams
+ raise InvalidDataError("HLS stream is encrypted, not supported")
substreams = parse_m3u(master_m3u_data)
- if any(x for x in substreams if x.length and not x.key):
+ if any(x for x in substreams if x.length or x.key):
# this is already a substream!
return PlaylistItem(
path=url,
class IsHLSPlaylist(InvalidDataError):
"""The playlist from an HLS stream and should not be parsed."""
+ encrypted: bool = False
+
@dataclass
class PlaylistItem:
raise InvalidDataError(msg) from err
if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data:
- raise IsHLSPlaylist
+ raise IsHLSPlaylist(encrypted="#EXT-X-KEY:" in playlist_data)
if url.endswith((".m3u", ".m3u8")):
playlist = parse_m3u(playlist_data)
raise NotImplementedError
async def cmd_seek(self, player_id: str, position: int) -> None:
- """Handle SEEK command for given queue.
+ """Handle SEEK command for given player.
- player_id: player_id of the player to handle the command.
- position: position in seconds to seek to in the current playing item.
# will only be called for players with Seek feature set.
raise NotImplementedError
+ async def cmd_next(self, player_id: str) -> None:
+ """Handle NEXT TRACK command for given player."""
+ # will only be called for players with 'next_previous' feature set.
+ raise NotImplementedError
+
+ async def cmd_previous(self, player_id: str) -> None:
+ """Handle PREVIOUS TRACK command for given player."""
+ # will only be called for players with 'next_previous' feature set.
+ raise NotImplementedError
+
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
raise RuntimeError("Player is synced")
if player.group_childs:
# pause is not supported while synced, use stop instead
+ self.logger.debug("Player is synced, using STOP instead of PAUSE")
await self.cmd_stop(player_id)
return
airplay_player = self._players[player_id]
)
else:
# make sure that the player manager gets an update
- self.mass.players.update(child_player.player_id, skip_redirect=True)
- self.mass.players.update(parent_player.player_id, skip_redirect=True)
+ self.mass.players.update(child_player.player_id, skip_forward=True)
+ self.mass.players.update(parent_player.player_id, skip_forward=True)
@lock
async def cmd_unsync(self, player_id: str) -> None:
airplay_player = self._players.get(player_id)
await airplay_player.cmd_stop()
# make sure that the player manager gets an update
- self.mass.players.update(player.player_id, skip_redirect=True)
- self.mass.players.update(group_leader.player_id, skip_redirect=True)
+ self.mass.players.update(player.player_id, skip_forward=True)
+ self.mass.players.update(group_leader.player_id, skip_forward=True)
async def _getcliraop_binary(self):
"""Find the correct raop/airplay binary belonging to the platform."""
castplayer = self.castplayers[player_id]
await asyncio.to_thread(castplayer.cc.media_controller.pause)
+ async def cmd_next(self, player_id: str) -> None:
+ """Handle NEXT TRACK command for given player."""
+ castplayer = self.castplayers[player_id]
+ await asyncio.to_thread(castplayer.cc.media_controller.queue_next)
+
+ async def cmd_previous(self, player_id: str) -> None:
+ """Handle PREVIOUS TRACK command for given player."""
+ castplayer = self.castplayers[player_id]
+ await asyncio.to_thread(castplayer.cc.media_controller.queue_prev)
+
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
castplayer = self.castplayers[player_id]
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
+ PlayerFeature.NEXT_PREVIOUS,
),
enabled_by_default=enabled_by_default,
needs_poll=True,
status.app_id,
status.volume_level,
)
- castplayer.player.name = castplayer.cast_info.friendly_name
- castplayer.player.volume_level = int(status.volume_level * 100)
- castplayer.player.volume_muted = status.volume_muted
- castplayer.player.powered = (
- castplayer.cc.app_id is not None and castplayer.cc.app_id != pychromecast.IDLE_APP_ID
- )
# handle stereo pairs
if castplayer.cast_info.is_multichannel_group:
castplayer.player.type = PlayerType.STEREO_PAIR
PlayerFeature.PAUSE,
)
+ # update player status
+ castplayer.player.name = castplayer.cast_info.friendly_name
+ castplayer.player.volume_level = int(status.volume_level * 100)
+ castplayer.player.volume_muted = status.volume_muted
+ new_powered = (
+ castplayer.cc.app_id is not None and castplayer.cc.app_id != pychromecast.IDLE_APP_ID
+ )
+ if (
+ castplayer.player.powered
+ and not new_powered
+ and castplayer.player.type == PlayerType.GROUP
+ ):
+ # group is being powered off, update group childs
+ for child_id in castplayer.player.group_childs:
+ if child := self.castplayers.get(child_id):
+ child.player.powered = False
+ child.player.active_group = None
+ child.player.active_source = None
+ castplayer.player.powered = new_powered
# send update to player manager
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
group_player = self.mass.players.get(player_id)
+ # syncgroup: forward command to sync leader
if player_id.startswith(SYNCGROUP_PREFIX):
- # syncgroup: forward command to sync leader
if sync_leader := self._get_sync_leader(group_player):
- await self.mass.players.cmd_stop(sync_leader.player_id, skip_redirect=True)
- else:
- # ugp: forward command to all active members
- async with TaskManager(self.mass) as tg:
- for member in self.mass.players.iter_group_members(group_player, active_only=True):
- if member.state not in (PlayerState.PAUSED, PlayerState.PLAYING):
- continue
- tg.create_task(self.mass.players.cmd_stop(member.player_id, skip_redirect=True))
- # abort the stream session
- if (stream := self.ugp_streams.pop(player_id, None)) and not stream.done:
- await stream.stop()
- # set state optimistically
- group_player.state = PlayerState.IDLE
- self.mass.players.update(player_id)
+ if player_provider := self.mass.get_provider(sync_leader.provider):
+ await player_provider.cmd_stop(sync_leader.player_id)
+ return
+ # ugp: forward command to all members
+ async with TaskManager(self.mass) as tg:
+ for member in self.mass.players.iter_group_members(group_player, active_only=True):
+ if player_provider := self.mass.get_provider(member.provider):
+ tg.create_task(player_provider.cmd_stop(member.player_id))
+ # abort the stream session
+ if (stream := self.ugp_streams.pop(player_id, None)) and not stream.done:
+ await stream.stop()
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
group_player = self.mass.players.get(player_id)
if not player_id.startswith(SYNCGROUP_PREFIX):
# this shouldn't happen, but just in case
- raise UnsupportedFeaturedException("Command is not supported for UGP players")
+ raise UnsupportedFeaturedException
# forward command to sync leader
if sync_leader := self._get_sync_leader(group_player):
- await self.mass.players.cmd_play(sync_leader.player_id, skip_redirect=True)
+ if player_provider := self.mass.get_provider(sync_leader.provider):
+ await player_provider.cmd_play(sync_leader.player_id)
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
group_player = self.mass.players.get(player_id)
if not player_id.startswith(SYNCGROUP_PREFIX):
- raise UnsupportedFeaturedException("Command is not supported for UGP players")
+ # this shouldn't happen, but just in case
+ raise UnsupportedFeaturedException
# forward command to sync leader
if sync_leader := self._get_sync_leader(group_player):
- await self.mass.players.cmd_pause(sync_leader.player_id, skip_redirect=True)
+ if player_provider := self.mass.get_provider(sync_leader.provider):
+ await player_provider.cmd_pause(sync_leader.player_id)
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Handle POWER command to group player."""
and child_player.enabled
}
- async with TaskManager(self.mass) as tg:
- if powered:
- # handle TURN_ON of the group player by turning on all members
- for member in self.mass.players.iter_group_members(
- group_player, only_powered=False, active_only=False
+ if powered:
+ # handle TURN_ON of the group player by turning on all members
+ for member in self.mass.players.iter_group_members(
+ group_player, only_powered=False, active_only=False
+ ):
+ player_provider = self.mass.get_provider(member.provider)
+ assert player_provider # for typing
+ if (
+ member.state in (PlayerState.PLAYING, PlayerState.PAUSED)
+ and member.active_source != group_player.active_source
):
- if (
- member.state in (PlayerState.PLAYING, PlayerState.PAUSED)
- and member.active_source != group_player.active_source
- ):
- # stop playing existing content on member if we start the group player
- tg.create_task(
- self.mass.players.cmd_stop(member.player_id, skip_redirect=True)
- )
- if not member.powered:
- tg.create_task(
- self.mass.players.cmd_power(member.player_id, True, skip_redirect=True)
- )
- # set active source to group player if the group (is going to be) powered
- member.active_group = group_player.player_id
- member.active_source = group_player.active_source
- else:
+ # stop playing existing content on member if we start the group player
+ await player_provider.cmd_stop(member.player_id)
+ if not member.powered:
+ member.active_group = None # needed to prevent race conditions
+ await self.mass.players.cmd_power(member.player_id, True)
+ # set active source to group player if the group (is going to be) powered
+ member.active_group = group_player.player_id
+ member.active_source = group_player.active_source
+ else:
+ # handle TURN_OFF of the group player by turning off all members
+ for member in self.mass.players.iter_group_members(
+ group_player, only_powered=True, active_only=True
+ ):
+ # reset active group on player when the group is turned off
+ member.active_group = None
+ member.active_source = None
# handle TURN_OFF of the group player by turning off all members
- for member in self.mass.players.iter_group_members(
- group_player, only_powered=True, active_only=True
- ):
- # reset active group on player when the group is turned off
- member.active_group = None
- member.active_source = None
- # handle TURN_OFF of the group player by turning off all members
- if member.powered:
- tg.create_task(
- self.mass.players.cmd_power(member.player_id, False, skip_redirect=True)
- )
+ if member.powered:
+ await self.mass.players.cmd_power(member.player_id, False)
+
if powered and player_id.startswith(SYNCGROUP_PREFIX):
await self._sync_syncgroup(group_player)
# optimistically set the group state
# handle play_media for sync group
if player_id.startswith(SYNCGROUP_PREFIX):
# simply forward the command to the sync leader
- if sync_leader := self._select_sync_leader(group_player):
- await self.mass.players.play_media(
- sync_leader.player_id, media=media, skip_redirect=True
- )
+ sync_leader = self._select_sync_leader(group_player)
+ assert sync_leader # for typing
+ player_provider = self.mass.get_provider(sync_leader.provider)
+ assert player_provider # for typing
+ await player_provider.play_media(
+ sync_leader.player_id,
+ media=media,
+ )
return
# handle play_media for UGP group
for member in self.mass.players.iter_group_members(
group_player, only_powered=True, active_only=True
):
+ player_provider = self.mass.get_provider(member.provider)
+ assert player_provider # for typing
tg.create_task(
- self.mass.players.play_media(
+ player_provider.play_media(
member.player_id,
media=PlayerMedia(
uri=f"{base_url}?player_id={member.player_id}",
title=group_player.display_name,
queue_id=group_player.player_id,
),
- skip_redirect=True,
)
)
return
if group_player.powered:
# edge case: the group player is powered and being removed
- for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- member.active_group = None
- if member.state == PlayerState.IDLE:
- continue
- if member.synced_to:
- continue
- await self.mass.players.cmd_stop(member.player_id, skip_redirect=True)
+ # make sure to turn it off first (which will also unsync a syncgroup)
+ await self.cmd_power(player_id, False)
async def _register_all_players(self) -> None:
"""Register all (virtual/fake) group players in the Player controller."""
sync_leader = self._select_sync_leader(group_player)
members_to_sync: list[str] = []
for member in self.mass.players.iter_group_members(group_player, active_only=False):
+ if member.synced_to and member.synced_to != sync_leader.player_id:
+ # unsync first
+ await self.mass.players.cmd_unsync(member.player_id)
if sync_leader.player_id == member.player_id:
# skip sync leader
continue
if member.synced_to == sync_leader.player_id:
# already synced
continue
- if member.synced_to and member.synced_to != sync_leader.player_id:
- # unsync first
- await self.mass.players.cmd_unsync(member.player_id)
members_to_sync.append(member.player_id)
if members_to_sync:
await self.mass.players.cmd_sync_many(sync_leader.player_id, members_to_sync)
)
else:
# make sure that the player manager gets an update
- self.mass.players.update(child_player.player_id, skip_redirect=True)
- self.mass.players.update(parent_player.player_id, skip_redirect=True)
+ self.mass.players.update(child_player.player_id, skip_forward=True)
+ self.mass.players.update(parent_player.player_id, skip_forward=True)
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
if slimclient := self.slimproto.get_player(player_id):
await slimclient.stop()
# make sure that the player manager gets an update
- self.mass.players.update(player.player_id, skip_redirect=True)
- self.mass.players.update(group_leader.player_id, skip_redirect=True)
+ self.mass.players.update(player.player_id, skip_forward=True)
+ self.mass.players.update(group_leader.player_id, skip_forward=True)
def _client_callback(
self,
await self._get_snapgroup(player_id).set_stream("default")
await self.cmd_stop(player_id=player_id)
# make sure that the player manager gets an update
- self.mass.players.update(player_id, skip_redirect=True)
- self.mass.players.update(mass_player.synced_to, skip_redirect=True)
+ self.mass.players.update(player_id, skip_forward=True)
+ self.mass.players.update(mass_player.synced_to, skip_forward=True)
async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
self.mass_player.active_source = airplay.active_source
self.mass_player.elapsed_time = airplay.elapsed_time
self.mass_player.elapsed_time_last_updated = airplay.elapsed_time_last_updated
+ # mark 'next_previous' feature as unsupported when airplay mode is active
+ if PlayerFeature.NEXT_PREVIOUS in self.mass_player.supported_features:
+ self.mass_player.supported_features = (
+ x
+ for x in self.mass_player.supported_features
+ if x != PlayerFeature.NEXT_PREVIOUS
+ )
return
+ # ensure 'next_previous' feature is supported when airplay mode is not active
+ if PlayerFeature.NEXT_PREVIOUS not in self.mass_player.supported_features:
+ self.mass_player.supported_features = (
+ *self.mass_player.supported_features,
+ PlayerFeature.NEXT_PREVIOUS,
+ )
# map playback state
self.mass_player.state = PLAYBACK_STATE_MAP[active_group.playback_state]
media.uri, {"name": media.title, "type": "track"}
)
+ async def cmd_next(self, player_id: str) -> None:
+ """Handle NEXT TRACK command for given player."""
+ if sonos_player := self.sonos_players[player_id]:
+ await sonos_player.client.player.group.skip_to_next_track()
+
+ async def cmd_previous(self, player_id: str) -> None:
+ """Handle PREVIOUS TRACK command for given player."""
+ if sonos_player := self.sonos_players[player_id]:
+ await sonos_player.client.player.group.skip_to_previous_track()
+
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
sonos_player = self.sonos_players[player_id]