From 7ea8b60839706564b09f231850bed4f499d814c7 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 20 Feb 2026 14:05:22 +0100 Subject: [PATCH] Fix: remove rate limiting from single item streams --- music_assistant/constants.py | 1 + .../controllers/players/controller.py | 105 +++++++++++++++++- .../controllers/players/protocol_linking.py | 9 +- .../controllers/streams/streams_controller.py | 21 +--- music_assistant/models/player.py | 34 ++++++ .../providers/airplay/stream_session.py | 2 + 6 files changed, 148 insertions(+), 24 deletions(-) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 276a9a37..761ca953 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -932,6 +932,7 @@ ATTR_ELAPSED_TIME: Final[str] = "elapsed_time" ATTR_ENABLED: Final[str] = "enabled" ATTR_AVAILABLE: Final[str] = "available" ATTR_MUTE_LOCK: Final[str] = "mute_lock" +ATTR_ACTIVE_SOURCE: Final[str] = "active_source" # Album type detection patterns LIVE_INDICATORS = [ diff --git a/music_assistant/controllers/players/controller.py b/music_assistant/controllers/players/controller.py index 26536a5b..ed33de88 100644 --- a/music_assistant/controllers/players/controller.py +++ b/music_assistant/controllers/players/controller.py @@ -51,6 +51,7 @@ from music_assistant_models.player_control import PlayerControl # noqa: TC002 from music_assistant.constants import ( ANNOUNCE_ALERT_FILE, + ATTR_ACTIVE_SOURCE, ATTR_ANNOUNCEMENT_IN_PROGRESS, ATTR_AVAILABLE, ATTR_ELAPSED_TIME, @@ -863,7 +864,6 @@ class PlayerController(ProtocolLinkingMixin, CoreController): player, required_feature=PlayerFeature.PLAY_ANNOUNCEMENT, require_active=False, - allow_native=True, ): native_announce_support = True else: @@ -1559,6 +1559,12 @@ class PlayerController(ProtocolLinkingMixin, CoreController): if removed_player := self.get_player(_removed_player_id): removed_player.update_state() + # Handle external source takeover - detect when active_source changes to + # something external while we have a grouped protocol active + if ATTR_ACTIVE_SOURCE in changed_values: + prev_source, new_source = changed_values[ATTR_ACTIVE_SOURCE] + self._handle_external_source_takeover(player, prev_source, new_source) + became_inactive = False if ATTR_AVAILABLE in changed_values: became_inactive = changed_values[ATTR_AVAILABLE][1] is False @@ -2268,6 +2274,87 @@ class PlayerController(ProtocolLinkingMixin, CoreController): # - the leader has DSP enabled self.mass.create_task(self.mass.players.on_player_dsp_change(player.player_id)) + def _handle_external_source_takeover( + self, player: Player, prev_source: str | None, new_source: str | None + ) -> None: + """ + Handle when an external source takes over playback on a player. + + When a player has an active grouped output protocol (e.g., AirPlay group) and + an external source (e.g., Spotify Connect, TV input) takes over playback, + we need to clear the active output protocol and ungroup the protocol players. + + This prevents the situation where the player appears grouped via protocol + but is actually playing from a different source. + + :param player: The player whose active_source changed. + :param prev_source: The previous active_source value. + :param new_source: The new active_source value. + """ + # Only relevant for non-protocol players + if player.type == PlayerType.PROTOCOL: + return + + # Only relevant if we have an active output protocol (not native) + if not player.active_output_protocol or player.active_output_protocol == "native": + return + + # Check if new source is external (not MA-managed) + if self._is_ma_managed_source(player, new_source): + return + + # Get the active protocol player + protocol_player = self.get_player(player.active_output_protocol) + if not protocol_player: + return + + # Only relevant if the protocol is grouped + if not self._is_protocol_grouped(protocol_player): + return + + # External source took over while protocol was grouped - unbond + self.logger.info( + "External source '%s' took over on %s while grouped via protocol %s - " + "clearing active output protocol and ungrouping", + new_source, + player.display_name, + protocol_player.provider.domain, + ) + + # Clear active output protocol + player.set_active_output_protocol(None) + + # Ungroup the protocol player (async task) + self.mass.create_task(protocol_player.ungroup()) + + def _is_ma_managed_source(self, player: Player, source: str | None) -> bool: + """ + Check if a source is managed by Music Assistant. + + MA-managed sources include: + - None (no source active) + - The player's own ID (MA queue) + - Any active queue ID + - Any plugin source ID + + :param player: The player to check. + :param source: The source ID to check. + :return: True if the source is MA-managed, False if external. + """ + if source is None: + return True + + # Player's own ID means MA queue is active + if source == player.player_id: + return True + + # Check if it's a known queue ID + if self.mass.player_queues.get(source): + return True + + # Check if it's a plugin source + return any(plugin_source.id == source for plugin_source in self.get_plugin_sources()) + def _schedule_update_all_players(self, delay: float = 2.0) -> None: """ Schedule a debounced update of all players' state. @@ -2661,13 +2748,15 @@ class PlayerController(ProtocolLinkingMixin, CoreController): player = self.get_player(player_id, raise_unavailable=True) assert player is not None if target_player := self._get_control_target( - player, required_feature=PlayerFeature.ENQUEUE, require_active=True, allow_native=False + player, + required_feature=PlayerFeature.ENQUEUE, + require_active=True, ): self.logger.debug( "Redirecting enqueue command to protocol player %s", target_player.provider.manifest.name, ) - await self._handle_enqueue_next_media(target_player.player_id, media) + await target_player.enqueue_next_media(media) return if PlayerFeature.ENQUEUE not in player.state.supported_features: @@ -2782,7 +2871,9 @@ class PlayerController(ProtocolLinkingMixin, CoreController): ) raise PlayerCommandFailed(msg) # Delegate to active protocol player if one is active - if target_player := self._get_control_target(player, PlayerFeature.PAUSE, True): + if target_player := self._get_control_target( + player, PlayerFeature.PAUSE, require_active=True + ): await target_player.play() return @@ -2840,7 +2931,11 @@ class PlayerController(ProtocolLinkingMixin, CoreController): ) raise PlayerCommandFailed(msg) # Delegate to active protocol player if one is active - if not (target_player := self._get_control_target(player, PlayerFeature.PAUSE, True)): + if not ( + target_player := self._get_control_target( + player, PlayerFeature.PAUSE, require_active=True + ) + ): # if player(protocol) does not support pause, we need to send stop self.logger.debug( "Player/protocol %s does not support pause, using STOP instead", diff --git a/music_assistant/controllers/players/protocol_linking.py b/music_assistant/controllers/players/protocol_linking.py index 37020bbb..2cb4d808 100644 --- a/music_assistant/controllers/players/protocol_linking.py +++ b/music_assistant/controllers/players/protocol_linking.py @@ -959,7 +959,6 @@ class ProtocolLinkingMixin: player: Player, required_feature: PlayerFeature, require_active: bool = False, - allow_native: bool = True, ) -> Player | None: """ Get the best player(protocol) to send control commands to. @@ -977,13 +976,19 @@ class ProtocolLinkingMixin: return protocol_player # if the player natively supports the required feature, use that - if allow_native and required_feature in player.supported_features: + if ( + player.active_output_protocol == "native" + and required_feature in player.supported_features + ): return player # If require_active is set, and no active protocol found, return None if require_active: return None + # if the player natively supports the required feature, use that + if required_feature in player.supported_features: + return player # Otherwise, use the first available linked protocol for linked in player.linked_output_protocols: if ( diff --git a/music_assistant/controllers/streams/streams_controller.py b/music_assistant/controllers/streams/streams_controller.py index beb6d596..67e7e251 100644 --- a/music_assistant/controllers/streams/streams_controller.py +++ b/music_assistant/controllers/streams/streams_controller.py @@ -514,27 +514,15 @@ class StreamsController(CoreController): ) else: # no crossfade, just a regular single item stream - audio_input = buffered( - self.get_queue_item_stream( - queue_item=queue_item, - pcm_format=pcm_format, - seek_position=queue_item.streamdetails.seek_position, - ), - buffer_size=10, - min_buffer_before_yield=2, + audio_input = self.get_queue_item_stream( + queue_item=queue_item, + pcm_format=pcm_format, + seek_position=queue_item.streamdetails.seek_position, ) # stream the audio # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into # the desired output format for the player including any player specific filter params # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats - if queue_item.media_type == MediaType.RADIO: - # keep very short buffer for radio streams - # to keep them (more or less) realtime and prevent time outs - read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"] - else: - # just allow the player to buffer whatever it wants for single item streams - read_rate_input_args = None - first_chunk_received = False bytes_sent = 0 async for chunk in get_ffmpeg_stream( @@ -547,7 +535,6 @@ class StreamsController(CoreController): input_format=pcm_format, output_format=output_format, ), - extra_input_args=read_rate_input_args, ): try: await resp.write(chunk) diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index e4c31a29..bb44dad6 100644 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -1738,6 +1738,12 @@ class Player(ABC): if self.type == PlayerType.PROTOCOL: return result + # Scenario 2: External source is active - don't include protocol-based grouping + # When an external source (e.g., Spotify Connect, TV) is active, grouping via + # protocols (AirPlay, Sendspin, etc.) wouldn't work - only native grouping is available. + if self._has_external_source_active(): + return result + # Translate can_group_with from active linked protocol(s) and add to result for linked in self.__attr_linked_protocols: if protocol_player := self.mass.players.get_player(linked.output_protocol_id): @@ -1809,6 +1815,34 @@ class Player(ABC): result.add(parent_player) return result + @final + def _has_external_source_active(self) -> bool: + """ + Check if an external (non-MA-managed) source is currently active. + + External sources include things like Spotify Connect, TV input, etc. + When an external source is active, protocol-based grouping is not available. + + :return: True if an external source is active, False otherwise. + """ + active_source = self.__final_active_source + if active_source is None: + return False + + # Player's own ID means MA queue is (or was) active + if active_source == self.player_id: + return False + + # Check if it's a known queue ID + if self.mass.player_queues.get(active_source): + return False + + # Check if it's a plugin source - if not, it's an external source + return not any( + plugin_source.id == active_source + for plugin_source in self.mass.players.get_plugin_sources() + ) + @final def _expand_can_group_with(self) -> set[Player]: """ diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index e7984d32..c5a2f99b 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -9,6 +9,7 @@ from collections.abc import AsyncGenerator from contextlib import suppress from typing import TYPE_CHECKING +from music_assistant_models.enums import PlaybackState from music_assistant_models.errors import PlayerCommandFailed from music_assistant.constants import CONF_SYNC_ADJUST @@ -105,6 +106,7 @@ class AirPlayStreamSession: return self.sync_clients.remove(airplay_player) await self.stop_client(airplay_player) + airplay_player.set_state_from_stream(PlaybackState.IDLE) # If this was the last client, stop the session if not self.sync_clients: await self.stop() -- 2.34.1