# last_poll: when was the player last polled (used with needs_poll)
last_poll: float = 0
+ # internal use only
+ _prev_volume_level: int = 0
+
@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
CONF_LANGUAGE: Final[str] = "language"
CONF_SAMPLE_RATES: Final[str] = "sample_rates"
CONF_HTTP_PROFILE: Final[str] = "http_profile"
+CONF_SYNC_LEADER: Final[str] = "sync_leader"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
CONF_GROUP_MEMBERS,
CONF_HIDE_PLAYER,
CONF_PLAYERS,
+ CONF_SYNC_LEADER,
CONF_TTS_PRE_ANNOUNCE,
SYNCGROUP_PREFIX,
)
if player.powered == powered:
return # nothing to do
+ # grab info about any groups this player is active in
+ # to handle actions on the group when a (sync)group child turns on/off
+ if active_group_player_id := self._get_active_player_group(player):
+ active_group_player = self.get(active_group_player_id)
+ group_player_state = active_group_player.state
+ else:
+ active_group_player = None
+
# always stop player at power off
if (
not powered
+ and player.powered
and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
and not player.synced_to
- and player.powered
):
await self.cmd_stop(player_id)
# unsync player at power off
if not powered:
- if player.synced_to is not None:
+ if player.synced_to or player.group_childs:
await self.cmd_unsync(player_id)
- for child in self.iter_group_members(player):
- if not child.synced_to:
- continue
- await self.cmd_unsync(child.player_id)
+
if PlayerFeature.POWER in player.supported_features:
- # forward to player provider
+ # player supports power command: forward to player provider
player_provider = self.get_player_provider(player_id)
async with self._player_throttlers[player_id]:
await player_provider.cmd_power(player_id, powered)
else:
# allow the stop command to process and prevent race conditions
await asyncio.sleep(0.2)
+
# always optimistically set the power state to update the UI
# as fast as possible and prevent race conditions
player.powered = powered
- # always MA as active source on power ON
- player.active_source = player_id if powered else None
+ # reset active source
+ player.active_source = None
self.update(player_id)
- # handle actions when a (sync)group child turns on/off
- if active_group_player := self._get_active_player_group(player):
- player_prov = self.get_player_provider(active_group_player)
- player_prov.on_child_power(active_group_player, player.player_id, powered)
+
# handle 'auto play on power on' feature
- elif (
- powered
+ if (
+ not active_group_player
+ and powered
and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False)
and player.active_source in (None, player_id)
):
await self.mass.player_queues.resume(player_id)
+ # handle group player actions
+ if not (active_group_player and active_group_player.powered):
+ return
+
+ # run actions suitable for every type of group player
+ powered_childs = list(self.mass.players.iter_group_members(active_group_player, True))
+ if not powered and player in powered_childs:
+ powered_childs.remove(player.player_id)
+ elif powered and player.player_id not in powered_childs:
+ powered_childs.append(player.player_id)
+ # if the last player of a group turned off, turn off the group
+ if len(powered_childs) == 0:
+ self.logger.debug(
+ "Group %s has no more powered members, turning off group player",
+ active_group_player.display_name,
+ )
+ self.mass.create_task(self.mass.players.cmd_power(active_group_player.player_id, False))
+ return
+ # forward to either syncgroup logic or group player logic
+ if active_group_player.type == PlayerType.SYNC_GROUP:
+ self._on_syncgroup_child_power(active_group_player, player, powered, group_player_state)
+ elif active_group_player.type == PlayerType.GROUP:
+ player_prov = self.mass.get_provider(active_group_player.provider)
+ player_prov.on_group_child_power(
+ active_group_player, player, powered, group_player_state
+ )
+
@api_command("players/cmd/volume_set")
@handle_player_command
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
await self.cmd_power(player_id, power)
return
+ if not (group_player.type == PlayerType.SYNC_GROUP or group_player.group_childs):
+ # this is not a (temporary) sync group - nothing to do
+ raise UnsupportedFeaturedException("Player is not a sync group")
+
# make sure to update the group power state
group_player.powered = power
if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
await self.cmd_stop(player_id)
+ # handle syncgroup - this will also work for temporary syncgroups
+ # where players are manually synced against a group leader
any_member_powered = False
async with TaskManager(self.mass) as tg:
for member in self.iter_group_members(group_player, only_powered=True):
player = self.get(player_id, True)
assert player
if PlayerFeature.VOLUME_MUTE not in player.supported_features:
- msg = f"Player {player.display_name} does not support muting"
- raise UnsupportedFeaturedException(msg)
+ self.logger.info(
+ "Player %s does not support muting, using volume instead", player.display_name
+ )
+ if muted:
+ player._prev_volume_level = player.volume_level
+ player.volume_muted = True
+ await self.cmd_volume_set(player_id, 0)
+ else:
+ player.volume_muted = False
+ await self.cmd_volume_set(player_id, player._prev_volume_level)
+ return
player_provider = self.get_player_provider(player_id)
async with self._player_throttlers[player_id]:
await player_provider.cmd_volume_mute(player_id, muted)
- player_id: player_id of the player to handle the command.
"""
+ if (player := self.get(player_id)) and player.group_childs:
+ # this player is a syncgroup leader, unsync all children
+ await self.cmd_unsync_many(player.group_childs)
+ return
await self.cmd_unsync_many([player_id])
@api_command("players/cmd/sync_many")
async def cmd_unsync_many(self, player_ids: list[str]) -> None:
"""Handle UNSYNC command for all the given players."""
# filter all player ids on compatibility and availability
- final_player_ids: UniqueList[str] = UniqueList()
- for player_id in player_ids:
+ for player_id in list(player_ids):
if not (child_player := self.get(player_id)):
self.logger.warning("Player %s is not available", player_id)
continue
"Player %s does not support (un)sync commands", child_player.name
)
continue
- final_player_ids.append(player_id)
- # reset active source player if is unsynced
+ if not child_player.synced_to:
+ continue
+ # reset active source player if it is unsynced
child_player.active_source = None
-
- if not final_player_ids:
- return
-
- # forward command to the player provider after all (base) sanity checks
- player_provider = self.get_player_provider(final_player_ids[0])
- await player_provider.cmd_unsync_many(final_player_ids)
+ # forward command to the player provider
+ if player_provider := self.get_player_provider(player_id):
+ await player_provider.cmd_unsync(player_id)
def set(self, player: Player) -> None:
"""Set/Update player details on the controller."""
):
if child_player.group_childs:
return child_player
+ pref_sync_leader = self.mass.config.get_raw_player_config_value(
+ group_player.player_id, CONF_SYNC_LEADER, "auto"
+ )
+ if pref_sync_leader != "auto" and (player := self.get(pref_sync_leader)):
+ return player
# select new sync leader: return the first playing player
for child_player in self.iter_group_members(
group_player, only_powered=True, only_playing=True
continue
await self.cmd_sync(member.player_id, sync_leader.player_id)
+ def _on_syncgroup_child_power(
+ self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
+ ) -> None:
+ """
+ Call when a power command was executed on one of the child players of a SyncGroup.
+
+ This is used to handle special actions such as (re)syncing.
+ The group state is sent with the state BEFORE the power command was executed.
+ """
+ group_playing = group_state == PlayerState.PLAYING
+ sync_leader = self.mass.players.get_sync_leader(group_player)
+ is_sync_leader = child_player.player_id == sync_leader.player_id
+ if group_playing and not new_power and is_sync_leader:
+ # the current sync leader player turned OFF while the group player
+ # should still be playing - we need to select a new sync leader and resume
+ self.logger.warning(
+ "Syncleader %s turned off while syncgroup is playing, "
+ "a forced resync for syngroup %s will be attempted...",
+ child_player.display_name,
+ group_player.display_name,
+ )
+
+ async def full_resync() -> None:
+ await self.mass.players.sync_syncgroup(group_player.player_id)
+ await self.mass.player_queues.resume(group_player.player_id)
+
+ self.mass.call_later(2, full_resync, task_id=f"forced_resync_{group_player.player_id}")
+ return
+ elif new_power:
+ # if a child player turned ON while the group is already active, we need to resync
+ if sync_leader.player_id != child_player.player_id:
+ self.mass.create_task(
+ self.cmd_sync(child_player.player_id, sync_leader.player_id),
+ )
+
async def _register_syncgroups(self) -> None:
"""Register all (virtual/fake) syncgroup players."""
player_configs = await self.mass.config.get_player_configs()
+++ /dev/null
-"""Implementation of a simple multi-client stream task/job."""
-
-import asyncio
-import logging
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-
-from music_assistant.common.helpers.util import empty_queue
-from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.server.helpers.audio import get_ffmpeg_stream
-
-LOGGER = logging.getLogger(__name__)
-
-
-class MultiClientStream:
- """Implementation of a simple multi-client (audio) stream task/job."""
-
- def __init__(
- self,
- audio_source: AsyncGenerator[bytes, None],
- audio_format: AudioFormat,
- expected_clients: int = 0,
- ) -> None:
- """Initialize MultiClientStream."""
- self.audio_source = audio_source
- self.audio_format = audio_format
- self.subscribers: list[asyncio.Queue] = []
- self.expected_clients = expected_clients
- self.task = asyncio.create_task(self._runner())
-
- @property
- def done(self) -> bool:
- """Return if this stream is already done."""
- return self.task.done()
-
- async def stop(self) -> None:
- """Stop/cancel the stream."""
- if self.done:
- return
- self.task.cancel()
- with suppress(asyncio.CancelledError):
- await self.task
- for sub_queue in list(self.subscribers):
- empty_queue(sub_queue)
-
- async def get_stream(
- self, output_format: AudioFormat, filter_params: list[str] | None = None
- ) -> AsyncGenerator[bytes, None]:
- """Get (client specific encoded) ffmpeg stream."""
- async for chunk in get_ffmpeg_stream(
- audio_input=self.subscribe_raw(),
- input_format=self.audio_format,
- output_format=output_format,
- filter_params=filter_params,
- ):
- yield chunk
-
- async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
- """Subscribe to the raw/unaltered audio stream."""
- try:
- queue = asyncio.Queue(2)
- self.subscribers.append(queue)
- while True:
- chunk = await queue.get()
- if chunk == b"":
- break
- yield chunk
- finally:
- with suppress(ValueError):
- self.subscribers.remove(queue)
-
- async def _runner(self) -> None:
- """Run the stream for the given audio source."""
- expected_clients = self.expected_clients or 1
- # wait for first/all subscriber
- count = 0
- while count < 50:
- await asyncio.sleep(0.5)
- count += 1
- if len(self.subscribers) >= expected_clients:
- break
- if count == 50:
- return
- LOGGER.debug(
- "Starting multi-client stream with %s/%s clients",
- len(self.subscribers),
- self.expected_clients,
- )
- async for chunk in self.audio_source:
- if len(self.subscribers) == 0:
- return
- await asyncio.gather(
- *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
- )
- # EOF: send empty chunk
- await asyncio.gather(*[sub.put(b"") for sub in self.subscribers], return_exceptions=True)
ConfigValueOption,
PlayerConfig,
)
-from music_assistant.common.models.enums import ConfigEntryType, PlayerState, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, PlayerState
from music_assistant.common.models.player import Player, PlayerMedia
-from music_assistant.constants import CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
+from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_SYNC_LEADER, SYNCGROUP_PREFIX
from .provider import Provider
multi_value=True,
required=True,
),
+ ConfigEntry(
+ key=CONF_SYNC_LEADER,
+ type=ConfigEntryType.STRING,
+ label="Preferred sync leader",
+ default_value="auto",
+ options=(
+ *tuple(
+ ConfigValueOption(x.display_name, x.player_id)
+ for x in self.mass.players.all(True, False)
+ if x.player_id
+ in self.mass.config.get_raw_player_config_value(
+ player_id, CONF_GROUP_MEMBERS, []
+ )
+ ),
+ ConfigValueOption("Select automatically", "auto"),
+ ),
+ description="By default Music Assistant will automatically assign a "
+ "(random) player as sync leader, meaning the other players in the sync group "
+ "will be synced to that player. If you want to force a specific player to be "
+ "the sync leader, select it here.",
+ required=True,
+ ),
CONF_ENTRY_PLAYER_ICON_GROUP,
)
if not player_id.startswith(SYNCGROUP_PREFIX):
# default implementation, simply call the cmd_sync for all child players
await self.cmd_sync(child_id, target_player)
- async def cmd_unsync_many(self, player_ids: str) -> None:
- """Handle UNSYNC command for all the given players.
-
- Remove the given player from any syncgroups it currently is synced to.
-
- - player_id: player_id of the player to handle the command.
- """
- for player_id in player_ids:
- # default implementation, simply call the cmd_sync for all player_ids
- await self.cmd_unsync(player_id)
-
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
if 'needs_poll' is set to True in the player object.
"""
- def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
+ def on_group_child_power(
+ self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
+ ) -> None:
"""
- Call when a power command was executed on one of the child players of a Sync/Player group.
+ Call when a power command was executed on one of the child players of a PlayerGroup.
This is used to handle special actions such as (re)syncing.
+ The group state is sent with the state BEFORE the power command was executed.
"""
- group_player = self.mass.players.get(player_id)
- child_player = self.mass.players.get(child_player_id)
-
- if not group_player.powered:
- # guard, this should be caught in the player controller but just in case...
- return
-
- powered_childs = list(self.mass.players.iter_group_members(group_player, True))
- if not new_power and child_player in powered_childs:
- powered_childs.remove(child_player)
- if new_power and child_player not in powered_childs:
- powered_childs.append(child_player)
-
- # if the last player of a group turned off, turn off the group
- if len(powered_childs) == 0:
- self.logger.debug(
- "Group %s has no more powered members, turning off group player",
- group_player.display_name,
- )
- self.mass.create_task(self.mass.players.cmd_power(player_id, False))
- return
-
- # the below actions are only suitable for syncgroups
- if ProviderFeature.SYNC_PLAYERS not in self.supported_features:
- return
-
- group_playing = group_player.state == PlayerState.PLAYING
- is_sync_leader = (
- len(child_player.group_childs) > 0
- and child_player.active_source == group_player.player_id
- )
- if group_playing and not new_power and is_sync_leader:
- # the current sync leader player turned OFF while the group player
- # should still be playing - we need to select a new sync leader and resume
- self.logger.warning(
- "Syncleader %s turned off while syncgroup is playing, "
- "a forced resume for syngroup %s will be attempted in 5 seconds...",
- child_player.display_name,
- group_player.display_name,
- )
-
- async def full_resync() -> None:
- await self.mass.players.sync_syncgroup(group_player.player_id)
- await self.mass.player_queues.resume(group_player.player_id)
-
- self.mass.call_later(5, full_resync, task_id=f"forced_resync_{player_id}")
- return
- elif new_power:
- # if a child player turned ON while the group is already active, we need to resync
- sync_leader = self.mass.players.get_sync_leader(group_player)
- if sync_leader.player_id != child_player_id:
- self.mass.create_task(
- self.cmd_sync(child_player_id, sync_leader.player_id),
- )
# DO NOT OVERRIDE BELOW
# special case: UGP stream
ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
ugp_stream = ugp_provider.streams[media.queue_id]
- input_format = ugp_stream.audio_format
- audio_source = ugp_stream.subscribe_raw()
+ input_format = ugp_stream.output_format
+ audio_source = ugp_stream.subscribe()
elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
# radio stream - consume media stream directly
input_format = AIRPLAY_PCM_FORMAT
"format_id": format_id,
}
]
- await self._post_data("track/reportStreamingStart", data=events)
+ async with self.throttler.bypass():
+ await self._post_data("track/reportStreamingStart", data=events)
async def on_streamed(self, streamdetails: StreamDetails, seconds_streamed: int) -> None:
"""Handle callback when an item completed streaming."""
user_id = self._user_auth_info["user"]["id"]
- await self._get_data(
- "/track/reportStreamingEnd",
- user_id=user_id,
- track_id=str(streamdetails.item_id),
- duration=try_parse_int(seconds_streamed),
- )
+ async with self.throttler.bypass():
+ await self._get_data(
+ "/track/reportStreamingEnd",
+ user_id=user_id,
+ track_id=str(streamdetails.item_id),
+ duration=try_parse_int(seconds_streamed),
+ )
def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
-from music_assistant.server.helpers.multi_client_stream import MultiClientStream
from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.server.providers.ugp import UniversalGroupProvider
+from .multi_client_stream import MultiClientStream
+
if TYPE_CHECKING:
from aioslimproto.models import SlimEvent
--- /dev/null
+"""Implementation of a simple multi-client stream task/job."""
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+
+from music_assistant.common.helpers.util import empty_queue
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+
+class MultiClientStream:
+ """Implementation of a simple multi-client (audio) stream task/job."""
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ expected_clients: int = 0,
+ ) -> None:
+ """Initialize MultiClientStream."""
+ self.audio_source = audio_source
+ self.audio_format = audio_format
+ self.subscribers: list[asyncio.Queue] = []
+ self.expected_clients = expected_clients
+ self.task = asyncio.create_task(self._runner())
+
+ @property
+ def done(self) -> bool:
+ """Return if this stream is already done."""
+ return self.task.done()
+
+ async def stop(self) -> None:
+ """Stop/cancel the stream."""
+ if self.done:
+ return
+ self.task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self.task
+ for sub_queue in list(self.subscribers):
+ empty_queue(sub_queue)
+
+ async def get_stream(
+ self,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ ) -> AsyncGenerator[bytes, None]:
+ """Get (client specific encoded) ffmpeg stream."""
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.subscribe_raw(),
+ input_format=self.audio_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ ):
+ yield chunk
+
+ async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+ """Subscribe to the raw/unaltered audio stream."""
+ try:
+ queue = asyncio.Queue(2)
+ self.subscribers.append(queue)
+ while True:
+ chunk = await queue.get()
+ if chunk == b"":
+ break
+ yield chunk
+ finally:
+ with suppress(ValueError):
+ self.subscribers.remove(queue)
+
+ async def _runner(self) -> None:
+ """Run the stream for the given audio source."""
+ expected_clients = self.expected_clients or 1
+ # wait for first/all subscriber
+ count = 0
+ while count < 50:
+ await asyncio.sleep(0.1)
+ count += 1
+ if len(self.subscribers) >= expected_clients:
+ break
+ LOGGER.debug(
+ "Starting multi-client stream with %s/%s clients",
+ len(self.subscribers),
+ self.expected_clients,
+ )
+ async for chunk in self.audio_source:
+ fail_count = 0
+ while len(self.subscribers) == 0:
+ await asyncio.sleep(0.1)
+ fail_count += 1
+ if fail_count > 50:
+ LOGGER.warning("No clients connected, stopping stream")
+ return
+ await asyncio.gather(
+ *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
+ )
+ # EOF: send empty chunk
+ await asyncio.gather(*[sub.put(b"") for sub in self.subscribers], return_exceptions=True)
# special case: UGP stream
ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
ugp_stream = ugp_provider.streams[media.queue_id]
- input_format = ugp_stream.audio_format
- audio_source = ugp_stream.subscribe_raw()
+ input_format = ugp_stream.output_format
+ audio_source = ugp_stream.subscribe()
elif media.media_type == MediaType.RADIO and media.queue_id and media.queue_item_id:
# radio stream - consume media stream directly
input_format = DEFAULT_SNAPCAST_FORMAT
from __future__ import annotations
+import asyncio
+from collections.abc import AsyncGenerator, Awaitable, Callable
from time import time
from typing import TYPE_CHECKING, Final, cast
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_HTTP_PROFILE, SYNCGROUP_PREFIX
from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
-from music_assistant.server.helpers.audio import (
- get_chunksize,
- get_ffmpeg_stream,
- get_player_filter_params,
-)
-from music_assistant.server.helpers.multi_client_stream import MultiClientStream
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
# ruff: noqa: ARG002
UGP_FORMAT = AudioFormat(
- content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
+ content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
)
UGP_PREFIX = "ugp_"
CONF_ACTION_CREATE_PLAYER = "create_player"
CONF_ACTION_CREATE_PLAYER_SAVE = "create_player_save"
-CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(48000, 24, 48000, 24, True)
+CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
CONF_GROUP_PLAYERS: Final[str] = "group_players"
CONF_NEW_GROUP_NAME: Final[str] = "name"
CONF_NEW_GROUP_MEMBERS: Final[list[str]] = "members"
"""Initialize MusicProvider."""
super().__init__(mass, manifest, config)
self._registered_routes: set[str] = set()
- self.streams: dict[str, MultiClientStream] = {}
+ self.streams: dict[str, UGPStream] = {}
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
group_player = self.mass.players.get(player_id)
# stop any existing stream first
if (existing := self.streams.pop(player_id, None)) and not existing.done:
- existing.task.cancel()
+ await existing.stop()
# select audio source
if media.media_type == MediaType.ANNOUNCEMENT:
)
# start the stream task
- self.streams[player_id] = MultiClientStream(
- audio_source=audio_source, audio_format=UGP_FORMAT
- )
- base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
+ self.streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
+ base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac"
# forward to downstream play_media commands
async with TaskManager(self.mass) as tg:
group_childs=set(members),
)
self.mass.players.register_or_update(player)
- # register dynamic routes for the ugp stream (both flac and mp3)
- for fmt in ("mp3", "flac"):
- route_path = f"/ugp/{group_player_id}.{fmt}"
- self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
- self._registered_routes.add(route_path)
+ # register dynamic route for the ugp stream
+ route_path = f"/ugp/{group_player_id}.aac"
+ self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
+ self._registered_routes.add(route_path)
return player
- def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
+ def on_group_child_power(
+ self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
+ ) -> None:
"""
Call when a power command was executed on one of the child player of a PlayerGroup.
- This is used to handle special actions such as (re)syncing.
+ The group state is sent with the state BEFORE the power command was executed.
"""
- group_player = self.mass.players.get(player_id)
- child_player = self.mass.players.get(child_player_id)
-
if not group_player.powered:
# guard, this should be caught in the player controller but just in case...
return None
powered_childs = [
x
for x in self.mass.players.iter_group_members(group_player, True)
- if not (not new_power and x.player_id == child_player_id)
+ if not (not new_power and x.player_id == child_player.player_id)
]
if new_power and child_player not in powered_childs:
powered_childs.append(child_player)
"Group %s has no more powered members, turning off group player",
group_player.display_name,
)
- self.mass.create_task(self.cmd_power(player_id, False))
+ self.mass.create_task(self.cmd_power(group_player.player_id, False))
return False
# if a child player turned ON while the group player is already playing
# we just direct it to the existing stream (we dont care about the audio being in sync)
- if new_power and group_player.state == PlayerState.PLAYING:
- base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.flac"
+ if new_power and group_state == PlayerState.PLAYING:
+ base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.aac"
self.mass.create_task(
self.mass.players.play_media(
child_player.player_id,
async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
"""Serve the UGP (multi-client) flow stream audio to a player."""
ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
- fmt = request.path.rsplit(".")[-1]
child_player_id = request.query.get("player_id") # optional!
if not (ugp_player := self.mass.players.get(ugp_player_id)):
if not (stream := self.streams.get(ugp_player_id, None)) or stream.done:
raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!")
- output_format = AudioFormat(
- content_type=ContentType.try_parse(fmt),
- sample_rate=stream.audio_format.sample_rate,
- bit_depth=stream.audio_format.bit_depth,
- )
-
http_profile: str = await self.mass.config.get_player_config_value(
child_player_id, CONF_HTTP_PROFILE
)
headers = {
**DEFAULT_STREAM_HEADERS,
- "Content-Type": f"audio/{fmt}",
+ "Content-Type": "audio/aac",
"Accept-Ranges": "none",
"Cache-Control": "no-cache",
"Connection": "close",
resp = web.StreamResponse(status=200, reason="OK", headers=headers)
if http_profile == "forced_content_length":
- resp.content_length = get_chunksize(output_format, 24 * 3600)
+ resp.content_length = 4294967296
elif http_profile == "chunked":
resp.enable_chunked_encoding()
ugp_player.display_name,
child_player_id or request.remote,
)
-
- async for chunk in stream.get_stream(
- output_format=output_format,
- filter_params=get_player_filter_params(self.mass, child_player_id)
- if child_player_id
- else None,
- ):
+ async for chunk in stream.subscribe():
try:
await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError):
- # race condition
+ except (ConnectionError, ConnectionResetError):
break
return resp
and x not in syncgroup_childs
and not x.startswith(UGP_PREFIX)
]
+
+
+class UGPStream:
+ """
+ Implementation of a Stream for the Universal Group Player.
+
+ Basiclaly this is like a fake radio radio stream (AAC) format with multiple subscribers.
+ The AAC format is chosen because it is widely supported and has a good balance between
+ quality and bandwidth and also allows for mid-stream joining of (extra) players.
+ """
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ ) -> None:
+ """Initialize UGP Stream."""
+ self.audio_source = audio_source
+ self.input_format = audio_format
+ self.output_format = AudioFormat(content_type=ContentType.AAC)
+ self.subscribers: list[Callable[[bytes], Awaitable]] = []
+ self._task: asyncio.Task | None = None
+ self._done: asyncio.Event = asyncio.Event()
+
+ @property
+ def done(self) -> bool:
+ """Return if this stream is already done."""
+ return self._done.is_set() and self._task and self._task.done()
+
+ async def stop(self) -> None:
+ """Stop/cancel the stream."""
+ if self._done.is_set():
+ return
+ if self._task and not self._task.done():
+ self._task.cancel()
+ self._done.set()
+
+ async def subscribe(self) -> AsyncGenerator[bytes, None]:
+ """Subscribe to the raw/unaltered audio stream."""
+ # start the runner as soon as the (first) client connects
+ if not self._task:
+ self._task = asyncio.create_task(self._runner())
+ queue = asyncio.Queue(1)
+ try:
+ self.subscribers.append(queue.put)
+ while True:
+ chunk = await queue.get()
+ if not chunk:
+ break
+ yield chunk
+ finally:
+ self.subscribers.remove(queue.put)
+
+ async def _runner(self) -> None:
+ """Run the stream for the given audio source."""
+ await asyncio.sleep(0.25) # small delay to allow subscribers to connect
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.audio_source,
+ input_format=self.input_format,
+ output_format=self.output_format,
+ # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version
+ # extra_input_args=["-readrate", "1.15"],
+ ):
+ await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True)
+ # empty chunk when done
+ await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
+ self._done.set()