From 2a5f2009b12c95e9de16e5fc4dc0eaf9443d440b Mon Sep 17 00:00:00 2001 From: Maxim Raznatovski Date: Fri, 10 Jan 2025 00:08:32 +0100 Subject: [PATCH] Feat: Add multi device DSP support (#1839) * Fix: Disable DSP in unsupported multiroom scenarios * Fix: Reload DSP in case either entering or exiting grouped playback * Feat: Mark Airplay and Slimproto as multi device DSP compatible * Feat: Add client specific stream support to universal groups * Feat: Apply DSP to `ugp_*` streams * Feat: Mark all universal groups as MULTI_DEVICE_DSP compatible * Refactor: decouple input and shared base format in UGPStream --- music_assistant/controllers/players.py | 14 +++++++++ music_assistant/helpers/audio.py | 8 +++++ music_assistant/providers/airplay/provider.py | 5 +-- .../providers/player_group/__init__.py | 28 +++++++++++++++-- .../providers/player_group/ugp_stream.py | 31 +++++++++++++++---- .../providers/slimproto/__init__.py | 4 ++- .../providers/snapcast/__init__.py | 4 +-- 7 files changed, 80 insertions(+), 14 deletions(-) diff --git a/music_assistant/controllers/players.py b/music_assistant/controllers/players.py index 72dc2276..76a6610f 100644 --- a/music_assistant/controllers/players.py +++ b/music_assistant/controllers/players.py @@ -964,6 +964,20 @@ class PlayerController(CoreController): if len(changed_values) == 0 and not force_update: return + # handle DSP reload when player is grouped or ungrouped + prev_is_grouped = bool(prev_state.get("synced_to")) or bool(prev_state.get("group_childs")) + new_is_grouped = bool(new_state.get("synced_to")) or bool(new_state.get("group_childs")) + + if prev_is_grouped != new_is_grouped: + dsp_config = self.mass.config.get_player_dsp_config(player_id) + supports_multi_device_dsp = PlayerFeature.MULTI_DEVICE_DSP in player.supported_features + if dsp_config.enabled and not supports_multi_device_dsp: + # We now know that that the player was grouped or ungrouped, + # the player has a custom DSP enabled, but the player provider does + # not support multi-device DSP. + # So we need to reload the DSP configuration. + self.mass.create_task(self.mass.players.on_player_dsp_change(player_id)) + if changed_values.keys() != {"elapsed_time"} or force_update: # ignore elapsed_time only changes self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 5811fe07..c9af6693 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -17,6 +17,7 @@ from aiohttp import ClientTimeout from music_assistant_models.enums import ( ContentType, MediaType, + PlayerFeature, StreamType, VolumeNormalizationMode, ) @@ -836,6 +837,13 @@ def get_player_filter_params( dsp = mass.config.get_player_dsp_config(player_id) + if player := mass.players.get(player_id): + is_grouped = bool(player.synced_to) or bool(player.group_childs) + if is_grouped and PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features: + # We can not correctly apply DSP to a grouped player without multi-device DSP support, + # so we disable it. + dsp.enabled = False + if dsp.enabled: # Apply input gain if dsp.input_gain != 0: diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index 517f3ce0..fc41997b 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -306,8 +306,8 @@ class AirplayProvider(PlayerProvider): # special case: UGP stream ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group")) ugp_stream = ugp_provider.ugp_streams[media.queue_id] - input_format = ugp_stream.output_format - audio_source = ugp_stream.subscribe() + input_format = ugp_stream.base_pcm_format + audio_source = ugp_stream.subscribe_raw() elif media.queue_id and media.queue_item_id: # regular queue (flow) stream request input_format = AIRPLAY_FLOW_PCM_FORMAT @@ -528,6 +528,7 @@ class AirplayProvider(PlayerProvider): supported_features={ PlayerFeature.PAUSE, PlayerFeature.SET_MEMBERS, + PlayerFeature.MULTI_DEVICE_DSP, PlayerFeature.VOLUME_SET, }, volume_level=volume, diff --git a/music_assistant/providers/player_group/__init__.py b/music_assistant/providers/player_group/__init__.py index 9d89d133..747bb01b 100644 --- a/music_assistant/providers/player_group/__init__.py +++ b/music_assistant/providers/player_group/__init__.py @@ -56,6 +56,7 @@ from music_assistant.constants import ( create_sample_rates_config_entry, ) from music_assistant.controllers.streams import DEFAULT_STREAM_HEADERS +from music_assistant.helpers.audio import get_player_filter_params from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.helpers.util import TaskManager from music_assistant.models.player_provider import PlayerProvider @@ -453,7 +454,9 @@ class PlayerGroupProvider(PlayerProvider): ) # start the stream task - self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT) + self.ugp_streams[player_id] = UGPStream( + audio_source=audio_source, audio_format=UGP_FORMAT, base_pcm_format=UGP_FORMAT + ) base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.mp3" # set the state optimistically @@ -659,7 +662,11 @@ class PlayerGroupProvider(PlayerProvider): self, group_player_id: str, group_type: str, name: str, members: Iterable[str] ) -> Player: """Register a syncgroup player.""" - player_features = {PlayerFeature.POWER, PlayerFeature.VOLUME_SET} + player_features = { + PlayerFeature.POWER, + PlayerFeature.VOLUME_SET, + PlayerFeature.MULTI_DEVICE_DSP, + } if not (self.mass.players.get(x) for x in members): raise PlayerUnavailableError("One or more members are not available!") @@ -825,6 +832,10 @@ class PlayerGroupProvider(PlayerProvider): ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1] child_player_id = request.query.get("player_id") # optional! + # Right now we default to MP3 output format, since it's the most compatible + # TODO: use the player's preferred output format + output_format = AudioFormat(content_type=ContentType.MP3) + if not (ugp_player := self.mass.players.get(ugp_player_id)): raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}") @@ -860,7 +871,18 @@ class PlayerGroupProvider(PlayerProvider): ugp_player.display_name, child_player_id or request.remote, ) - async for chunk in stream.subscribe(): + + # Generate filter params for the player specific DSP settings + filter_params = None + if child_player_id: + filter_params = get_player_filter_params( + self.mass, child_player_id, stream.input_format + ) + + async for chunk in stream.get_stream( + output_format, + filter_params=filter_params, + ): try: await resp.write(chunk) except (ConnectionError, ConnectionResetError): diff --git a/music_assistant/providers/player_group/ugp_stream.py b/music_assistant/providers/player_group/ugp_stream.py index ed9654a6..5b13815b 100644 --- a/music_assistant/providers/player_group/ugp_stream.py +++ b/music_assistant/providers/player_group/ugp_stream.py @@ -9,9 +9,10 @@ from __future__ import annotations import asyncio from collections.abc import AsyncGenerator, Awaitable, Callable +from typing import TYPE_CHECKING -from music_assistant_models.enums import ContentType -from music_assistant_models.media_items import AudioFormat +if TYPE_CHECKING: + from music_assistant_models.media_items import AudioFormat from music_assistant.helpers.audio import get_ffmpeg_stream from music_assistant.helpers.util import empty_queue @@ -31,11 +32,12 @@ class UGPStream: self, audio_source: AsyncGenerator[bytes, None], audio_format: AudioFormat, + base_pcm_format: AudioFormat, ) -> None: """Initialize UGP Stream.""" self.audio_source = audio_source self.input_format = audio_format - self.output_format = AudioFormat(content_type=ContentType.MP3) + self.base_pcm_format = base_pcm_format self.subscribers: list[Callable[[bytes], Awaitable]] = [] self._task: asyncio.Task | None = None self._done: asyncio.Event = asyncio.Event() @@ -53,8 +55,12 @@ class UGPStream: self._task.cancel() self._done.set() - async def subscribe(self) -> AsyncGenerator[bytes, None]: - """Subscribe to the raw/unaltered audio stream.""" + async def subscribe_raw(self) -> AsyncGenerator[bytes, None]: + """ + Subscribe to the raw/unaltered audio stream. + + The returned stream has the format `self.base_pcm_format`. + """ # start the runner as soon as the (first) client connects if not self._task: self._task = asyncio.create_task(self._runner()) @@ -71,13 +77,26 @@ class UGPStream: empty_queue(queue) del queue + async def get_stream( + self, output_format: AudioFormat, filter_params: list[str] | None = None + ) -> AsyncGenerator[bytes, None]: + """Subscribe to the client specific audio stream.""" + # start the runner as soon as the (first) client connects + async for chunk in get_ffmpeg_stream( + audio_input=self.subscribe_raw(), + input_format=self.base_pcm_format, + output_format=output_format, + filter_params=filter_params, + ): + yield chunk + async def _runner(self) -> None: """Run the stream for the given audio source.""" await asyncio.sleep(0.25) # small delay to allow subscribers to connect async for chunk in get_ffmpeg_stream( audio_input=self.audio_source, input_format=self.input_format, - output_format=self.output_format, + output_format=self.base_pcm_format, # we don't allow the player to buffer too much ahead so we use readrate limiting extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"], ): diff --git a/music_assistant/providers/slimproto/__init__.py b/music_assistant/providers/slimproto/__init__.py index ab50aa94..601103c6 100644 --- a/music_assistant/providers/slimproto/__init__.py +++ b/music_assistant/providers/slimproto/__init__.py @@ -380,7 +380,8 @@ class SlimprotoProvider(PlayerProvider): # special case: UGP stream ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") ugp_stream = ugp_provider.ugp_streams[media.queue_id] - audio_source = ugp_stream.subscribe() + # Filter is later applied in MultiClientStream + audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None) elif media.queue_id and media.queue_item_id: # regular queue stream request audio_source = self.mass.streams.get_flow_stream( @@ -644,6 +645,7 @@ class SlimprotoProvider(PlayerProvider): supported_features={ PlayerFeature.POWER, PlayerFeature.SET_MEMBERS, + PlayerFeature.MULTI_DEVICE_DSP, PlayerFeature.VOLUME_SET, PlayerFeature.PAUSE, PlayerFeature.VOLUME_MUTE, diff --git a/music_assistant/providers/snapcast/__init__.py b/music_assistant/providers/snapcast/__init__.py index 11bb1a56..2fa7e41b 100644 --- a/music_assistant/providers/snapcast/__init__.py +++ b/music_assistant/providers/snapcast/__init__.py @@ -510,8 +510,8 @@ class SnapCastProvider(PlayerProvider): # special case: UGP stream ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") ugp_stream = ugp_provider.ugp_streams[media.queue_id] - input_format = ugp_stream.output_format - audio_source = ugp_stream.subscribe() + input_format = ugp_stream.base_pcm_format + audio_source = ugp_stream.subscribe_raw() elif media.queue_id and media.queue_item_id: # regular queue (flow) stream request input_format = DEFAULT_SNAPCAST_PCM_FORMAT -- 2.34.1