# Redirect to queue controller if it is active
if active_queue := self.get_active_queue(player):
await self.mass.player_queues.stop(active_queue.queue_id)
- return
- # handle command on player directly
- async with self._player_throttlers[player.player_id]:
- await player.stop()
+ else:
+ # handle command on player directly
+ async with self._player_throttlers[player.player_id]:
+ await player.stop()
@api_command("players/cmd/play")
@handle_player_command
await player.play()
return
if active_source and not active_source.passive:
- await player.select_source(active_source.id)
+ await self.select_source(player_id, active_source.id)
return
if media:
# try to re-play the current media item
# power on the player if needed
if player.powered is False and player.power_control != PLAYER_CONTROL_NONE:
await self.cmd_power(player.player_id, True)
+ if media.source_id:
+ player.set_active_mass_source(media.source_id)
await player.play_media(media)
@api_command("players/cmd/select_source")
# just try to stop (regardless of state)
await self.cmd_stop(player_id)
await asyncio.sleep(0.5) # small delay to allow stop to process
- player.state.active_source = None
- player.state.current_media = None
# check if source is a pluginsource
# in that case the source id is the instance_id of the plugin provider
if plugin_prov := self.mass.get_provider(source):
+ player.set_active_mass_source(source)
await self._handle_select_plugin_source(player, cast("PluginProvider", plugin_prov))
return
# check if source is a mass queue
# this can be used to restore the queue after a source switch
if mass_queue := self.mass.player_queues.get(source):
try:
+ player.set_active_mass_source(mass_queue.queue_id)
await self.mass.player_queues.play(mass_queue.queue_id)
except QueueEmpty:
# queue is empty: we just set the active source optimistically
for plugin_source in self.get_plugin_sources():
if plugin_source.in_use_by == player.player_id:
return plugin_source
+ if player.active_source == plugin_source.id:
+ return plugin_source
return None
def _get_player_groups(
) -> None:
"""Handle playback/select of given plugin source on player."""
plugin_source = plugin_prov.get_source()
+ if plugin_source.in_use_by and (current_player := self.get(plugin_source.in_use_by)):
+ self.logger.debug(
+ "Plugin source %s is already in use by player %s, stopping playback there first.",
+ plugin_source.name,
+ current_player.display_name,
+ )
+ await self.cmd_stop(current_player.player_id)
stream_url = await self.mass.streams.get_plugin_source_url(plugin_source, player.player_id)
+ plugin_source.in_use_by = player.player_id
+ # Call on_select callback if available
+ if plugin_source.on_select:
+ await plugin_source.on_select()
await self.play_media(
player_id=player.player_id,
media=PlayerMedia(
self._attr_name = self.config.name or self.config.default_name or f"SyncGroup {player_id}"
self._attr_available = True
self._attr_powered = False # group players are always powered off by default
- self._attr_active_source = None
self._attr_device_info = DeviceInfo(model="Sync Group", manufacturer=provider.name)
self._attr_supported_features = {
PlayerFeature.POWER,
if not powered and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
await self.stop()
self._attr_current_media = None
- self._attr_active_source = None
# optimistically set the group state
# Reset to unfiltered static members list when powered off
# (the frontend will hide unavailable members)
self._attr_group_members = self._attr_static_group_members.copy()
- self._attr_active_source = None
# and clear the sync leader
self.sync_leader = None
self.update_state()
if sync_leader := self.sync_leader:
await sync_leader.play_media(media)
self._attr_current_media = deepcopy(media)
- self._attr_active_source = media.source_id
self.update_state()
else:
raise RuntimeError("an empty group cannot play media, consider adding members first")
"""
if sync_leader := self.sync_leader:
await sync_leader.select_source(source)
- self._attr_active_source = source
self.update_state()
async def set_members(
# need to pass player_id from the PlayerMedia object
# because this could have been a group
player_id=media.custom_data["player_id"],
- chunk_size=get_chunksize(pcm_format, 1), # ensure 1 second chunks
)
elif media.source_id and media.source_id.startswith(UGP_PREFIX):
# special case: UGP stream
output_format: AudioFormat,
player_id: str,
player_filter_params: list[str] | None = None,
- chunk_size: int | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get the special plugin source stream."""
plugin_prov: PluginProvider = self.mass.get_provider(plugin_source_id)
plugin_source = plugin_prov.get_source()
- if plugin_source.in_use_by and plugin_source.in_use_by != player_id:
- # kick out existing player using this source
- plugin_source.in_use_by = player_id
- await asyncio.sleep(0.5) # give some time to the other player to stop
-
self.logger.debug(
"Start streaming PluginSource %s to %s using output format %s",
plugin_source_id,
player_id,
output_format,
)
+ # this should already be set by the player controller, but just to be sure
plugin_source.in_use_by = player_id
+
try:
async for chunk in get_ffmpeg_stream(
audio_input=(
output_format=output_format,
filter_params=player_filter_params,
extra_input_args=["-y", "-re"],
- chunk_size=chunk_size,
):
if plugin_source.in_use_by != player_id:
- self.logger.info(
+ self.logger.debug(
"Aborting streaming PluginSource %s to %s "
"- another player took over control",
plugin_source_id,
self.logger.debug(
"Finished streaming PluginSource %s to %s", plugin_source_id, player_id
)
- plugin_source.in_use_by = None
+ await asyncio.sleep(1) # prevent race conditions when selecting source
+ if plugin_source.in_use_by == player_id:
+ # release control
+ plugin_source.in_use_by = None
async def get_queue_item_stream(
self,
import asyncio
import logging
import time
+import warnings
from typing import TYPE_CHECKING
import aiofiles
) -> SmartFadesAnalysis | None:
"""Perform beat analysis using librosa."""
try:
- tempo, beats_array = librosa.beat.beat_track(
- y=audio_array,
- sr=sample_rate,
- units="time",
- )
+ # Suppress librosa UserWarnings about empty mel filters
+ # These warnings are harmless and occur with certain audio characteristics
+ with warnings.catch_warnings():
+ warnings.filterwarnings(
+ "ignore",
+ message="Empty filters detected in mel frequency basis",
+ category=UserWarning,
+ )
+ tempo, beats_array = librosa.beat.beat_track(
+ y=audio_array,
+ sr=sample_rate,
+ units="time",
+ )
# librosa returns np.float64 arrays when units="time"
if len(beats_array) < 2:
self._extra_data: dict[str, Any] = {}
self._extra_attributes: dict[str, Any] = {}
self._on_unload_callbacks: list[Callable[[], None]] = []
+ self.__active_mass_source = player_id
# The PlayerState is the (snapshotted) final state of the player
# after applying any config overrides and other transformations,
# such as the display name and player controls.
"""
Return the (id of) the active source of the player.
+ Only required if the player supports PlayerFeature.SELECT_SOURCE.
+
Set to None if the player is not currently playing a source or
the player_id if the player is currently playing a MA queue.
for plugin_source in self.mass.players.get_plugin_sources():
if plugin_source.in_use_by == self.player_id:
return plugin_source.id
- # in case player's source is None, return the player_id (to indicate MA is active source)
- return self._active_source or self.player_id
+ if self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
+ # active source as reported by the player itself
+ # but only if playing/paused, otherwise we always prefer the MA source
+ return self._active_source
+ # return the (last) known MA source
+ return self.__active_mass_source
@cached_property
@final
sources.append(plugin_source)
return sources
+ # The id of the (last) active mass source.
+ # This is to keep track of the last active MA source for the player,
+ # so we can restore it when needed (e.g. after switching to a plugin source).
+ __active_mass_source: str = ""
+
+ def set_active_mass_source(self, value: str) -> None:
+ """
+ Set the id of the (last) active mass source.
+
+ This is to keep track of the last active MA source for the player,
+ so we can restore it when needed (e.g. after switching to a plugin source).
+ """
+ self.__active_mass_source = value
+
def __hash__(self) -> int:
"""Return a hash of the Player."""
return hash(self.player_id)
repr=False,
)
+ # Callback for when this source is selected: async def callback() -> None
+ on_select: Callable[[], Awaitable[None]] | None = field(
+ default=None,
+ compare=False,
+ metadata=field_options(serialize="omit", deserialize=pass_through),
+ repr=False,
+ )
+
def as_player_source(self) -> PlayerSource:
"""Return a basic PlayerSource representation without unpicklable callbacks."""
return PlayerSource(
if self.stream and self.stream.session:
# forward stop to the entire stream session
await self.stream.session.stop()
- self._attr_active_source = None
self._attr_current_media = None
self.update_state()
if self.synced_to:
# this should not happen, but guard anyways
raise RuntimeError("Player is synced")
-
- # set the active source for the player to the media queue
- # this accounts for syncgroups and linked players (e.g. sonos)
- self._attr_active_source = media.source_id
self._attr_current_media = media
# select audio source
async def stop(self) -> None:
"""Handle STOP command on the player."""
await self.api.stop()
- self._attr_active_source = None
self._attr_current_media = None
self._attr_playback_state = PlaybackState.IDLE
self.update_state()
if play_state == "stop":
self._set_polling_dynamic()
self._attr_playback_state = PlaybackState.IDLE
- self._attr_active_source = None
self._attr_current_media = None
self.update_state()
# Optimistically update state
self._attr_current_media = media
- self._attr_active_source = media.source_id
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time.time()
self.update_state()
if removed_player:
removed_player._set_polling_dynamic()
removed_player._attr_current_media = None
- removed_player._attr_active_source = None
removed_player.update_state()
if player_ids_to_add:
self.player_id,
BuiltinPlayerEvent(type=BuiltinPlayerEventType.STOP),
)
- self._attr_active_source = None
self._attr_current_media = None
self.update_state()
url = f"builtin_player/flow/{self.player_id}.mp3"
self._attr_current_media = media
self._attr_playback_state = PlaybackState.PLAYING
- self._attr_active_source = media.source_id
self.update_state()
self.mass.signal_event(
EventType.BUILTIN_PLAYER,
_device_uri = self.device.current_track_uri or ""
self.set_current_media(uri=_device_uri, clear_all=True)
- if self.player_id in _device_uri:
- self._attr_active_source = self.player_id
- elif "spotify" in _device_uri:
+ if "spotify" in _device_uri:
self._attr_active_source = "spotify"
elif _device_uri.startswith("http"):
self._attr_active_source = "http"
else:
- # TODO: handle other possible sources here
+ # TODO: extend this list with other possible sources
self._attr_active_source = None
if self.device.media_position:
# only update elapsed_time if the device actually reports it
"""Send STOP command to given player."""
await self.fully_kiosk.stopSound()
self._attr_playback_state = PlaybackState.IDLE
- self._attr_active_source = None
self._attr_current_media = None
self.update_state()
await self.pause()
finally:
self._attr_current_media = None
- self._attr_active_source = None
self.update_state()
async def volume_set(self, volume_level: int) -> None:
# Optimistically update state
self._attr_current_media = media
- self._attr_active_source = media.source_id
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time.time()
self._attr_playback_state = PlaybackState.PLAYING
await self.api.group.stop()
# Clear the playback task reference (group.stop() handles stopping the stream)
self._playback_task = None
- self._attr_active_source = None
self._attr_current_media = None
self.update_state()
self._attr_current_media = media
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time.time()
- self._attr_active_source = media.source_id
# playback_state will be set by the group state change event
# Stop previous stream in case we were already playing something
# There's no real way to "Power" on the app since device wake up / app start
# is handled by The roku once it receives the Play Media request
if not powered:
- self._attr_active_source = None
if app_running:
await self.roku.remote("home")
await self.roku.remote("power")
logger = self.provider.logger.getChild(self.player_id)
logger.info("Received STOP command on player %s", self.display_name)
self._attr_playback_state = PlaybackState.IDLE
- self._attr_active_source = None
self._attr_current_media = None
self.update_state()
except Exception:
)
self._attr_powered = True
self._attr_current_media = media
- self._attr_active_source = self.player_id
self.update_state()
except Exception:
self.logger.error("Failed to Play Media on: %s", self.name)
if device_info.app is not None:
app_running = device_info.app.app_id == self.provider.config.get_value(CONF_ROKU_APP_ID)
- # Update Device State
- if not app_running:
- self._attr_active_source = None
-
self._attr_powered = app_running
# If Media's Playing update its state
# finishes the player.state should be IDLE.
self._attr_playback_state = PlaybackState.IDLE
self._attr_current_media = None
- self._attr_active_source = None
self._set_childs_state()
self.update_state()
await snap_group.set_stream(stream.identifier)
self._attr_current_media = media
- self._attr_active_source = media.source_id
# select audio source
audio_source = self.mass.streams.get_stream(media, DEFAULT_SNAPCAST_FORMAT)
return
await asyncio.to_thread(self.soco.stop)
self.mass.call_later(2, self.poll)
- self._attr_active_source = None
self.update_state()
async def play(self) -> None:
ConfigEntryType,
ContentType,
EventType,
+ PlaybackState,
ProviderFeature,
ProviderType,
StreamType,
name=self.name,
# we set passive to true because we
# dont allow this source to be selected directly
- passive=True,
+ passive=False,
# Playback control capabilities will be enabled when Spotify Web API is available
can_play_pause=False,
can_seek=False,
self._spotify_provider: SpotifyProvider | None = None
self._on_unload_callbacks: list[Callable[..., None]] = []
self._runner_error_count = 0
+ self._spotify_device_id: str | None = None
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._source_details.can_play_pause = has_web_api
self._source_details.can_seek = has_web_api
self._source_details.can_next_previous = has_web_api
+ self._source_details.passive = not has_web_api
# Register or unregister callbacks based on availability
if has_web_api:
self._source_details.on_next = self._on_next
self._source_details.on_previous = self._on_previous
self._source_details.on_seek = self._on_seek
+ self._source_details.on_select = self._on_select
else:
self._source_details.on_play = None
self._source_details.on_pause = None
self._source_details.on_next = None
self._source_details.on_previous = None
self._source_details.on_seek = None
+ self._source_details.on_select = None
# Trigger player update to reflect capability changes
if self._source_details.in_use_by:
self.mass.players.trigger_player_update(self._source_details.in_use_by)
+ async def _on_select(self) -> None:
+ """Handle source selection - transfer Spotify playback to this device."""
+ if not self._spotify_provider:
+ return
+ try:
+ # Transfer playback to this device when it's selected
+ await self._ensure_active_device()
+ await self._spotify_provider._put_data("me/player/play")
+ except Exception as err:
+ self.logger.debug("Failed to transfer playback on source selection: %s", err)
+
async def _on_play(self) -> None:
"""Handle play command via Spotify Web API."""
+ attached_player = self.mass.players.get(self.mass_player_id)
+ if attached_player and attached_player.playback_state == PlaybackState.IDLE:
+ # edge case: player is not paused, so we need to select this source first
+ await self.mass.players.select_source(self.mass_player_id, self.instance_id)
if not self._spotify_provider:
raise UnsupportedFeaturedException(
"Playback control requires a matching Spotify music provider"
)
try:
+ # First try to transfer playback to this device if needed
+ await self._ensure_active_device()
await self._spotify_provider._put_data("me/player/play")
except Exception as err:
self.logger.warning("Failed to send play command via Spotify Web API: %s", err)
self.logger.warning("Failed to send seek command via Spotify Web API: %s", err)
raise
+ async def _get_spotify_device_id(self) -> str | None:
+ """Get the Spotify Connect device ID for this instance.
+
+ :return: Device ID if found, None otherwise.
+ """
+ if not self._spotify_provider:
+ return None
+
+ try:
+ # Get list of available devices from Spotify Web API
+ devices_data = await self._spotify_provider._get_data("me/player/devices")
+ devices = devices_data.get("devices", [])
+
+ # Look for our device by name
+ connect_name = cast("str", self.config.get_value(CONF_PUBLISH_NAME)) or self.name
+ for device in devices:
+ if device.get("name") == connect_name and device.get("type") == "Speaker":
+ device_id: str | None = device.get("id")
+ self.logger.debug("Found Spotify Connect device ID: %s", device_id)
+ return device_id
+
+ self.logger.debug(
+ "Could not find Spotify Connect device '%s' in available devices", connect_name
+ )
+ return None
+ except Exception as err:
+ self.logger.debug("Failed to get Spotify devices: %s", err)
+ return None
+
+ async def _ensure_active_device(self) -> None:
+ """
+ Ensure this Spotify Connect device is the active player on Spotify.
+
+ Transfers playback to this device if it's not already active.
+ """
+ if not self._spotify_provider:
+ return
+
+ try:
+ # Get current playback state
+ try:
+ playback_data = await self._spotify_provider._get_data("me/player")
+ current_device = playback_data.get("device", {}) if playback_data else {}
+ current_device_id = current_device.get("id")
+ except Exception as err:
+ if getattr(err, "status", None) == 204:
+ # No active device
+ current_device_id = None
+ else:
+ raise
+
+ # Get our device ID if we don't have it cached
+ if not self._spotify_device_id:
+ self._spotify_device_id = await self._get_spotify_device_id()
+
+ # If we couldn't find our device ID, we can't transfer
+ if not self._spotify_device_id:
+ self.logger.debug("Cannot transfer playback - device ID not found")
+ return
+
+ # Check if we're already the active device
+ if current_device_id == self._spotify_device_id:
+ self.logger.debug("Already the active Spotify device")
+ return
+
+ # Transfer playback to this device
+ self.logger.info("Transferring Spotify playback to this device")
+ await self._spotify_provider._put_data(
+ "me/player",
+ data={"device_ids": [self._spotify_device_id], "play": False},
+ )
+ except Exception as err:
+ self.logger.debug("Failed to ensure active device: %s", err)
+ # Don't raise - this is a best-effort operation
+
def _on_provider_event(self, event: MassEvent) -> None:
"""Handle provider added/removed events to check for Spotify provider."""
# Re-check for matching Spotify provider when providers change
if not self._connected_spotify_username or not self._spotify_provider:
await self._check_spotify_provider_match()
+ # Make this device the active Spotify player via Web API
+ if self._spotify_provider:
+ self.mass.create_task(self._ensure_active_device())
+
# initiate playback by selecting this source on the default player
self.mass.create_task(
self.mass.players.select_source(self.mass_player_id, self.instance_id)
image_url = images[0] if (images := common_meta.get("covers")) else None
if self._source_details.metadata is None:
self._source_details.metadata = StreamMetadata(uri=uri, title=title)
- self._source_details.metadata.uri = uri
- self._source_details.metadata.title = title
- self._source_details.metadata.artist = None
- self._source_details.metadata.album = None
- self._source_details.metadata.image_url = image_url
- self._source_details.metadata.description = None
- duration_ms = common_meta.get("duration_ms", 0)
- self._source_details.metadata.duration = (
- int(duration_ms) // 1000 if duration_ms is not None else None
- )
+ self._source_details.metadata.uri = uri
+ self._source_details.metadata.title = title
+ self._source_details.metadata.artist = None
+ self._source_details.metadata.album = None
+ self._source_details.metadata.image_url = image_url
+ self._source_details.metadata.description = None
+ duration_ms = common_meta.get("duration_ms", 0)
+ self._source_details.metadata.duration = (
+ int(duration_ms) // 1000 if duration_ms is not None else None
+ )
if track_meta := json_data.get("track_metadata_fields", {}):
if artists := track_meta.get("artists"):
async with TaskManager(self.mass) as tg:
for client in self._get_sync_clients():
tg.create_task(client.stop())
- self._attr_active_source = None
self.update_state()
async def play(self) -> None:
self._attr_name = self.config.name or f"Universal Group {player_id}"
self._attr_available = True
self._attr_powered = False # group players are always powered off by default
- self._attr_active_source = player_id
self._attr_device_info = DeviceInfo(model="Universal Group", manufacturer=provider.name)
self._attr_supported_features = {*BASE_FEATURES}
self._attr_needs_poll = True
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time() - 1
self._attr_playback_state = PlaybackState.PLAYING
- self._attr_active_source = media.source_id
self.update_state()
# forward to downstream play_media commands