From: sprocket-9 Date: Thu, 20 Nov 2025 13:06:19 +0000 (+0000) Subject: Add VBAN Receiver plugin provider (#2498) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=720a04c8476b11708b106e2bc538ca3f35f683ba;p=music-assistant-server.git Add VBAN Receiver plugin provider (#2498) --- diff --git a/music_assistant/providers/vban_receiver/__init__.py b/music_assistant/providers/vban_receiver/__init__.py new file mode 100644 index 00000000..d8ccfead --- /dev/null +++ b/music_assistant/providers/vban_receiver/__init__.py @@ -0,0 +1,323 @@ +"""VBAN protocol receiver plugin for Music Assistant.""" + +from __future__ import annotations + +import asyncio +import re +from collections.abc import AsyncGenerator +from contextlib import suppress +from typing import TYPE_CHECKING, cast + +from aiovban.asyncio.util import BackPressureStrategy +from aiovban.enums import VBANSampleRate +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption +from music_assistant_models.enums import ( + ConfigEntryType, + ContentType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import SetupFailedError +from music_assistant_models.media_items import AudioFormat +from music_assistant_models.streamdetails import StreamMetadata + +from music_assistant.constants import ( + CONF_BIND_IP, + CONF_BIND_PORT, + CONF_ENTRY_WARN_PREVIEW, +) +from music_assistant.helpers.util import ( + get_ip_addresses, +) +from music_assistant.models.plugin import PluginProvider, PluginSource + +from .vban import AsyncVBANClientMod + +if TYPE_CHECKING: + from aiovban.asyncio.device import VBANDevice + from aiovban.asyncio.streams import VBANIncomingStream + from music_assistant_models.config_entries import ConfigValueType, ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +DEFAULT_UDP_PORT = 6980 +DEFAULT_PCM_AUDIO_FORMAT = "S16LE" +DEFAULT_PCM_SAMPLE_RATE = 44100 +DEFAULT_AUDIO_CHANNELS = 2 + +CONF_VBAN_STREAM_NAME = "vban_stream_name" +CONF_SENDER_HOST = "sender_host" +CONF_PCM_AUDIO_FORMAT = "audio_format" +CONF_PCM_SAMPLE_RATE = "sample_rate" +CONF_AUDIO_CHANNELS = "audio_channels" +CONF_VBAN_QUEUE_STRATEGY = "vban_queue_strategy" +CONF_VBAN_QUEUE_SIZE = "vban_queue_size" + +VBAN_QUEUE_STRATEGIES = { + "Clear entire queue": BackPressureStrategy.DROP, + "Clear the oldest half of the queue": BackPressureStrategy.DRAIN_OLDEST, + "Remove single oldest queue entry": BackPressureStrategy.POP, +} + +SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE} + + +def _get_supported_pcm_formats() -> dict[str, int]: + """Return supported PCM formats.""" + pcm_formats = {} + for content_type in ContentType.__members__: + if match := re.match(r"PCM_([S|F](\d{2})LE)", content_type): + pcm_formats[match.group(1)] = int(match.group(2)) + return pcm_formats + + +def _get_vban_sample_rates() -> list[int]: + """Return supported VBAN sample rates.""" + return [int(member.split("_")[1]) for member in VBANSampleRate.__members__] + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return VBANReceiverProvider(mass, manifest, config) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + ip_addresses = await get_ip_addresses() + + def _validate_stream_name(config_value: str) -> bool: + """Validate stream name.""" + try: + config_value.encode("ascii") + except UnicodeEncodeError: + return False + return len(config_value) < 17 + + return ( + CONF_ENTRY_WARN_PREVIEW, + ConfigEntry( + key=CONF_BIND_PORT, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_UDP_PORT, + label="Receiver: UDP Port", + description="The UDP port the VBAN receiver will listen on for connections. " + "Make sure that this server can be reached " + "on the given IP and UDP port by remote VBAN senders.", + ), + ConfigEntry( + key=CONF_VBAN_STREAM_NAME, + type=ConfigEntryType.STRING, + label="Sender: VBAN Stream Name", + default_value="Network AUX", + description="Max 16 ASCII chars.\n" + "The VBAN stream name to expect from the remote VBAN sender.\n" + "This MUST match what the remote VBAN sender has set for the session name " + "otherwise audio streaming will not work.", + required=True, + validate=_validate_stream_name, # type: ignore[arg-type] + ), + ConfigEntry( + key=CONF_SENDER_HOST, + type=ConfigEntryType.STRING, + default_value="127.0.0.1", + label="Sender: VBAN Sender hostname/IP address", + description="The hostname/IP Address of the remote VBAN SENDER.", + required=True, + ), + ConfigEntry( + key=CONF_PCM_AUDIO_FORMAT, + type=ConfigEntryType.STRING, + default_value=DEFAULT_PCM_AUDIO_FORMAT, + options=[ConfigValueOption(x, x) for x in _get_supported_pcm_formats()], + label="PCM audio format", + description="The VBAN PCM audio format to expect from the remote VBAN sender. " + "This MUST match what the remote VBAN sender has set otherwise audio streaming " + "will not work.", + required=True, + ), + ConfigEntry( + key=CONF_PCM_SAMPLE_RATE, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_PCM_SAMPLE_RATE, + options=[ConfigValueOption(str(x), x) for x in _get_vban_sample_rates()], + label="PCM sample rate", + description="The VBAN PCM sample rate to expect from the remote VBAN sender. " + "This MUST match what the remote VBAN sender has set otherwise audio streaming " + "will not work.", + required=True, + ), + ConfigEntry( + key=CONF_AUDIO_CHANNELS, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_AUDIO_CHANNELS, + options=[ConfigValueOption(str(x), x) for x in list(range(1, 9))], + label="Channels", + description="The number of audio channels", + required=True, + ), + ConfigEntry( + key=CONF_BIND_IP, + type=ConfigEntryType.STRING, + default_value="0.0.0.0", + options=[ConfigValueOption(x, x) for x in {"0.0.0.0", *ip_addresses}], + label="Receiver: Bind to IP/interface", + description="Start the VBAN receiver on this specific interface. \n" + "Use 0.0.0.0 to bind to all interfaces, which is the default. \n" + "This is an advanced setting that should normally " + "not be adjusted in regular setups.", + category="advanced", + required=True, + ), + ConfigEntry( + key=CONF_VBAN_QUEUE_STRATEGY, + type=ConfigEntryType.STRING, + default_value=next(iter(VBAN_QUEUE_STRATEGIES)), + options=[ConfigValueOption(x, x) for x in VBAN_QUEUE_STRATEGIES], + label="Receiver: VBAN queue strategy", + description="What should happen if the receiving queue fills up?", + category="advanced", + required=True, + ), + ConfigEntry( + key=CONF_VBAN_QUEUE_SIZE, + type=ConfigEntryType.INTEGER, + default_value=AsyncVBANClientMod.default_queue_size, + label="Receiver: VBAN packets queue size", + description="This can be increased if MA is running on a very low power device, " + "otherwise this should not need to be changed.", + category="advanced", + required=True, + ), + ) + + +class VBANReceiverProvider(PluginProvider): + """Implementation of a VBAN protocol receiver plugin.""" + + def __init__( + self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig + ) -> None: + """Initialize MusicProvider.""" + super().__init__(mass, manifest, config, SUPPORTED_FEATURES) + self._bind_port: int = cast("int", self.config.get_value(CONF_BIND_PORT)) + self._bind_ip: str = cast("str", self.config.get_value(CONF_BIND_IP)) + self._sender_host: str = cast("str", self.config.get_value(CONF_SENDER_HOST)) + self._vban_stream_name: str = cast("str", self.config.get_value(CONF_VBAN_STREAM_NAME)) + self._pcm_audio_format: str = cast("str", self.config.get_value(CONF_PCM_AUDIO_FORMAT)) + self._pcm_sample_rate: int = cast("int", self.config.get_value(CONF_PCM_SAMPLE_RATE)) + self._audio_channels: int = cast("int", self.config.get_value(CONF_AUDIO_CHANNELS)) + self._vban_queue_strategy: BackPressureStrategy = VBAN_QUEUE_STRATEGIES[ + cast("str", self.config.get_value(CONF_VBAN_QUEUE_STRATEGY)) + ] + self._vban_queue_size: int = cast("int", self.config.get_value(CONF_VBAN_QUEUE_SIZE)) + + self._vban_receiver: AsyncVBANClientMod | None = None + self._vban_sender: VBANDevice | None = None + self._vban_stream: VBANIncomingStream | None = None + + self._source_details = PluginSource( + id=self.instance_id, + name=f"{self.manifest.name}: {self._vban_stream_name}", + passive=False, + can_play_pause=False, + can_seek=False, + can_next_previous=False, + audio_format=AudioFormat( + content_type=ContentType(self._pcm_audio_format.lower()), + codec_type=ContentType(self._pcm_audio_format.lower()), + sample_rate=self._pcm_sample_rate, + bit_depth=_get_supported_pcm_formats()[self._pcm_audio_format], + channels=self._audio_channels, + ), + metadata=StreamMetadata( + title=self._vban_stream_name, + artist=self._sender_host, + ), + stream_type=StreamType.CUSTOM, + ) + + @property + def instance_name_postfix(self) -> str | None: + """Return a (default) instance name postfix for this provider instance.""" + return self._vban_stream_name + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + self._vban_receiver = AsyncVBANClientMod( + default_queue_size=self._vban_queue_size, ignore_audio_streams=False + ) + try: + self._udp_socket_task = asyncio.create_task( + self._vban_receiver.listen( + address=self._bind_ip, port=self._bind_port, controller=self + ) + ) + except OSError as err: + raise SetupFailedError(f"Failed to start VBAN receiver plugin: {err}") from err + + self._vban_sender = self._vban_receiver.register_device(self._sender_host) + if self._vban_sender: + self._vban_stream = self._vban_sender.receive_stream( + self._vban_stream_name, back_pressure_strategy=self._vban_queue_strategy + ) + + async def unload(self, is_removed: bool = False) -> None: + """Handle close/cleanup of the provider.""" + self.logger.debug("Unloading plugin") + if self._vban_receiver: + self.logger.debug("Closing UDP transport") + self._vban_receiver.close() + with suppress(asyncio.CancelledError): + await self._udp_socket_task + + self._vban_receiver = None + self._vban_sender = None + self._vban_stream = None + await asyncio.sleep(0.1) + + def get_source(self) -> PluginSource: + """Get (audio)source details for this plugin.""" + return self._source_details + + @property + def active_player(self) -> bool: + """Report the active player status.""" + return bool(self._source_details.in_use_by) + + async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]: + """Yield raw PCM chunks from the VBANIncomingStream queue.""" + self.logger.debug( + "Getting VBAN PCM audio stream for Player: %s//Stream: %s//Config: %s", + player_id, + self._vban_stream_name, + self._source_details.audio_format.output_format_str, + ) + while ( + self._source_details.in_use_by + and self._vban_stream + and not self._udp_socket_task.done() + ): + try: + packet = await self._vban_stream.get_packet() + except asyncio.QueueShutDown: # type: ignore[attr-defined] + self.logger.error( + "Found VBANIncomingStream queue shut down when attempting to get VBAN packet" + ) + break + + yield packet.body.data diff --git a/music_assistant/providers/vban_receiver/icon.svg b/music_assistant/providers/vban_receiver/icon.svg new file mode 100644 index 00000000..939b98ab --- /dev/null +++ b/music_assistant/providers/vban_receiver/icon.svg @@ -0,0 +1,22 @@ + + + + diff --git a/music_assistant/providers/vban_receiver/icon_monochrome.svg b/music_assistant/providers/vban_receiver/icon_monochrome.svg new file mode 100644 index 00000000..e7aac70d --- /dev/null +++ b/music_assistant/providers/vban_receiver/icon_monochrome.svg @@ -0,0 +1,22 @@ + + + + diff --git a/music_assistant/providers/vban_receiver/manifest.json b/music_assistant/providers/vban_receiver/manifest.json new file mode 100644 index 00000000..dc42ca24 --- /dev/null +++ b/music_assistant/providers/vban_receiver/manifest.json @@ -0,0 +1,10 @@ +{ + "type": "plugin", + "domain": "vban_receiver", + "stage": "alpha", + "name": "VBAN Receiver", + "description": "VBAN protocol receiver - receive PCM-over-UDP streams from a VBAN protocol sender", + "codeowners": ["@sprocket-9"], + "documentation": "https://music-assistant.io/plugins/vban-receiver/", + "multi_instance": true +} diff --git a/music_assistant/providers/vban_receiver/vban.py b/music_assistant/providers/vban_receiver/vban.py new file mode 100644 index 00000000..68d3f74e --- /dev/null +++ b/music_assistant/providers/vban_receiver/vban.py @@ -0,0 +1,112 @@ +"""VBAN subclasses to workaround issues in aiovban 0.6.3.""" + +from __future__ import annotations + +import asyncio +import logging +import os +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from aiovban.asyncio import AsyncVBANClient +from aiovban.packet import VBANPacket +from aiovban.packet.headers import VBANHeaderException + +if TYPE_CHECKING: + from . import VBANReceiverProvider + +logger = logging.getLogger(__name__) +_aiovban_log_level = os.environ.get("AIOVBAN_LOG_LEVEL", "info").upper() +logging.getLogger("aiovban.asyncio.aiovban.asyncio.util").setLevel(_aiovban_log_level) + + +class VBANListenerProtocolMod(asyncio.DatagramProtocol): + """VBANListenerProcotol workaround.""" + + def __init__(self, client: AsyncVBANClientMod) -> None: + """Initialize.""" + # WORKAROUND: each instance gets it's own Future. + self.done: asyncio.Future[Any] = asyncio.get_event_loop().create_future() + self._background_tasks: set[asyncio.Task[Any]] = set() + self._client = client + + def error_received(self, exc: Exception) -> None: + """Handle error.""" + self.done.set_exception(exc) + + def connection_lost(self, exc: Exception | None) -> None: + """Handle lost connection.""" + if self.done.done(): + return + # WORKAROUND: handle exc properly. + if exc: + self.done.set_exception(exc) + else: + self.done.set_result(None) + + def connection_made(self, transport) -> None: # type: ignore[no-untyped-def] + """Handle connection made.""" + logger.debug(f"Connection made to {transport}") + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + """Handle received datagram.""" + sender_ip, sender_port = addr + + if self._client.quick_reject(sender_ip) or not self._client.active_player: + return + + try: + packet = VBANPacket.unpack(data) + except VBANHeaderException as exc: + logger.error(f"Error unpacking packet: {exc}") + return + except ValueError as exc: + # Handle odd packet sent when Voicemeeter start/stops stream + error_msg = "6000 is not a valid VBANSampleRate" + if str(exc) == error_msg: + return + raise + + task = asyncio.create_task(self._client.process_packet(sender_ip, sender_port, packet)) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + +@dataclass +class AsyncVBANClientMod(AsyncVBANClient): # type: ignore[misc] + """AsyncVBANClient workaround.""" + + _controller: VBANReceiverProvider | None = None + + @property + def active_player(self) -> bool: + """Report the active player status.""" + return False if not self._controller else self._controller.active_player + + async def listen( + self, + address: str = "0.0.0.0", + port: int = 6980, + loop: asyncio.AbstractEventLoop | None = None, + controller: VBANReceiverProvider | None = None, + ) -> None: + """Create UDP listener.""" + loop = loop or asyncio.get_running_loop() + self._controller = controller + + # Create a socket and set the options + self._transport, proto = await loop.create_datagram_endpoint( + lambda: VBANListenerProtocolMod(self), + local_addr=(address, port), + allow_broadcast=not self.ignore_audio_streams, + ) + + # WORKAROUND: await, not return. + await proto.done + + def close(self) -> None: + """Close down the connection.""" + self._controller = None + if self._transport: + self._transport.close() + self._transport = None # type: ignore[assignment] diff --git a/pyproject.toml b/pyproject.toml index 6239a336..67d7cbd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ dependencies = [ "uv>=0.8.0", "librosa==0.11.0", "gql[all]==4.0.0", + "aiovban>=0.6.3", ] description = "Music Assistant" license = {text = "Apache-2.0"} diff --git a/requirements_all.txt b/requirements_all.txt index 2a3721a2..22871ada 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -14,6 +14,7 @@ aiorun==2025.1.1 aioslimproto==3.1.1 aiosonos==0.1.9 aiosqlite==0.21.0 +aiovban>=0.6.3 alexapy==1.29.8 async-upnp-client==0.45.0 audible==0.10.0