# all attempts to find a playable item failed
raise MediaNotFoundError("No playable item found to start playback")
- # Select and set the output protocol before evaluating flow_mode,
- # since flow_mode depends on the active output protocol
- self.mass.players.select_output_protocol(queue_id)
- # work out if we need to use flow mode
- flow_mode = target_player.flow_mode and queue_item.media_type not in (
- # don't use flow mode for duration-less streams
- MediaType.RADIO,
- MediaType.PLUGIN_SOURCE,
- )
+ # Reset flow_mode - the streams controller will set it if flow mode is used.
+ queue.flow_mode = False
await asyncio.sleep(0.5 if debounce else 0.1)
- queue.flow_mode = flow_mode
await self.mass.players.play_media(
player_id=queue_id,
- media=await self.player_media_from_queue_item(queue_item, flow_mode),
+ media=await self.player_media_from_queue_item(queue_item),
)
queue.current_index = index
queue.current_item = queue_item
return index
return None
- async def player_media_from_queue_item(
- self, queue_item: QueueItem, flow_mode: bool
- ) -> PlayerMedia:
- """Parse PlayerMedia from QueueItem."""
+ async def player_media_from_queue_item(self, queue_item: QueueItem) -> PlayerMedia:
+ """
+ Parse PlayerMedia from QueueItem.
+
+ :param queue_item: The queue item to create media from.
+ """
queue = self._queues[queue_item.queue_id]
- if flow_mode:
- duration = None
- elif queue_item.streamdetails:
+ if queue_item.streamdetails:
# prefer netto duration
# when seeking, the player only receives the remaining duration
duration = queue_item.streamdetails.duration or queue_item.duration
raise InvalidDataError("Queue session_id is None")
media = PlayerMedia(
uri=queue_item.uri,
- media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
- title="Music Assistant" if flow_mode else queue_item.name,
+ media_type=queue_item.media_type,
+ title=queue_item.name,
image_url=MASS_LOGO_ONLINE,
duration=duration,
source_id=queue_item.queue_id,
custom_data={
"session_id": queue.session_id,
"original_uri": queue_item.uri,
- "flow_mode": flow_mode,
},
)
- if not flow_mode and queue_item.media_item:
+ if queue_item.media_item:
media.title = queue_item.media_item.name
media.artist = getattr(queue_item.media_item, "artist_str", "")
media.album = (
async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
await self.mass.players.enqueue_next_media(
player_id=queue_id,
- media=await self.player_media_from_queue_item(next_item, False),
+ media=await self.player_media_from_queue_item(next_item),
)
if queue.next_item_id_enqueued != next_item.queue_item_id:
queue.next_item_id_enqueued = next_item.queue_item_id
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant_models.player import OutputProtocol, PlayerOptionValueType # noqa: TC002
+from music_assistant_models.player import PlayerOptionValueType # noqa: TC002
from music_assistant_models.player_control import PlayerControl # noqa: TC002
from music_assistant.constants import (
# Delegate to internal handler for actual implementation
await self._handle_play_media(player.player_id, media)
- def select_output_protocol(self, player_id: str) -> Player:
- """
- Select and set the best output protocol for a player.
-
- This method determines the optimal output protocol for playback and sets it
- on the player. Should be called before evaluating protocol-dependent properties
- like flow_mode.
-
- :param player_id: player_id of the player to select protocol for.
- :return: The target player that will handle playback (may be a protocol player).
- """
- player = self.get_player(player_id, raise_unavailable=True)
- assert player is not None
-
- target_player, output_protocol = self._select_best_output_protocol(player)
-
- if target_player.player_id != player.player_id:
- # Playing via linked protocol
- assert output_protocol is not None
- player.set_active_output_protocol(output_protocol.output_protocol_id)
- else:
- # Native playback
- player.set_active_output_protocol("native")
-
- return target_player
-
@api_command("players/cmd/select_sound_mode")
@handle_player_command
async def select_sound_mode(self, player_id: str, sound_mode: str) -> None:
if media.source_id:
player.set_active_mass_source(media.source_id)
- # Check if active output protocol was already set (e.g., by select_output_protocol)
- # and is still valid. If so, reuse it to avoid re-selecting.
- target_player: Player | None = None
- output_protocol: OutputProtocol | None = None
- if active_protocol_id := player.active_output_protocol:
- if active_protocol_id in ("native", player.player_id):
- target_player = player
- elif protocol_player := self.get_player(active_protocol_id):
- if protocol_player.available:
- target_player = protocol_player
- # Find the matching OutputProtocol
- for linked in player.linked_output_protocols:
- if linked.output_protocol_id == active_protocol_id:
- output_protocol = linked
- break
-
- # If no valid pre-selected protocol, select the best one now
- if target_player is None:
- target_player, output_protocol = self._select_best_output_protocol(player)
+ # Select best output protocol for playback
+ target_player, output_protocol = self._select_best_output_protocol(player)
if target_player.player_id != player.player_id:
# Playing via linked protocol - update active output protocol
player_id: str,
media: PlayerMedia,
) -> str:
- """Resolve the stream URL for the given PlayerMedia."""
- conf_output_codec = await self.mass.config.get_player_config_value(
- player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
+ """
+ Resolve the stream URL for the given PlayerMedia.
+
+ :param player_id: The (protocol) player ID requesting the stream.
+ :param media: The PlayerMedia object for which to resolve the stream URL.
+ :return: The resolved stream URL as a string.
+ """
+ protocol_player = self.mass.players.get_player(player_id)
+ conf_output_codec = cast(
+ "str",
+ protocol_player.config.get_value(CONF_OUTPUT_CODEC, default="flac")
+ if protocol_player
+ else "flac",
)
- output_codec = ContentType.try_parse(conf_output_codec or "flac")
+ output_codec = ContentType.try_parse(conf_output_codec)
fmt = output_codec.value
# handle raw pcm without exact format specifiers
if output_codec.is_pcm() and ";" not in fmt:
fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
extra_data = media.custom_data or {}
- flow_mode = extra_data.get("flow_mode", False)
session_id = extra_data.get("session_id")
queue_item_id = media.queue_item_id
if not session_id or not queue_item_id:
raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
queue_id = media.source_id
+ crossfade_needs_flow_mode = (
+ # if the player(queue) has crossfade enabled but the player(protocol) does not support
+ # gapless playback, we need to enforce flow mode
+ queue_id
+ and (queue_player := self.mass.players.get_player(queue_id))
+ and queue_player.config.get_value(CONF_SMART_FADES_MODE) != SmartFadesMode.DISABLED
+ and protocol_player
+ and not protocol_player.supports_gapless
+ )
+ # Determine flow_mode based on the actual player's capabilities.
+ # This is done here (just-in-time) because the player's protocol determines this
+ flow_mode = (
+ protocol_player is not None
+ and (protocol_player.flow_mode or crossfade_needs_flow_mode)
+ and media.media_type not in (MediaType.RADIO, MediaType.PLUGIN_SOURCE)
+ )
base_path = "flow" if flow_mode else "single"
return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
def get_stream(
- self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
+ self,
+ media: PlayerMedia,
+ pcm_format: AudioFormat,
+ player_id: str | None = None,
+ force_flow_mode: bool = False,
) -> AsyncGenerator[bytes, None]:
"""
Get a stream of the given media as raw PCM audio.
This is used as helper for player providers that can consume the raw PCM
audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
+
+ :param media: The PlayerMedia to stream.
+ :param pcm_format: The desired output PCM format.
+ :param player_id: The player ID requesting the stream. Used to determine
+ if flow mode should be used based on the player's capabilities.
+ :param force_flow_mode: Force flow mode regardless of player capabilities.
+ Used for multi-client streaming scenarios that require continuous streams.
"""
# select audio source
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
assert media.custom_data
- audio_source = self.get_announcement_stream(
+ return self.get_announcement_stream(
media.custom_data["announcement_url"],
output_format=pcm_format,
pre_announce=media.custom_data["pre_announce"],
pre_announce_url=media.custom_data["pre_announce_url"],
)
- elif media.media_type == MediaType.PLUGIN_SOURCE:
+ if media.media_type == MediaType.PLUGIN_SOURCE:
# special case: plugin source stream
assert media.custom_data
- audio_source = self.get_plugin_source_stream(
+ return self.get_plugin_source_stream(
plugin_source_id=media.custom_data["source_id"],
output_format=pcm_format,
# need to pass player_id from the PlayerMedia object
# because this could have been a group
player_id=media.custom_data["player_id"],
)
- elif (
- media.media_type == MediaType.FLOW_STREAM
- and media.source_id
+ if (
+ media.source_id
and media.source_id.startswith(UGP_PREFIX)
and media.uri
and "/ugp/" in media.uri
assert ugp_stream is not None # for type checker
if ugp_stream.base_pcm_format == pcm_format:
# no conversion needed
- audio_source = ugp_stream.subscribe_raw()
- else:
- audio_source = ugp_stream.get_stream(output_format=pcm_format)
- elif (
- media.source_id
- and media.queue_item_id
- and (media.media_type == MediaType.FLOW_STREAM or force_flow_mode)
- ):
- # regular queue (flow) stream request
- queue = self.mass.player_queues.get(media.source_id)
- assert queue
- start_queue_item = self.mass.player_queues.get_item(
- media.source_id, media.queue_item_id
+ return ugp_stream.subscribe_raw()
+ return ugp_stream.get_stream(output_format=pcm_format)
+ if media.source_id and media.queue_item_id:
+ # Queue stream request - determine flow_mode based on player capabilities
+ # or force it if explicitly requested (e.g., for multi-client streaming)
+ protocol_player = self.mass.players.get_player(player_id) if player_id else None
+ queue_id = media.source_id
+ crossfade_needs_flow_mode = (
+ # if the player(queue) has crossfade enabled but the player(protocol)
+ # does not support gapless playback, we need to enforce flow mode
+ queue_id
+ and (queue_player := self.mass.players.get_player(queue_id))
+ and queue_player.config.get_value(CONF_SMART_FADES_MODE) != SmartFadesMode.DISABLED
+ and protocol_player
+ and not protocol_player.supports_gapless
)
- assert start_queue_item
- audio_source = self.mass.streams.get_queue_flow_stream(
- queue=queue,
- start_queue_item=start_queue_item,
- pcm_format=pcm_format,
+ flow_mode = (
+ force_flow_mode
+ or (protocol_player is not None and protocol_player.flow_mode)
+ or crossfade_needs_flow_mode
)
- elif media.source_id and media.queue_item_id:
- # single item stream (e.g. radio)
+ if media.media_type == MediaType.RADIO:
+ # flow_mode for radio is pointless
+ flow_mode = False
+ if flow_mode:
+ # flow stream request
+ queue = self.mass.player_queues.get(media.source_id)
+ assert queue
+ start_queue_item = self.mass.player_queues.get_item(
+ media.source_id, media.queue_item_id
+ )
+ assert start_queue_item
+ return self.mass.streams.get_queue_flow_stream(
+ queue=queue,
+ start_queue_item=start_queue_item,
+ pcm_format=pcm_format,
+ )
+ # single item stream (e.g. radio or non-flow mode)
queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
assert queue_item
- audio_source = buffered(
+ return buffered(
self.get_queue_item_stream(
queue_item=queue_item,
pcm_format=pcm_format,
buffer_size=10,
min_buffer_before_yield=2,
)
- else:
- # assume url or some other direct path
- # NOTE: this will fail if its an uri not playable by ffmpeg
- audio_source = get_ffmpeg_stream(
- audio_input=media.uri,
- input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
- output_format=pcm_format,
- )
- return audio_source
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ return get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
+ output_format=pcm_format,
+ )
@use_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_flow_stream(
CONF_MUTE_CONTROL,
CONF_PLAYERS,
CONF_POWER_CONTROL,
- CONF_SMART_FADES_MODE,
CONF_VOLUME_CONTROL,
PROTOCOL_FEATURES,
PROTOCOL_PRIORITY,
@property
def requires_flow_mode(self) -> bool:
- """
- Return if the player needs flow mode.
-
- Default implementation: True if the player does not support PlayerFeature.ENQUEUE
- or has crossfade enabled without gapless support. Can be overridden by providers if needed.
- """
- if PlayerFeature.ENQUEUE not in self.supported_features:
- # without enqueue support, flow mode is required
- return True
- return (
- # player has crossfade enabled without gapless support - flow mode is required
- PlayerFeature.GAPLESS_PLAYBACK not in self.supported_features
- and str(self._config.get_value(CONF_SMART_FADES_MODE)) != "disabled"
- )
+ """Return if the player needs flow mode for (queue) playback."""
+ # Default implementation: True if the player does not support PlayerFeature.ENQUEUE
+ return PlayerFeature.ENQUEUE not in self.supported_features
@property
def device_info(self) -> DeviceInfo:
@final
def flow_mode(self) -> bool:
"""
- Return if the player needs flow mode.
+ Return if the player(protocol) needs flow mode.
Will use 'requires_flow_mode' unless overridden by flow_mode config.
- Considers the active output protocol's flow_mode if a protocol is active.
"""
- # If an output protocol is active (and not native), use the protocol player's flow_mode
- # The protocol player will handle its own config check
- if (
- self.__attr_active_output_protocol
- and self.__attr_active_output_protocol != "native"
- and (
- protocol_player := self.mass.players.get_player(self.__attr_active_output_protocol)
- )
- ):
- return protocol_player.flow_mode
- # Check native player's config override
+ # Check config override
if bool(self._config.get_value(CONF_FLOW_MODE)) is True:
# flow mode explicitly enabled in config
return True
"""
return self._check_feature_with_active_protocol(PlayerFeature.ENQUEUE)
+ @property
+ @final
+ def supports_gapless(self) -> bool:
+ """
+ Return if the player supports gapless playback.
+
+ This considers the active output protocol's capabilities if one is active.
+ If a protocol player is active, checks that protocol's GAPLESS_PLAYBACK feature.
+ Otherwise checks the native player's GAPLESS_PLAYBACK feature.
+ """
+ return self._check_feature_with_active_protocol(PlayerFeature.GAPLESS_PLAYBACK)
+
@property
@final
def state(self) -> PlayerState:
self.stream = None
# select audio source
- audio_source = self.mass.streams.get_stream(media, AIRPLAY_FLOW_PCM_FORMAT)
+ audio_source = self.mass.streams.get_stream(media, AIRPLAY_FLOW_PCM_FORMAT, self.player_id)
# setup StreamSession for player (and its sync childs if any)
sync_clients = self._get_sync_clients()
async def _produce_pending_chunks() -> None:
nonlocal pending_duration_us
- audio_source = self.player.mass.streams.get_stream(media, _PCM_FORMAT)
+ audio_source = self.player.mass.streams.get_stream(
+ media, _PCM_FORMAT, self.player.player_id
+ )
async for chunk in audio_source:
if not chunk:
continue
DEFAULT_SNAPCAST_FORMAT,
DEFAULT_SNAPCAST_FORMAT,
)
- audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
+ audio_source = self._mass.streams.get_stream(
+ self.media, DEFAULT_SNAPCAST_FORMAT, self._filter_settings_owner
+ )
try:
async with FFMpeg(
audio_input=audio_source,
for idx in range(offset, current_index):
if queue_item := self.mass.player_queues.get_item(queue_id, idx):
if queue_item.available:
- media = await self.mass.player_queues.player_media_from_queue_item(
- queue_item, False
- )
+ media = await self.mass.player_queues.player_media_from_queue_item(queue_item)
media.uri = await self.provider.mass.streams.resolve_stream_url(
self.player_id, media
)
# Add the current item
if current_item := self.mass.player_queues.get_item(queue_id, current_index):
if current_item.available:
- media = await self.mass.player_queues.player_media_from_queue_item(
- current_item, False
- )
+ media = await self.mass.player_queues.player_media_from_queue_item(current_item)
media.uri = await self.provider.mass.streams.resolve_stream_url(
self.player_id, media
)
next_item = self.mass.player_queues.get_next_item(queue_id, last_index)
if next_item is None:
break
- media = await self.mass.player_queues.player_media_from_queue_item(next_item, False)
+ media = await self.mass.player_queues.player_media_from_queue_item(next_item)
media.uri = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
items.append(media)
last_index = next_item.queue_item_id
# select audio source, we force flow mode
# because multi-client streaming does not support enqueueing
audio_source = self.mass.streams.get_stream(
- media, master_audio_format, force_flow_mode=True
+ media, master_audio_format, player_id=self.player_id, force_flow_mode=True
)
# start the stream task
await self.stream.stop()
# select audio source
- audio_source = self.mass.streams.get_stream(media, UGP_FORMAT)
+ audio_source = self.mass.streams.get_stream(media, UGP_FORMAT, self.player_id)
# start the stream task
self.stream = UGPStream(
import logging
import pathlib
from collections.abc import AsyncGenerator
+from unittest.mock import AsyncMock, MagicMock, NonCallableMagicMock, patch
import pytest
+from zeroconf.asyncio import AsyncZeroconf
from music_assistant.controllers.cache import CacheController
from music_assistant.controllers.config import ConfigController
return caplog
+def _create_mock_zeroconf() -> MagicMock:
+ """Create a mock AsyncZeroconf that prevents real network I/O.
+
+ Uses spec=AsyncZeroconf to ensure the mock only has valid attributes,
+ preventing it from being mistakenly registered as an API handler.
+ """
+ mock_zc = MagicMock(spec=AsyncZeroconf)
+ # Set up nested zeroconf object with proper spec
+ mock_inner_zc = NonCallableMagicMock()
+ mock_inner_zc.cache = NonCallableMagicMock()
+ mock_inner_zc.cache.cache = {} # Empty cache - no discovered services
+ mock_zc.zeroconf = mock_inner_zc
+ # Set up async methods
+ mock_zc.async_register_service = AsyncMock()
+ mock_zc.async_update_service = AsyncMock()
+ mock_zc.async_unregister_service = AsyncMock()
+ mock_zc.async_close = AsyncMock()
+ return mock_zc
+
+
@pytest.fixture
async def mass(tmp_path: pathlib.Path) -> AsyncGenerator[MusicAssistant, None]:
"""Start a Music Assistant in test mode.
# work correctly - the settings.json file is created but the config isn't respected.
# For now, tests that use the `mass` fixture will fail if MA is running on port 8095.
- await mass_instance.start()
+ # Mock zeroconf to prevent real network I/O during tests
+ mock_zc = _create_mock_zeroconf()
+ mock_browser = NonCallableMagicMock() # Use NonCallable to avoid api_cmd issues
- try:
- yield mass_instance
- finally:
- await mass_instance.stop()
+ with (
+ patch("music_assistant.mass.AsyncZeroconf", return_value=mock_zc),
+ patch("music_assistant.mass.AsyncServiceBrowser", return_value=mock_browser),
+ ):
+ await mass_instance.start()
+
+ try:
+ yield mass_instance
+ finally:
+ await mass_instance.stop()
@pytest.fixture
from unittest.mock import MagicMock
import pytest
-from music_assistant_models.enums import PlayerFeature, PlayerType
+from music_assistant_models.enums import PlayerFeature
from music_assistant_models.errors import UnsupportedFeaturedException
-from music_assistant_models.player import OutputProtocol
from music_assistant.controllers.players import PlayerController
from music_assistant.helpers.throttle_retry import Throttler
asyncio.run(controller.cmd_set_members("leader", player_ids_to_add=["member"]))
-class TestSelectOutputProtocol:
- """Test select_output_protocol method."""
-
- def test_select_native_playback_when_no_linked_protocols(self, mock_mass: MagicMock) -> None:
- """Test that native playback is selected when player has no linked protocols."""
- controller = PlayerController(mock_mass)
- provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
-
- player = MockPlayer(provider, "test_player", "Test Player")
- player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
-
- controller._players = {"test_player": player}
- controller._player_throttlers = {"test_player": Throttler(1, 0.05)}
- mock_mass.players = controller
-
- player.update_state(signal_event=False)
-
- # Execute select_output_protocol
- target_player = controller.select_output_protocol("test_player")
-
- # Should return the same player (native playback)
- assert target_player.player_id == player.player_id
- # Active protocol should be set to "native"
- assert player.active_output_protocol == "native"
-
- def test_select_preferred_protocol(self, mock_mass: MagicMock) -> None:
- """Test that preferred protocol is selected when configured."""
- controller = PlayerController(mock_mass)
- provider = MockProvider("sonos", instance_id="sonos_instance", mass=mock_mass)
- airplay_provider = MockProvider("airplay", instance_id="airplay_instance", mass=mock_mass)
-
- # Create main player (e.g., Sonos)
- main_player = MockPlayer(provider, "sonos_player", "Sonos Speaker")
- main_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
-
- # Create protocol player (e.g., AirPlay)
- protocol_player = MockPlayer(
- airplay_provider, "airplay_player", "Sonos via AirPlay", PlayerType.PROTOCOL
- )
- protocol_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
- protocol_player._attr_available = True
-
- controller._players = {
- "sonos_player": main_player,
- "airplay_player": protocol_player,
- }
- controller._player_throttlers = {
- "sonos_player": Throttler(1, 0.05),
- "airplay_player": Throttler(1, 0.05),
- }
- mock_mass.players = controller
-
- # Set up linked protocols on main player
- linked_protocol = OutputProtocol(
- output_protocol_id="airplay_player",
- name="AirPlay",
- protocol_domain="airplay",
- is_native=False,
- priority=10,
- available=True,
- )
- main_player.set_linked_output_protocols([linked_protocol])
-
- main_player.update_state(signal_event=False)
- protocol_player.update_state(signal_event=False)
-
- # Configure preferred protocol to be airplay
- mock_mass.config.get_raw_player_config_value = MagicMock(return_value="airplay_player")
-
- # Execute select_output_protocol
- target_player = controller.select_output_protocol("sonos_player")
-
- # Should return the protocol player
- assert target_player.player_id == "airplay_player"
- # Active protocol should be set to the protocol player id
- assert main_player.active_output_protocol == "airplay_player"
-
- def test_select_protocol_sets_active_before_flow_mode_check(self, mock_mass: MagicMock) -> None:
- """
- Test that selecting protocol sets active_output_protocol before flow_mode is checked.
-
- This is the core regression test for the timing issue where flow_mode
- was evaluated before active_output_protocol was set.
- """
- controller = PlayerController(mock_mass)
- provider = MockProvider("sonos", instance_id="sonos_instance", mass=mock_mass)
- airplay_provider = MockProvider("airplay", instance_id="airplay_instance", mass=mock_mass)
-
- # Create main player
- main_player = MockPlayer(provider, "sonos_player", "Sonos Speaker")
- main_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
-
- # Create protocol player that requires flow mode
- protocol_player = MockPlayer(
- airplay_provider, "airplay_player", "Sonos via AirPlay", PlayerType.PROTOCOL
- )
- protocol_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
- # AirPlay typically doesn't support enqueue, so flow mode would be needed
- protocol_player._attr_available = True
-
- controller._players = {
- "sonos_player": main_player,
- "airplay_player": protocol_player,
- }
- controller._player_throttlers = {
- "sonos_player": Throttler(1, 0.05),
- "airplay_player": Throttler(1, 0.05),
- }
- mock_mass.players = controller
-
- # Set up linked protocols
- linked_protocol = OutputProtocol(
- output_protocol_id="airplay_player",
- name="AirPlay",
- protocol_domain="airplay",
- is_native=False,
- priority=10,
- available=True,
- )
- main_player.set_linked_output_protocols([linked_protocol])
-
- main_player.update_state(signal_event=False)
- protocol_player.update_state(signal_event=False)
-
- # Configure preferred protocol
- mock_mass.config.get_raw_player_config_value = MagicMock(return_value="airplay_player")
-
- # Verify active_output_protocol is not set before calling select_output_protocol
- assert main_player.active_output_protocol is None
-
- # Execute select_output_protocol
- controller.select_output_protocol("sonos_player")
-
- # Active protocol should now be set BEFORE any flow_mode check would occur
- assert main_player.active_output_protocol == "airplay_player"
-
- # Now when we check flow_mode, it should correctly consider the protocol player
- # (This verifies the timing fix - flow_mode now uses the active protocol)
- _ = main_player.flow_mode # This should not raise and should use protocol's flow_mode
-
-
if __name__ == "__main__":
pytest.main([__file__, "-v"])