Feat: Add multi device DSP support (#1839)
authorMaxim Raznatovski <nda.mr43@gmail.com>
Thu, 9 Jan 2025 23:08:32 +0000 (00:08 +0100)
committerGitHub <noreply@github.com>
Thu, 9 Jan 2025 23:08:32 +0000 (00:08 +0100)
* Fix: Disable DSP in unsupported multiroom scenarios

* Fix: Reload DSP in case either entering or exiting grouped playback

* Feat: Mark Airplay and Slimproto as multi device DSP compatible

* Feat: Add client specific stream support to universal groups

* Feat: Apply DSP to `ugp_*` streams

* Feat: Mark all universal groups as MULTI_DEVICE_DSP compatible

* Refactor: decouple input and shared base format in UGPStream

music_assistant/controllers/players.py
music_assistant/helpers/audio.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/player_group/__init__.py
music_assistant/providers/player_group/ugp_stream.py
music_assistant/providers/slimproto/__init__.py
music_assistant/providers/snapcast/__init__.py

index 72dc2276f9b7d43ecf15e1aed4ee75f7c168cfff..76a6610ff1c037d3e4c41b1caf43af80b77c3cd2 100644 (file)
@@ -964,6 +964,20 @@ class PlayerController(CoreController):
         if len(changed_values) == 0 and not force_update:
             return
 
+        # handle DSP reload when player is grouped or ungrouped
+        prev_is_grouped = bool(prev_state.get("synced_to")) or bool(prev_state.get("group_childs"))
+        new_is_grouped = bool(new_state.get("synced_to")) or bool(new_state.get("group_childs"))
+
+        if prev_is_grouped != new_is_grouped:
+            dsp_config = self.mass.config.get_player_dsp_config(player_id)
+            supports_multi_device_dsp = PlayerFeature.MULTI_DEVICE_DSP in player.supported_features
+            if dsp_config.enabled and not supports_multi_device_dsp:
+                # We now know that that the player was grouped or ungrouped,
+                # the player has a custom DSP enabled, but the player provider does
+                # not support multi-device DSP.
+                # So we need to reload the DSP configuration.
+                self.mass.create_task(self.mass.players.on_player_dsp_change(player_id))
+
         if changed_values.keys() != {"elapsed_time"} or force_update:
             # ignore elapsed_time only changes
             self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
index 5811fe073a8818185e7d6f1f6e4f89e5604a0ad1..c9af669380bbf1f1707d6f187929cb04f690c64c 100644 (file)
@@ -17,6 +17,7 @@ from aiohttp import ClientTimeout
 from music_assistant_models.enums import (
     ContentType,
     MediaType,
+    PlayerFeature,
     StreamType,
     VolumeNormalizationMode,
 )
@@ -836,6 +837,13 @@ def get_player_filter_params(
 
     dsp = mass.config.get_player_dsp_config(player_id)
 
+    if player := mass.players.get(player_id):
+        is_grouped = bool(player.synced_to) or bool(player.group_childs)
+        if is_grouped and PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features:
+            # We can not correctly apply DSP to a grouped player without multi-device DSP support,
+            # so we disable it.
+            dsp.enabled = False
+
     if dsp.enabled:
         # Apply input gain
         if dsp.input_gain != 0:
index 517f3ce00bd7938a8a77881cd832dde0c4352d3a..fc41997beaa6b27ce3fce7702436fd7abfff9058 100644 (file)
@@ -306,8 +306,8 @@ class AirplayProvider(PlayerProvider):
             # special case: UGP stream
             ugp_provider = cast(PlayerGroupProvider, self.mass.get_provider("player_group"))
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
-            input_format = ugp_stream.output_format
-            audio_source = ugp_stream.subscribe()
+            input_format = ugp_stream.base_pcm_format
+            audio_source = ugp_stream.subscribe_raw()
         elif media.queue_id and media.queue_item_id:
             # regular queue (flow) stream request
             input_format = AIRPLAY_FLOW_PCM_FORMAT
@@ -528,6 +528,7 @@ class AirplayProvider(PlayerProvider):
             supported_features={
                 PlayerFeature.PAUSE,
                 PlayerFeature.SET_MEMBERS,
+                PlayerFeature.MULTI_DEVICE_DSP,
                 PlayerFeature.VOLUME_SET,
             },
             volume_level=volume,
index 9d89d133d88eaeb866f01220ca273a8751a6e734..747bb01bbf6ab8873ff6be26adfb02c9482e6c16 100644 (file)
@@ -56,6 +56,7 @@ from music_assistant.constants import (
     create_sample_rates_config_entry,
 )
 from music_assistant.controllers.streams import DEFAULT_STREAM_HEADERS
+from music_assistant.helpers.audio import get_player_filter_params
 from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.util import TaskManager
 from music_assistant.models.player_provider import PlayerProvider
@@ -453,7 +454,9 @@ class PlayerGroupProvider(PlayerProvider):
             )
 
         # start the stream task
-        self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
+        self.ugp_streams[player_id] = UGPStream(
+            audio_source=audio_source, audio_format=UGP_FORMAT, base_pcm_format=UGP_FORMAT
+        )
         base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.mp3"
 
         # set the state optimistically
@@ -659,7 +662,11 @@ class PlayerGroupProvider(PlayerProvider):
         self, group_player_id: str, group_type: str, name: str, members: Iterable[str]
     ) -> Player:
         """Register a syncgroup player."""
-        player_features = {PlayerFeature.POWER, PlayerFeature.VOLUME_SET}
+        player_features = {
+            PlayerFeature.POWER,
+            PlayerFeature.VOLUME_SET,
+            PlayerFeature.MULTI_DEVICE_DSP,
+        }
 
         if not (self.mass.players.get(x) for x in members):
             raise PlayerUnavailableError("One or more members are not available!")
