if len(changed_values) == 0 and not force_update:
return
+ # handle DSP reload when player is grouped or ungrouped
+ prev_is_grouped = bool(prev_state.get("synced_to")) or bool(prev_state.get("group_childs"))
+ new_is_grouped = bool(new_state.get("synced_to")) or bool(new_state.get("group_childs"))
+
+ if prev_is_grouped != new_is_grouped:
+ dsp_config = self.mass.config.get_player_dsp_config(player_id)
+ supports_multi_device_dsp = PlayerFeature.MULTI_DEVICE_DSP in player.supported_features
+ if dsp_config.enabled and not supports_multi_device_dsp:
+ # We now know that that the player was grouped or ungrouped,
+ # the player has a custom DSP enabled, but the player provider does
+ # not support multi-device DSP.
+ # So we need to reload the DSP configuration.
+ self.mass.create_task(self.mass.players.on_player_dsp_change(player_id))
+
if changed_values.keys() != {"elapsed_time"} or force_update:
# ignore elapsed_time only changes
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
from music_assistant_models.enums import (
ContentType,
MediaType,
+ PlayerFeature,
StreamType,
VolumeNormalizationMode,
)
dsp = mass.config.get_player_dsp_config(player_id)
+ if player := mass.players.get(player_id):
+ is_grouped = bool(player.synced_to) or bool(player.group_childs)
+ if is_grouped and PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features:
+ # We can not correctly apply DSP to a grouped player without multi-device DSP support,
+ # so we disable it.
+ dsp.enabled = False
+
if dsp.enabled:
# Apply input gain
if dsp.input_gain != 0:
# special case: UGP stream
ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
- input_format = ugp_stream.output_format
- audio_source = ugp_stream.subscribe()
+ input_format = ugp_stream.base_pcm_format
+ audio_source = ugp_stream.subscribe_raw()
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = AIRPLAY_FLOW_PCM_FORMAT
supported_features={
PlayerFeature.PAUSE,
PlayerFeature.SET_MEMBERS,
+ PlayerFeature.MULTI_DEVICE_DSP,
PlayerFeature.VOLUME_SET,
},
volume_level=volume,
create_sample_rates_config_entry,
)
from music_assistant.controllers.streams import DEFAULT_STREAM_HEADERS
+from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.util import TaskManager
from music_assistant.models.player_provider import PlayerProvider
)
# start the stream task
- self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
+ self.ugp_streams[player_id] = UGPStream(
+ audio_source=audio_source, audio_format=UGP_FORMAT, base_pcm_format=UGP_FORMAT
+ )
base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.mp3"
# set the state optimistically
self, group_player_id: str, group_type: str, name: str, members: Iterable[str]
) -> Player:
"""Register a syncgroup player."""
- player_features = {PlayerFeature.POWER, PlayerFeature.VOLUME_SET}
+ player_features = {
+ PlayerFeature.POWER,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.MULTI_DEVICE_DSP,
+ }
if not (self.mass.players.get(x) for x in members):
raise PlayerUnavailableError("One or more members are not available!")
ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
child_player_id = request.query.get("player_id") # optional!
+ # Right now we default to MP3 output format, since it's the most compatible
+ # TODO: use the player's preferred output format
+ output_format = AudioFormat(content_type=ContentType.MP3)
+
if not (ugp_player := self.mass.players.get(ugp_player_id)):
raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
ugp_player.display_name,
child_player_id or request.remote,
)
- async for chunk in stream.subscribe():
+
+ # Generate filter params for the player specific DSP settings
+ filter_params = None
+ if child_player_id:
+ filter_params = get_player_filter_params(
+ self.mass, child_player_id, stream.input_format
+ )
+
+ async for chunk in stream.get_stream(
+ output_format,
+ filter_params=filter_params,
+ ):
try:
await resp.write(chunk)
except (ConnectionError, ConnectionResetError):
import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable
+from typing import TYPE_CHECKING
-from music_assistant_models.enums import ContentType
-from music_assistant_models.media_items import AudioFormat
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import AudioFormat
from music_assistant.helpers.audio import get_ffmpeg_stream
from music_assistant.helpers.util import empty_queue
self,
audio_source: AsyncGenerator[bytes, None],
audio_format: AudioFormat,
+ base_pcm_format: AudioFormat,
) -> None:
"""Initialize UGP Stream."""
self.audio_source = audio_source
self.input_format = audio_format
- self.output_format = AudioFormat(content_type=ContentType.MP3)
+ self.base_pcm_format = base_pcm_format
self.subscribers: list[Callable[[bytes], Awaitable]] = []
self._task: asyncio.Task | None = None
self._done: asyncio.Event = asyncio.Event()
self._task.cancel()
self._done.set()
- async def subscribe(self) -> AsyncGenerator[bytes, None]:
- """Subscribe to the raw/unaltered audio stream."""
+ async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+ """
+ Subscribe to the raw/unaltered audio stream.
+
+ The returned stream has the format `self.base_pcm_format`.
+ """
# start the runner as soon as the (first) client connects
if not self._task:
self._task = asyncio.create_task(self._runner())
empty_queue(queue)
del queue
+ async def get_stream(
+ self, output_format: AudioFormat, filter_params: list[str] | None = None
+ ) -> AsyncGenerator[bytes, None]:
+ """Subscribe to the client specific audio stream."""
+ # start the runner as soon as the (first) client connects
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.subscribe_raw(),
+ input_format=self.base_pcm_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ ):
+ yield chunk
+
async def _runner(self) -> None:
"""Run the stream for the given audio source."""
await asyncio.sleep(0.25) # small delay to allow subscribers to connect
async for chunk in get_ffmpeg_stream(
audio_input=self.audio_source,
input_format=self.input_format,
- output_format=self.output_format,
+ output_format=self.base_pcm_format,
# we don't allow the player to buffer too much ahead so we use readrate limiting
extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
):
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
- audio_source = ugp_stream.subscribe()
+ # Filter is later applied in MultiClientStream
+ audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
elif media.queue_id and media.queue_item_id:
# regular queue stream request
audio_source = self.mass.streams.get_flow_stream(
supported_features={
PlayerFeature.POWER,
PlayerFeature.SET_MEMBERS,
+ PlayerFeature.MULTI_DEVICE_DSP,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_MUTE,
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
- input_format = ugp_stream.output_format
- audio_source = ugp_stream.subscribe()
+ input_format = ugp_stream.base_pcm_format
+ audio_source = ugp_stream.subscribe_raw()
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = DEFAULT_SNAPCAST_PCM_FORMAT