Fix: remove rate limiting from single item streams
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 20 Feb 2026 13:05:22 +0000 (14:05 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 20 Feb 2026 13:05:22 +0000 (14:05 +0100)
music_assistant/constants.py
music_assistant/controllers/players/controller.py
music_assistant/controllers/players/protocol_linking.py
music_assistant/controllers/streams/streams_controller.py
music_assistant/models/player.py
music_assistant/providers/airplay/stream_session.py

index 276a9a37f7690881dd78ff9c9c403d03bd842397..761ca953bebd8d05e22219d659b70eb695d4ae6a 100644 (file)
@@ -932,6 +932,7 @@ ATTR_ELAPSED_TIME: Final[str] = "elapsed_time"
 ATTR_ENABLED: Final[str] = "enabled"
 ATTR_AVAILABLE: Final[str] = "available"
 ATTR_MUTE_LOCK: Final[str] = "mute_lock"
+ATTR_ACTIVE_SOURCE: Final[str] = "active_source"
 
 # Album type detection patterns
 LIVE_INDICATORS = [
index 26536a5b6097f8a3ff6ddd08a9a978536d2448b9..ed33de88f002cd5cb1a7ee1c70b6947dc01a931c 100644 (file)
@@ -51,6 +51,7 @@ from music_assistant_models.player_control import PlayerControl  # noqa: TC002
 
 from music_assistant.constants import (
     ANNOUNCE_ALERT_FILE,
+    ATTR_ACTIVE_SOURCE,
     ATTR_ANNOUNCEMENT_IN_PROGRESS,
     ATTR_AVAILABLE,
     ATTR_ELAPSED_TIME,
@@ -863,7 +864,6 @@ class PlayerController(ProtocolLinkingMixin, CoreController):
                 player,
                 required_feature=PlayerFeature.PLAY_ANNOUNCEMENT,
                 require_active=False,
-                allow_native=True,
             ):
                 native_announce_support = True
             else:
@@ -1559,6 +1559,12 @@ class PlayerController(ProtocolLinkingMixin, CoreController):
                 if removed_player := self.get_player(_removed_player_id):
                     removed_player.update_state()
 
+        # Handle external source takeover - detect when active_source changes to
+        # something external while we have a grouped protocol active
+        if ATTR_ACTIVE_SOURCE in changed_values:
+            prev_source, new_source = changed_values[ATTR_ACTIVE_SOURCE]
+            self._handle_external_source_takeover(player, prev_source, new_source)
+
         became_inactive = False
         if ATTR_AVAILABLE in changed_values:
             became_inactive = changed_values[ATTR_AVAILABLE][1] is False
@@ -2268,6 +2274,87 @@ class PlayerController(ProtocolLinkingMixin, CoreController):
             # - the leader has DSP enabled
             self.mass.create_task(self.mass.players.on_player_dsp_change(player.player_id))
 
+    def _handle_external_source_takeover(
+        self, player: Player, prev_source: str | None, new_source: str | None
+    ) -> None:
+        """
+        Handle when an external source takes over playback on a player.
+
+        When a player has an active grouped output protocol (e.g., AirPlay group) and
+        an external source (e.g., Spotify Connect, TV input) takes over playback,
+        we need to clear the active output protocol and ungroup the protocol players.
+
+        This prevents the situation where the player appears grouped via protocol
+        but is actually playing from a different source.
+
+        :param player: The player whose active_source changed.
+        :param prev_source: The previous active_source value.
+        :param new_source: The new active_source value.
+        """
+        # Only relevant for non-protocol players
+        if player.type == PlayerType.PROTOCOL:
+            return
+
+        # Only relevant if we have an active output protocol (not native)
+        if not player.active_output_protocol or player.active_output_protocol == "native":
+            return
+
+        # Check if new source is external (not MA-managed)
+        if self._is_ma_managed_source(player, new_source):
+            return
+
+        # Get the active protocol player
+        protocol_player = self.get_player(player.active_output_protocol)
+        if not protocol_player:
+            return
+
+        # Only relevant if the protocol is grouped
+        if not self._is_protocol_grouped(protocol_player):
+            return
+
+        # External source took over while protocol was grouped - unbond
+        self.logger.info(
+            "External source '%s' took over on %s while grouped via protocol %s - "
+            "clearing active output protocol and ungrouping",
+            new_source,
+            player.display_name,
+            protocol_player.provider.domain,
+        )
+
+        # Clear active output protocol
+        player.set_active_output_protocol(None)
+
+        # Ungroup the protocol player (async task)
+        self.mass.create_task(protocol_player.ungroup())
+
+    def _is_ma_managed_source(self, player: Player, source: str | None) -> bool:
+        """
+        Check if a source is managed by Music Assistant.
+
+        MA-managed sources include:
+        - None (no source active)
+        - The player's own ID (MA queue)
+        - Any active queue ID
+        - Any plugin source ID
+
+        :param player: The player to check.
+        :param source: The source ID to check.
+        :return: True if the source is MA-managed, False if external.
+        """
+        if source is None:
+            return True
+
+        # Player's own ID means MA queue is active
+        if source == player.player_id:
+            return True
+
+        # Check if it's a known queue ID
+        if self.mass.player_queues.get(source):
+            return True
+
+        # Check if it's a plugin source
+        return any(plugin_source.id == source for plugin_source in self.get_plugin_sources())
+
     def _schedule_update_all_players(self, delay: float = 2.0) -> None:
         """
         Schedule a debounced update of all players' state.
@@ -2661,13 +2748,15 @@ class PlayerController(ProtocolLinkingMixin, CoreController):
         player = self.get_player(player_id, raise_unavailable=True)
         assert player is not None
         if target_player := self._get_control_target(
-            player, required_feature=PlayerFeature.ENQUEUE, require_active=True, allow_native=False
+            player,
+            required_feature=PlayerFeature.ENQUEUE,
+            require_active=True,
         ):
             self.logger.debug(
                 "Redirecting enqueue command to protocol player %s",
                 target_player.provider.manifest.name,
             )
-            await self._handle_enqueue_next_media(target_player.player_id, media)
+            await target_player.enqueue_next_media(media)
             return
 
         if PlayerFeature.ENQUEUE not in player.state.supported_features:
@@ -2782,7 +2871,9 @@ class PlayerController(ProtocolLinkingMixin, CoreController):
                 )
                 raise PlayerCommandFailed(msg)
             # Delegate to active protocol player if one is active
-            if target_player := self._get_control_target(player, PlayerFeature.PAUSE, True):
+            if target_player := self._get_control_target(
+                player, PlayerFeature.PAUSE, require_active=True
+            ):
                 await target_player.play()
                 return
 
@@ -2840,7 +2931,11 @@ class PlayerController(ProtocolLinkingMixin, CoreController):
             )
             raise PlayerCommandFailed(msg)
         # Delegate to active protocol player if one is active
-        if not (target_player := self._get_control_target(player, PlayerFeature.PAUSE, True)):
+        if not (
+            target_player := self._get_control_target(
+                player, PlayerFeature.PAUSE, require_active=True
+            )
+        ):
             # if player(protocol) does not support pause, we need to send stop
             self.logger.debug(
                 "Player/protocol %s does not support pause, using STOP instead",
index 37020bbb78240ae8ec7a8ee90188882e75f1ccb0..2cb4d808715ebb6ec8928af419ff5a08c7bebbbf 100644 (file)
@@ -959,7 +959,6 @@ class ProtocolLinkingMixin:
         player: Player,
         required_feature: PlayerFeature,
         require_active: bool = False,
-        allow_native: bool = True,
     ) -> Player | None:
         """
         Get the best player(protocol) to send control commands to.
@@ -977,13 +976,19 @@ class ProtocolLinkingMixin:
             return protocol_player
 
         # if the player natively supports the required feature, use that
-        if allow_native and required_feature in player.supported_features:
+        if (
+            player.active_output_protocol == "native"
+            and required_feature in player.supported_features
+        ):
             return player
 
         # If require_active is set, and no active protocol found, return None
         if require_active:
             return None
 
+        # if the player natively supports the required feature, use that
+        if required_feature in player.supported_features:
+            return player
         # Otherwise, use the first available linked protocol
         for linked in player.linked_output_protocols:
             if (
index beb6d596f01faa4efd97f8a14a9a9c8022d26818..67e7e251ce926a04baba37d43dc1eba8be2529fb 100644 (file)
@@ -514,27 +514,15 @@ class StreamsController(CoreController):
             )
         else:
             # no crossfade, just a regular single item stream
-            audio_input = buffered(
-                self.get_queue_item_stream(
-                    queue_item=queue_item,
-                    pcm_format=pcm_format,
-                    seek_position=queue_item.streamdetails.seek_position,
-                ),
-                buffer_size=10,
-                min_buffer_before_yield=2,
+            audio_input = self.get_queue_item_stream(
+                queue_item=queue_item,
+                pcm_format=pcm_format,
+                seek_position=queue_item.streamdetails.seek_position,
             )
         # stream the audio
         # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
         # the desired output format for the player including any player specific filter params
         # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
-        if queue_item.media_type == MediaType.RADIO:
-            # keep very short buffer for radio streams
-            # to keep them (more or less) realtime and prevent time outs
-            read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
-        else:
-            # just allow the player to buffer whatever it wants for single item streams
-            read_rate_input_args = None
-
         first_chunk_received = False
         bytes_sent = 0
         async for chunk in get_ffmpeg_stream(
@@ -547,7 +535,6 @@ class StreamsController(CoreController):
                 input_format=pcm_format,
                 output_format=output_format,
             ),
-            extra_input_args=read_rate_input_args,
         ):
             try:
                 await resp.write(chunk)
index e4c31a29086726c69e73cd57463966ea1779570e..bb44dad634f72c7f65add9e04a5dc44d2633c1d5 100644 (file)
@@ -1738,6 +1738,12 @@ class Player(ABC):
         if self.type == PlayerType.PROTOCOL:
             return result
 
+        # Scenario 2: External source is active - don't include protocol-based grouping
+        # When an external source (e.g., Spotify Connect, TV) is active, grouping via
+        # protocols (AirPlay, Sendspin, etc.) wouldn't work - only native grouping is available.
+        if self._has_external_source_active():
+            return result
+
         # Translate can_group_with from active linked protocol(s) and add to result
         for linked in self.__attr_linked_protocols:
             if protocol_player := self.mass.players.get_player(linked.output_protocol_id):
@@ -1809,6 +1815,34 @@ class Player(ABC):
             result.add(parent_player)
         return result
 
+    @final
+    def _has_external_source_active(self) -> bool:
+        """
+        Check if an external (non-MA-managed) source is currently active.
+
+        External sources include things like Spotify Connect, TV input, etc.
+        When an external source is active, protocol-based grouping is not available.
+
+        :return: True if an external source is active, False otherwise.
+        """
+        active_source = self.__final_active_source
+        if active_source is None:
+            return False
+
+        # Player's own ID means MA queue is (or was) active
+        if active_source == self.player_id:
+            return False
+
+        # Check if it's a known queue ID
+        if self.mass.player_queues.get(active_source):
+            return False
+
+        # Check if it's a plugin source - if not, it's an external source
+        return not any(
+            plugin_source.id == active_source
+            for plugin_source in self.mass.players.get_plugin_sources()
+        )
+
     @final
     def _expand_can_group_with(self) -> set[Player]:
         """
index e7984d3291a60e6fee6ce92cfac91f67f43a3fb6..c5a2f99ba4dd2511be5648cc7681eb60be2d8d60 100644 (file)
@@ -9,6 +9,7 @@ from collections.abc import AsyncGenerator
 from contextlib import suppress
 from typing import TYPE_CHECKING
 
+from music_assistant_models.enums import PlaybackState
 from music_assistant_models.errors import PlayerCommandFailed
 
 from music_assistant.constants import CONF_SYNC_ADJUST
@@ -105,6 +106,7 @@ class AirPlayStreamSession:
                 return
             self.sync_clients.remove(airplay_player)
         await self.stop_client(airplay_player)
+        airplay_player.set_state_from_stream(PlaybackState.IDLE)
         # If this was the last client, stop the session
         if not self.sync_clients:
             await self.stop()