@@ -825,6 +832,10 @@ class PlayerGroupProvider(PlayerProvider):
         ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
         child_player_id = request.query.get("player_id")  # optional!
 
+        # Right now we default to MP3 output format, since it's the most compatible
+        # TODO: use the player's preferred output format
+        output_format = AudioFormat(content_type=ContentType.MP3)
+
         if not (ugp_player := self.mass.players.get(ugp_player_id)):
             raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
 
@@ -860,7 +871,18 @@ class PlayerGroupProvider(PlayerProvider):
             ugp_player.display_name,
             child_player_id or request.remote,
         )
-        async for chunk in stream.subscribe():
+
+        # Generate filter params for the player specific DSP settings
+        filter_params = None
+        if child_player_id:
+            filter_params = get_player_filter_params(
+                self.mass, child_player_id, stream.input_format
+            )
+
+        async for chunk in stream.get_stream(
+            output_format,
+            filter_params=filter_params,
+        ):
             try:
                 await resp.write(chunk)
             except (ConnectionError, ConnectionResetError):
index ed9654a6d3829bdf0490b164abdbe4965b9149fc..5b13815bdb71f54f48a90f4d82d27ee23b48d78b 100644 (file)
@@ -9,9 +9,10 @@ from __future__ import annotations
 
 import asyncio
 from collections.abc import AsyncGenerator, Awaitable, Callable
+from typing import TYPE_CHECKING
 
-from music_assistant_models.enums import ContentType
-from music_assistant_models.media_items import AudioFormat
+if TYPE_CHECKING:
+    from music_assistant_models.media_items import AudioFormat
 
 from music_assistant.helpers.audio import get_ffmpeg_stream
 from music_assistant.helpers.util import empty_queue
@@ -31,11 +32,12 @@ class UGPStream:
         self,
         audio_source: AsyncGenerator[bytes, None],
         audio_format: AudioFormat,
+        base_pcm_format: AudioFormat,
     ) -> None:
         """Initialize UGP Stream."""
         self.audio_source = audio_source
         self.input_format = audio_format
-        self.output_format = AudioFormat(content_type=ContentType.MP3)
+        self.base_pcm_format = base_pcm_format
         self.subscribers: list[Callable[[bytes], Awaitable]] = []
         self._task: asyncio.Task | None = None
         self._done: asyncio.Event = asyncio.Event()
@@ -53,8 +55,12 @@ class UGPStream:
             self._task.cancel()
         self._done.set()
 
-    async def subscribe(self) -> AsyncGenerator[bytes, None]:
-        """Subscribe to the raw/unaltered audio stream."""
+    async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+        """
+        Subscribe to the raw/unaltered audio stream.
+
+        The returned stream has the format `self.base_pcm_format`.
+        """
         # start the runner as soon as the (first) client connects
         if not self._task:
             self._task = asyncio.create_task(self._runner())
@@ -71,13 +77,26 @@ class UGPStream:
             empty_queue(queue)
             del queue
 
+    async def get_stream(
+        self, output_format: AudioFormat, filter_params: list[str] | None = None
+    ) -> AsyncGenerator[bytes, None]:
+        """Subscribe to the client specific audio stream."""
+        # start the runner as soon as the (first) client connects
+        async for chunk in get_ffmpeg_stream(
+            audio_input=self.subscribe_raw(),
+            input_format=self.base_pcm_format,
+            output_format=output_format,
+            filter_params=filter_params,
+        ):
+            yield chunk
+
     async def _runner(self) -> None:
         """Run the stream for the given audio source."""
         await asyncio.sleep(0.25)  # small delay to allow subscribers to connect
         async for chunk in get_ffmpeg_stream(
             audio_input=self.audio_source,
             input_format=self.input_format,
-            output_format=self.output_format,
+            output_format=self.base_pcm_format,
             # we don't allow the player to buffer too much ahead so we use readrate limiting
             extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
         ):
index ab50aa94244f3a878db1b9733115472bc7a8d3b0..601103c6565e5af381d7ada1742e02b4d8352229 100644 (file)
@@ -380,7 +380,8 @@ class SlimprotoProvider(PlayerProvider):
             # special case: UGP stream
             ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
-            audio_source = ugp_stream.subscribe()
+            # Filter is later applied in MultiClientStream
+            audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
         elif media.queue_id and media.queue_item_id:
             # regular queue stream request
             audio_source = self.mass.streams.get_flow_stream(
@@ -644,6 +645,7 @@ class SlimprotoProvider(PlayerProvider):
                 supported_features={
                     PlayerFeature.POWER,
                     PlayerFeature.SET_MEMBERS,
+                    PlayerFeature.MULTI_DEVICE_DSP,
                     PlayerFeature.VOLUME_SET,
                     PlayerFeature.PAUSE,
                     PlayerFeature.VOLUME_MUTE,
index 11bb1a56045606fe4abb1f99725b9ff0d2780f06..2fa7e41b4aafb956f3d5aa8726e48f1022a23b3b 100644 (file)
@@ -510,8 +510,8 @@ class SnapCastProvider(PlayerProvider):
             # special case: UGP stream
             ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
             ugp_stream = ugp_provider.ugp_streams[media.queue_id]
-            input_format = ugp_stream.output_format
-            audio_source = ugp_stream.subscribe()
+            input_format = ugp_stream.base_pcm_format
+            audio_source = ugp_stream.subscribe_raw()
         elif media.queue_id and media.queue_item_id:
             # regular queue (flow) stream request
             input_format = DEFAULT_SNAPCAST_PCM_FORMAT