"""Return (join_pending_ids, active_pipelines) under lock."""
async with self._state_lock:
members = self._members
+ leader_id = self.player.player_id
return set(self._join_catchup), tuple(
- (mid, p) for mid, p in self._member_pipelines.items() if mid in members
+ (mid, p)
+ for mid, p in self._member_pipelines.items()
+ if mid in members or mid == leader_id
)
# -- Public API ------------------------------------------------------------
if player_id in self._members:
self.pending_join_members.discard(player_id)
return
- # Force a fresh channel identity for every new join cycle.
- self._preassigned_channels[player_id] = uuid4()
+ # Preserve any channel pre-resolved during add_client so join-time
+ # role requirements and prepared audio stay on the same channel.
+ self._preassigned_channels.setdefault(player_id, uuid4())
self.pending_join_members.add(player_id)
try:
await self._start_join_catchup(player_id)
self._first_commit_monotonic_us = None
self._produced_audio_us = 0
self._history.clear()
+ # Drop cached DSP decisions so next playback reflects latest config.
+ self._pipeline_config_cache.clear()
+ # Only emit a group STOP when MA stream playback reached natural EOF.
+ # Skip this on cancellation/error paths to avoid stop-event races with transitions.
+ if producer_stopped_cleanly:
+ with suppress(Exception):
+ await self.player.api.group.stop()
# -- Join injection --------------------------------------------------------
self._mapping_dirty = False
for member_id in member_ids:
await self._sync_member_pipeline(member_id)
+ # Keep leader pipeline in sync so leader DSP can be applied when required.
+ await self._sync_member_pipeline(self.player.player_id)
async def _sync_member_pipeline(self, player_id: str) -> _MemberPipeline:
"""Create/update pipeline state for one member from current MA config."""
pipeline = self._member_pipelines.get(player_id)
if pipeline is not None:
return pipeline.channel_id
- # The leader always receives MAIN_CHANNEL audio directly from the
- # commit loop; only group members get per-player DSP channels.
- if player_id == self.player.player_id:
- return MAIN_CHANNEL
# Force a fresh config read for pending/unknown joiners so the very
# first resolution (triggered by add_client) uses up-to-date DSP settings.
- force = player_id not in self._members
+ force = player_id not in self._members and player_id != self.player.player_id
config = self._get_pipeline_config_cached(player_id, force_refresh=force)
if not config.requires_transform:
return MAIN_CHANNEL
async def _handle_group_member_removed(self, group: SendspinGroup, client_id: str) -> None:
"""Handle a group member being removed asynchronously."""
if client_id == self.player_id:
- if len(group.clients) > 0:
- # We were just removed as a leader:
- # 1. stop playback on the old group
+ was_leader = (
+ bool(self._attr_group_members) and self._attr_group_members[0] == self.player_id
+ )
+ if was_leader and len(group.clients) > 0:
+ # We were removed as the group leader:
+ # stop playback on the old group before we continue as solo.
await group.stop()
- # 2. clear our members (since we are now alone in a new group)
- self._attr_group_members = []
+ elif not was_leader:
+ self.logger.debug(
+ "Player %s removed from group as non-leader; keeping old group playing",
+ self.player_id,
+ )
+ # Clear members for our detached/solo state.
+ self._attr_group_members = []
self.update_state()
elif client_id in self._attr_group_members:
# Someone else left our group
self._attr_elapsed_time_last_updated = time.time()
# playback_state will be set by the group state change event
- # Stop previous stream in case we were already playing something
+ # Stop previous stream in case we were already playing something.
+ # Do not call group.stop() here to avoid STOPPED-event races with next-track transitions.
await self.playback_session.cancel("new media requested")
- await self.api.group.stop()
await self.playback_session.start(media)
self.update_state()