provider_config["instance_id"] = "universal_group"
self._data[CONF_PROVIDERS]["universal_group"] = provider_config
+ # Migrate resonate provider to sendspin (renamed in 2.7 beta 19)
+ for instance_id, provider_config in list(self._data.get(CONF_PROVIDERS, {}).items()):
+ if provider_config.get("domain") == "resonate":
+ self._data[CONF_PROVIDERS].pop(instance_id, None)
+ provider_config["domain"] = "sendspin"
+ provider_config["instance_id"] = "sendspin"
+ self._data[CONF_PROVIDERS]["sendspin"] = provider_config
+ changed = True
+
# Migrate the crossfade setting into Smart Fade Mode = 'crossfade'
for player_config in self._data.get(CONF_PLAYERS, {}).values():
if not (values := player_config.get("values")):
+++ /dev/null
-"""
-Player Provider for the Resonate Audio Protocol.
-
-https://github.com/Resonate-Protocol/spec
-"""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from .provider import ResonateProvider
-
-if TYPE_CHECKING:
- from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
- from music_assistant_models.provider import ProviderManifest
-
- from music_assistant.mass import MusicAssistant
- from music_assistant.models import ProviderInstanceType
-
-
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- return ResonateProvider(mass, manifest, config)
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- # ruff: noqa: ARG001
- return ()
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 25 25" version="1.1">
-<g id="surface1">
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(0%,0%,0%);fill-opacity:1;" d="M 1.5 0 L 23.5 0 C 24.328125 0 25 0.671875 25 1.5 L 25 23.5 C 25 24.328125 24.328125 25 23.5 25 L 1.5 25 C 0.671875 25 0 24.328125 0 23.5 L 0 1.5 C 0 0.671875 0.671875 0 1.5 0 Z M 1.5 0 "/>
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 10.386719 18.875 L 14.8125 7.125 L 16.113281 7.125 L 11.6875 18.875 Z M 10.386719 18.875 "/>
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 21.371094 18.875 L 16.945312 7.125 L 18.246094 7.125 L 22.671875 18.875 Z M 21.371094 18.875 "/>
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 2.636719 18.875 L 2.636719 7.125 L 3.875 7.125 L 3.875 18.875 Z M 2.636719 18.875 "/>
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 5.445312 18.875 L 5.445312 7.125 L 6.683594 7.125 L 6.683594 18.875 Z M 5.445312 18.875 "/>
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,100%,100%);fill-opacity:1;" d="M 8.253906 18.875 L 8.253906 7.125 L 9.492188 7.125 L 9.492188 18.875 Z M 8.253906 18.875 "/>
-</g>
-</svg>
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<svg
- viewBox="0 0 20.039139 20.039139"
- version="1.1"
- id="svg6"
- sodipodi:docname="icon_monochrome.svg"
- inkscape:version="1.3.2 (091e20e, 2023-11-25, custom)"
- width="512"
- height="512"
- xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
- xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
- xmlns="http://www.w3.org/2000/svg"
- xmlns:svg="http://www.w3.org/2000/svg">
- <defs
- id="defs6" />
- <sodipodi:namedview
- id="namedview6"
- pagecolor="#ffffff"
- bordercolor="#000000"
- borderopacity="0.25"
- inkscape:showpageshadow="2"
- inkscape:pageopacity="0.0"
- inkscape:pagecheckerboard="0"
- inkscape:deskcolor="#d1d1d1"
- inkscape:zoom="1.7324219"
- inkscape:cx="256"
- inkscape:cy="256"
- inkscape:window-width="1920"
- inkscape:window-height="1129"
- inkscape:window-x="-8"
- inkscape:window-y="-8"
- inkscape:window-maximized="1"
- inkscape:current-layer="svg6" />
- <g
- id="surface1"
- transform="translate(-2.637486,-2.9780951)">
- <path
- style="fill:#000000;fill-opacity:0;fill-rule:nonzero;stroke:none"
- d="m 1.5,0 h 22 C 24.328125,0 25,0.671875 25,1.5 v 22 C 25,24.328125 24.328125,25 23.5,25 H 1.5 C 0.671875,25 0,24.328125 0,23.5 V 1.5 C 0,0.671875 0.671875,0 1.5,0 Z m 0,0"
- id="path1" />
- <path
- style="fill:#ffffff;fill-opacity:1;fill-rule:nonzero;stroke:none"
- d="M 10.386719,18.875 14.8125,7.125 h 1.300781 L 11.6875,18.875 Z m 0,0"
- id="path2" />
- <path
- style="fill:#ffffff;fill-opacity:1;fill-rule:nonzero;stroke:none"
- d="M 21.371094,18.875 16.945312,7.125 h 1.300782 l 4.425781,11.75 z m 0,0"
- id="path3" />
- <path
- style="fill:#ffffff;fill-opacity:1;fill-rule:nonzero;stroke:none"
- d="M 2.636719,18.875 V 7.125 H 3.875 v 11.75 z m 0,0"
- id="path4" />
- <path
- style="fill:#ffffff;fill-opacity:1;fill-rule:nonzero;stroke:none"
- d="M 5.445312,18.875 V 7.125 h 1.238282 v 11.75 z m 0,0"
- id="path5" />
- <path
- style="fill:#ffffff;fill-opacity:1;fill-rule:nonzero;stroke:none"
- d="M 8.253906,18.875 V 7.125 h 1.238282 v 11.75 z m 0,0"
- id="path6" />
- </g>
-</svg>
+++ /dev/null
-{
- "type": "player",
- "domain": "resonate",
- "stage": "alpha",
- "name": "Resonate (WIP)",
- "description": "Resonate (working title) is the next generation streaming protocol built by the Open Home Foundation. Follow the development on Discord to see how you can get involved.",
- "codeowners": ["@music-assistant"],
- "requirements": ["aioresonate==0.13.1"]
-}
+++ /dev/null
-"""Resonate Player implementation."""
-
-from __future__ import annotations
-
-import asyncio
-import time
-from collections.abc import AsyncGenerator, Callable
-from io import BytesIO
-from typing import TYPE_CHECKING, cast
-
-from aioresonate.models import MediaCommand
-from aioresonate.models.types import PlaybackStateType
-from aioresonate.models.types import RepeatMode as ResonateRepeatMode
-from aioresonate.server import AudioFormat as ResonateAudioFormat
-from aioresonate.server import (
- ClientEvent,
- GroupCommandEvent,
- GroupEvent,
- GroupStateChangedEvent,
- VolumeChangedEvent,
-)
-from aioresonate.server.client import DisconnectBehaviour
-from aioresonate.server.events import ClientGroupChangedEvent
-from aioresonate.server.group import (
- GroupDeletedEvent,
- GroupMemberAddedEvent,
- GroupMemberRemovedEvent,
-)
-from aioresonate.server.metadata import Metadata
-from aioresonate.server.stream import AudioCodec, MediaStream
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant_models.constants import PLAYER_CONTROL_NONE
-from music_assistant_models.enums import (
- ContentType,
- EventType,
- PlaybackState,
- PlayerFeature,
- PlayerType,
- RepeatMode,
-)
-from music_assistant_models.media_items import AudioFormat
-from music_assistant_models.player import DeviceInfo
-from PIL import Image
-
-from music_assistant.constants import (
- CONF_ENTRY_FLOW_MODE_ENFORCED,
- CONF_ENTRY_OUTPUT_CODEC,
- CONF_OUTPUT_CODEC,
- INTERNAL_PCM_FORMAT,
-)
-from music_assistant.helpers.audio import get_player_filter_params
-from music_assistant.models.player import Player, PlayerMedia
-
-from .timed_client_stream import TimedClientStream
-
-if TYPE_CHECKING:
- from aioresonate.server.client import ResonateClient
- from music_assistant_models.event import MassEvent
-
- from .provider import ResonateProvider
-
-
-class MusicAssistantMediaStream(MediaStream):
- """MediaStream implementation for Music Assistant with per-player DSP support."""
-
- player_instance: ResonatePlayer
- internal_format: AudioFormat
- output_format: AudioFormat
-
- def __init__(
- self,
- *,
- main_channel_source: AsyncGenerator[bytes, None],
- main_channel_format: ResonateAudioFormat,
- player_instance: ResonatePlayer,
- internal_format: AudioFormat,
- output_format: AudioFormat,
- ) -> None:
- """
- Initialise the media stream with audio source and format for main_channel().
-
- Args:
- main_channel_source: Audio source generator for the main channel.
- main_channel_format: Audio format for the main channel (includes codec).
- player_instance: The ResonatePlayer instance for accessing mass and streams.
- internal_format: Internal processing format (float32 for headroom).
- output_format: Output PCM format (16-bit for player output).
- """
- super().__init__(
- main_channel_source=main_channel_source,
- main_channel_format=main_channel_format,
- )
- self.player_instance = player_instance
- self.internal_format = internal_format
- self.output_format = output_format
-
- async def player_channel(
- self,
- player_id: str,
- preferred_format: ResonateAudioFormat | None = None,
- position_us: int = 0,
- ) -> tuple[AsyncGenerator[bytes, None], ResonateAudioFormat, int] | None:
- """
- Get a player-specific audio stream with per-player DSP.
-
- Args:
- player_id: Identifier for the player requesting the stream.
- preferred_format: The player's preferred native format for the stream.
- The implementation may return a different format; the library
- will handle any necessary conversion.
- position_us: Position in microseconds relative to the main_stream start.
- Used for late-joining players to sync with the main stream.
-
- Returns:
- A tuple of (audio generator, audio format, actual position in microseconds)
- or None if unavailable. If None, the main_stream is used as fallback.
- """
- mass = self.player_instance.mass
- multi_client_stream = self.player_instance.timed_client_stream
- assert multi_client_stream is not None
-
- dsp = mass.config.get_player_dsp_config(player_id)
- if not dsp.enabled:
- # DSP is disabled for this player, use main_stream
- return None
-
- # Get per-player DSP filter parameters
- # Convert from internal format to output format
- filter_params = get_player_filter_params(
- mass, player_id, self.internal_format, self.output_format
- )
-
- # Get the stream with position (in seconds)
- stream_gen, actual_position = await multi_client_stream.get_stream(
- output_format=self.output_format,
- filter_params=filter_params,
- )
-
- # Convert position from seconds to microseconds for aioresonate API
- actual_position_us = int(actual_position * 1_000_000)
-
- # Return actual position in microseconds relative to main_stream start
- self.player_instance.logger.debug(
- "Providing channel stream for player %s at position %d us",
- player_id,
- actual_position_us,
- )
- return (
- stream_gen,
- ResonateAudioFormat(
- sample_rate=self.output_format.sample_rate,
- bit_depth=self.output_format.bit_depth,
- channels=self.output_format.channels,
- codec=self._main_channel_format.codec,
- ),
- actual_position_us,
- )
-
-
-class ResonatePlayer(Player):
- """A resonate audio player in Music Assistant."""
-
- api: ResonateClient
- unsub_event_cb: Callable[[], None]
- unsub_group_event_cb: Callable[[], None]
- last_sent_artwork_url: str | None = None
- _playback_task: asyncio.Task[None] | None = None
- timed_client_stream: TimedClientStream | None = None
-
- def __init__(self, provider: ResonateProvider, player_id: str) -> None:
- """Initialize the Player."""
- super().__init__(provider, player_id)
- resonate_client = provider.server_api.get_client(player_id)
- assert resonate_client is not None
- self.api = resonate_client
- self.api.disconnect_behaviour = DisconnectBehaviour.STOP
- self.unsub_event_cb = resonate_client.add_event_listener(self.event_cb)
- self.unsub_group_event_cb = resonate_client.group.add_event_listener(self.group_event_cb)
-
- self.logger = self.provider.logger.getChild(player_id)
- # init some static variables
- self._attr_name = resonate_client.name
- self._attr_type = PlayerType.PLAYER
- self._attr_supported_features = {
- PlayerFeature.SET_MEMBERS,
- PlayerFeature.MULTI_DEVICE_DSP,
- }
- self._attr_can_group_with = {provider.lookup_key}
- self._attr_power_control = PLAYER_CONTROL_NONE
- self._attr_device_info = DeviceInfo()
- if player_client := resonate_client.player:
- self._attr_volume_level = player_client.volume
- self._attr_volume_muted = player_client.muted
- self._attr_available = True
- self._on_unload_callbacks.append(
- self.mass.subscribe(
- self._on_queue_update,
- (EventType.QUEUE_UPDATED),
- )
- )
-
- async def event_cb(self, event: ClientEvent) -> None:
- """Event callback registered to the resonate server."""
- self.logger.debug("Received PlayerEvent: %s", event)
- match event:
- case VolumeChangedEvent(volume=volume, muted=muted):
- self._attr_volume_level = volume
- self._attr_volume_muted = muted
- self.update_state()
- case ClientGroupChangedEvent(new_group=new_group):
- self.unsub_group_event_cb()
- self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
- # Sync playback state from the new group
- match new_group.state:
- case PlaybackStateType.PLAYING:
- self._attr_playback_state = PlaybackState.PLAYING
- case PlaybackStateType.PAUSED:
- self._attr_playback_state = PlaybackState.PAUSED
- case PlaybackStateType.STOPPED:
- self._attr_playback_state = PlaybackState.IDLE
- self.update_state()
-
- async def group_event_cb(self, event: GroupEvent) -> None:
- """Event callback registered to the resonate group this player belongs to."""
- if self.synced_to is not None:
- # Only handle group events as the leader
- return
- self.logger.debug("Received GroupEvent: %s", event)
-
- match event:
- case GroupCommandEvent(command=command, volume=volume, mute=mute):
- self.logger.debug("Group command received: %s", command)
- match command:
- case MediaCommand.PLAY:
- await self.mass.players.cmd_play(self.player_id)
- case MediaCommand.PAUSE:
- await self.mass.players.cmd_pause(self.player_id)
- case MediaCommand.STOP:
- await self.mass.players.cmd_stop(self.player_id)
- case MediaCommand.NEXT:
- await self.mass.players.cmd_next_track(self.player_id)
- case MediaCommand.PREVIOUS:
- await self.mass.players.cmd_previous_track(self.player_id)
- case MediaCommand.SEEK:
- raise NotImplementedError("TODO: not supported by spec yet")
- case MediaCommand.VOLUME:
- assert volume is not None
- await self.mass.players.cmd_group_volume(self.player_id, volume)
- case MediaCommand.MUTE:
- assert mute is not None
- for member in self.mass.players.iter_group_members(
- self, active_only=True, exclude_self=True
- ):
- await member.volume_mute(mute)
- case GroupStateChangedEvent(state=state):
- self.logger.debug("Group state changed to: %s", state)
- match state:
- case PlaybackStateType.PLAYING:
- self._attr_playback_state = PlaybackState.PLAYING
- case PlaybackStateType.PAUSED:
- self._attr_playback_state = PlaybackState.PAUSED
- case PlaybackStateType.STOPPED:
- self._attr_playback_state = PlaybackState.IDLE
- self._attr_elapsed_time = 0
- self._attr_elapsed_time_last_updated = time.time()
- self.update_state()
- case GroupMemberAddedEvent(client_id=client_id):
- self.logger.debug("Group member added: %s", client_id)
- if client_id not in self._attr_group_members:
- self._attr_group_members.append(client_id)
- self.update_state()
- case GroupMemberRemovedEvent(client_id=client_id):
- self.logger.debug("Group member removed: %s", client_id)
- if client_id in self._attr_group_members:
- self._attr_group_members.remove(client_id)
- self.update_state()
- case GroupDeletedEvent():
- pass
-
- async def volume_set(self, volume_level: int) -> None:
- """Handle VOLUME_SET command on the player."""
- if player_client := self.api.player:
- player_client.set_volume(volume_level)
-
- async def volume_mute(self, muted: bool) -> None:
- """Handle VOLUME MUTE command on the player."""
- if player_client := self.api.player:
- if muted:
- player_client.mute()
- else:
- player_client.unmute()
-
- async def stop(self) -> None:
- """Stop command."""
- self.logger.debug("Received STOP command on player %s", self.display_name)
- # We don't care if we stopped the stream or it was already stopped
- await self.api.group.stop()
- # Clear the playback task reference (group.stop() handles stopping the stream)
- self._playback_task = None
- self._attr_current_media = None
- self.update_state()
-
- async def play_media(self, media: PlayerMedia) -> None:
- """Play media command."""
- self.logger.debug(
- "Received PLAY_MEDIA command on player %s with uri %s", self.display_name, media.uri
- )
-
- # Update player state optimistically
- self._attr_current_media = media
- self._attr_elapsed_time = 0
- self._attr_elapsed_time_last_updated = time.time()
- # playback_state will be set by the group state change event
-
- # Stop previous stream in case we were already playing something
- await self.api.group.stop()
- # Run playback in background task to immediately return
- self._playback_task = asyncio.create_task(self._run_playback(media))
- self.update_state()
-
- async def _run_playback(self, media: PlayerMedia) -> None:
- """Run the actual playback in a background task."""
- try:
- pcm_format = AudioFormat(
- content_type=ContentType.PCM_S16LE,
- sample_rate=48000,
- bit_depth=16,
- channels=2,
- )
- flow_pcm_format = AudioFormat(
- content_type=INTERNAL_PCM_FORMAT.content_type,
- sample_rate=pcm_format.sample_rate,
- bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
- channels=pcm_format.channels,
- )
-
- output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
-
- # Convert string codec to AudioCodec enum
- audio_codec = AudioCodec(output_codec)
-
- # Get clean audio source in flow format (high quality internal format)
- # Format conversion and per-player DSP will be applied via player_channel
- audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
-
- # Create TimedClientStream to wrap the clean audio source
- # This distributes the audio to multiple subscribers without DSP
- self.timed_client_stream = TimedClientStream(
- audio_source=audio_source,
- audio_format=flow_pcm_format,
- )
-
- # Setup the main channel subscription
- # aioresonate only really supports 16-bit for now TODO: upgrade later to 32-bit
- main_channel_gen, main_position = await self.timed_client_stream.get_stream(
- output_format=pcm_format,
- filter_params=None, # TODO: this should probably still include the safety limiter
- )
- assert main_position == 0.0 # first subscriber, should be zero
- media_stream = MusicAssistantMediaStream(
- main_channel_source=main_channel_gen,
- main_channel_format=ResonateAudioFormat(
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- channels=pcm_format.channels,
- codec=audio_codec,
- ),
- player_instance=self,
- internal_format=flow_pcm_format,
- output_format=pcm_format,
- )
-
- stop_time = await self.api.group.play_media(media_stream)
- await self.api.group.stop(stop_time)
- except asyncio.CancelledError:
- self.logger.debug("Playback cancelled for player %s", self.display_name)
- raise
- except Exception:
- self.logger.exception("Error during playback for player %s", self.display_name)
- raise
- finally:
- self.timed_client_stream = None
-
- async def set_members(
- self,
- player_ids_to_add: list[str] | None = None,
- player_ids_to_remove: list[str] | None = None,
- ) -> None:
- """Handle SET_MEMBERS command on the player."""
- self.logger.debug(
- "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
- )
- for player_id in player_ids_to_remove or []:
- player = self.mass.players.get(player_id, True)
- player = cast("ResonatePlayer", player) # For type checking
- await self.api.group.remove_client(player.api)
- player.api.disconnect_behaviour = DisconnectBehaviour.STOP
- for player_id in player_ids_to_add or []:
- player = self.mass.players.get(player_id, True)
- player = cast("ResonatePlayer", player) # For type checking
- player.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
- await self.api.group.add_client(player.api)
- # self.group_members will be updated by the group event callback
-
- async def _on_queue_update(self, event: MassEvent) -> None:
- """Extract and send current media metadata to resonate players on queue updates."""
- queue = self.mass.player_queues.get_active_queue(self.player_id)
- if not queue or not queue.current_item:
- return
-
- current_item = queue.current_item
-
- title = current_item.name
- artist = None
- album_artist = None
- album = None
- track = None
- artwork_url = None
- year = None
-
- if (streamdetails := current_item.streamdetails) and streamdetails.stream_title:
- # stream title/metadata from radio/live stream
- if " - " in streamdetails.stream_title:
- artist, title = streamdetails.stream_title.split(" - ", 1)
- else:
- title = streamdetails.stream_title
- artist = ""
- # set album to radio station name
- album = current_item.name
- elif media_item := current_item.media_item:
- title = media_item.name
- if artist_str := getattr(media_item, "artist_str", None):
- artist = artist_str
- if _album := getattr(media_item, "album", None):
- album = _album.name
- year = getattr(_album, "year", None)
- album_artist = getattr(_album, "artist_str", None)
- if _track_number := getattr(media_item, "track_number", None):
- track = _track_number
-
- if current_item.image is not None:
- artwork_url = self.mass.metadata.get_image_url(current_item.image)
-
- if artwork_url != self.last_sent_artwork_url:
- # Image changed, resend the artwork
- self.last_sent_artwork_url = artwork_url
- if artwork_url is not None and current_item.media_item is not None:
- image_data = await self.mass.metadata.get_image_data_for_item(
- current_item.media_item
- )
- if image_data is not None:
- image = await asyncio.to_thread(Image.open, BytesIO(image_data))
- await self.api.group.set_media_art(image)
- # TODO: null media art if not set?
-
- track_duration = current_item.duration
-
- repeat = ResonateRepeatMode.OFF
- if queue.repeat_mode == RepeatMode.ALL:
- repeat = ResonateRepeatMode.ALL
- elif queue.repeat_mode == RepeatMode.ONE:
- repeat = ResonateRepeatMode.ONE
-
- shuffle = queue.shuffle_enabled
-
- metadata = Metadata(
- title=title,
- artist=artist,
- album_artist=album_artist,
- album=album,
- artwork_url=artwork_url,
- year=year,
- track=track,
- track_duration=track_duration,
- playback_speed=1,
- repeat=repeat,
- shuffle=shuffle,
- )
-
- # Send metadata to the group
- self.api.group.set_metadata(metadata)
-
- async def get_config_entries(
- self,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
- ) -> list[ConfigEntry]:
- """Return all (provider/player specific) Config Entries for the player."""
- default_entries = await super().get_config_entries(action=action, values=values)
- return [
- *default_entries,
- CONF_ENTRY_FLOW_MODE_ENFORCED,
- ConfigEntry.from_dict(
- {
- **CONF_ENTRY_OUTPUT_CODEC.to_dict(),
- "default_value": "pcm",
- "options": [
- {"title": "PCM (lossless, uncompressed)", "value": "pcm"},
- {"title": "FLAC (lossless, compressed)", "value": "flac"},
- {"title": "OPUS (lossy)", "value": "opus"},
- ],
- }
- ),
- ]
-
- async def on_unload(self) -> None:
- """Handle logic when the player is unloaded from the Player controller."""
- await super().on_unload()
- self.unsub_event_cb()
- self.unsub_group_event_cb()
- await self.api.disconnect()
+++ /dev/null
-"""Player Provider for Resonate."""
-
-from __future__ import annotations
-
-from collections.abc import Callable
-from typing import TYPE_CHECKING, cast
-
-from aioresonate.server import ClientAddedEvent, ClientRemovedEvent, ResonateEvent, ResonateServer
-from music_assistant_models.enums import ProviderFeature
-
-from music_assistant.mass import MusicAssistant
-from music_assistant.models.player_provider import PlayerProvider
-from music_assistant.providers.resonate.player import ResonatePlayer
-
-if TYPE_CHECKING:
- from music_assistant_models.config_entries import ProviderConfig
- from music_assistant_models.provider import ProviderManifest
-
-
-class ResonateProvider(PlayerProvider):
- """Player Provider for Resonate."""
-
- server_api: ResonateServer
- unregister_cbs: list[Callable[[], None]]
-
- def __init__(
- self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
- ) -> None:
- """Initialize a new Resonate player provider."""
- super().__init__(mass, manifest, config)
- self.server_api = ResonateServer(
- self.mass.loop, mass.server_id, "Music Assistant", self.mass.http_session
- )
- self.unregister_cbs = [
- self.server_api.add_event_listener(self.event_cb),
- # For the web player
- self.mass.webserver.register_dynamic_route(
- "/resonate", self.server_api.on_client_connect
- ),
- ]
-
- async def event_cb(self, event: ResonateEvent) -> None:
- """Event callback registered to the resonate server."""
- self.logger.debug("Received ResonateEvent: %s", event)
- match event:
- case ClientAddedEvent(client_id):
- player = ResonatePlayer(self, client_id)
- self.logger.debug("Client %s connected", client_id)
- await self.mass.players.register(player)
- case ClientRemovedEvent(client_id):
- self.logger.debug("Client %s disconnected", client_id)
- await self.mass.players.unregister(client_id)
- case _:
- self.logger.error("Unknown resonate event: %s", event)
-
- @property
- def supported_features(self) -> set[ProviderFeature]:
- """Return the features supported by this Provider."""
- return {
- ProviderFeature.SYNC_PLAYERS,
- }
-
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await super().loaded_in_mass()
- # Start server for handling incoming Resonate connections from clients
- # and mDNS discovery of new clients
- await self.server_api.start_server(
- port=8927,
- host=self.mass.streams.bind_ip,
- advertise_host=cast("str", self.mass.streams.publish_ip),
- )
-
- async def unload(self, is_removed: bool = False) -> None:
- """
- Handle unload/close of the provider.
-
- Called when provider is deregistered (e.g. MA exiting or config reloading).
- is_removed will be set to True when the provider is removed from the configuration.
- """
- # Stop the Resonate server
- await self.server_api.close()
-
- for cb in self.unregister_cbs:
- cb()
- self.unregister_cbs = []
- for player in self.players:
- self.logger.debug("Unloading player %s", player.name)
- await self.mass.players.unregister(player.player_id)
+++ /dev/null
-"""
-Timestamped multi-client audio stream for position-aware playback.
-
-This module provides a multi-client streaming implementation optimized for
-aioresonate's synchronized multi-room audio playback. Each audio chunk is
-timestamped, allowing late-joining players to start at the correct position
-for synchronized playback across multiple devices.
-"""
-
-import asyncio
-import logging
-from collections import deque
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-from uuid import UUID, uuid4
-
-from music_assistant_models.media_items import AudioFormat
-
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
-
-LOGGER = logging.getLogger(__name__)
-
-# Minimum/target buffer retention time in seconds
-# This 10s buffer is currently required since:
-# - aioresonate currently uses a fixed 5s buffer to allow up to ~4s of network interruption
-# - ~2s allows for ffmpeg processing time and some margin
-# - ~3s are currently needed internally by aioresonate for initial buffering
-MIN_BUFFER_DURATION = 10.0
-# Maximum buffer duration before raising an error (safety mechanism)
-MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0
-
-
-class TimedClientStream:
- """Multi-client audio stream with timestamped chunks for synchronized playback."""
-
- audio_source: AsyncGenerator[bytes, None]
- """The source audio stream to read from."""
- audio_format: AudioFormat
- """The audio format of the source stream."""
- chunk_buffer: deque[tuple[bytes, float]]
- """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds)."""
- subscriber_positions: dict[UUID, int]
- """Subscriber positions: maps subscriber_id to position (index into chunk_buffer)."""
- buffer_lock: asyncio.Lock
- """Lock for buffer and shared state access."""
- source_read_lock: asyncio.Lock
- """Lock to serialize audio source reads."""
- stream_ended: bool = False
- """Track if stream has ended."""
- current_position: float = 0.0
- """Current position in seconds (from stream start)."""
-
- def __init__(
- self,
- audio_source: AsyncGenerator[bytes, None],
- audio_format: AudioFormat,
- ) -> None:
- """Initialize TimedClientStream."""
- self.audio_source = audio_source
- self.audio_format = audio_format
- self.chunk_buffer = deque()
- self.subscriber_positions = {}
- self.buffer_lock = asyncio.Lock()
- self.source_read_lock = asyncio.Lock()
-
- def _get_bytes_per_second(self) -> int:
- """Get bytes per second for the audio format."""
- return (
- self.audio_format.sample_rate
- * self.audio_format.channels
- * (self.audio_format.bit_depth // 8)
- )
-
- def _bytes_to_seconds(self, num_bytes: int) -> float:
- """Convert bytes to seconds based on audio format."""
- bytes_per_second = self._get_bytes_per_second()
- if bytes_per_second == 0:
- return 0.0
- return num_bytes / bytes_per_second
-
- def _get_buffer_duration(self) -> float:
- """Calculate total duration of buffered chunks in seconds."""
- if not self.chunk_buffer:
- return 0.0
- # Duration is from first chunk timestamp to current position
- first_chunk_timestamp = self.chunk_buffer[0][1]
- return self.current_position - first_chunk_timestamp
-
- def _cleanup_old_chunks(self) -> None:
- """Remove old chunks when all subscribers read them and min duration exceeded."""
- # Find the oldest position still needed by any subscriber
- if self.subscriber_positions:
- min_position = min(self.subscriber_positions.values())
- else:
- min_position = len(self.chunk_buffer)
-
- # Calculate target oldest timestamp
- # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data
- target_oldest = self.current_position - MIN_BUFFER_DURATION
-
- # Remove old chunks that meet both conditions:
- # 1. Before min_position (no subscriber needs them)
- # 2. Older than target_oldest (outside minimum retention window)
- chunks_removed = 0
- while chunks_removed < min_position and self.chunk_buffer:
- _chunk_bytes, chunk_timestamp = self.chunk_buffer[0]
- if chunk_timestamp < target_oldest:
- self.chunk_buffer.popleft()
- chunks_removed += 1
- else:
- # Stop when we reach chunks we want to keep
- break
-
- # Adjust all subscriber positions to account for removed chunks
- for sub_id in self.subscriber_positions:
- self.subscriber_positions[sub_id] -= chunks_removed
-
- async def _read_chunk_from_source(self) -> None:
- """Read next chunk from audio source and add to buffer."""
- try:
- chunk = await anext(self.audio_source)
- async with self.buffer_lock:
- # Calculate timestamp for this chunk
- chunk_timestamp = self.current_position
- chunk_duration = self._bytes_to_seconds(len(chunk))
-
- # Append chunk with its timestamp
- self.chunk_buffer.append((chunk, chunk_timestamp))
-
- # Update current position
- self.current_position += chunk_duration
-
- # Safety check: ensure buffer doesn't grow unbounded
- if self._get_buffer_duration() > MAX_BUFFER_DURATION:
- msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)"
- raise RuntimeError(msg)
- except StopAsyncIteration:
- # Source exhausted, add EOF marker
- async with self.buffer_lock:
- self.chunk_buffer.append((b"", self.current_position))
- self.stream_ended = True
- except Exception:
- # Source errored or was canceled, mark stream as ended
- async with self.buffer_lock:
- self.stream_ended = True
- raise
-
- async def _check_buffer(self, subscriber_id: UUID) -> bool | None:
- """
- Check if buffer has grown or stream ended.
-
- REQUIRES: Caller must hold self.source_read_lock before calling.
-
- Returns:
- True if should continue reading loop (chunk found in buffer),
- False if should break (stream ended),
- None if should proceed to read from source.
- """
- async with self.buffer_lock:
- position = self.subscriber_positions[subscriber_id]
- if position < len(self.chunk_buffer):
- # Another subscriber already read the chunk
- return True
- if self.stream_ended:
- # Stream ended while waiting for source lock
- return False
- return None # Continue to read from source
-
- async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None:
- """
- Get next chunk from buffer for subscriber.
-
- Returns:
- Chunk bytes if available, None if no chunk available, or empty bytes for EOF.
- """
- async with self.buffer_lock:
- position = self.subscriber_positions[subscriber_id]
-
- # Check if we have a chunk at this position
- if position < len(self.chunk_buffer):
- # Chunk available in buffer
- chunk_data, _ = self.chunk_buffer[position]
-
- # Move to next position
- self.subscriber_positions[subscriber_id] = position + 1
-
- # Cleanup old chunks that no one needs
- self._cleanup_old_chunks()
- return chunk_data
- if self.stream_ended:
- # Stream ended and we've read all buffered chunks
- return b""
- return None
-
- async def _cleanup_subscriber(self, subscriber_id: UUID) -> None:
- """Clean up subscriber and close stream if no subscribers left."""
- async with self.buffer_lock:
- if subscriber_id in self.subscriber_positions:
- del self.subscriber_positions[subscriber_id]
-
- # If no subscribers left, close the stream
- if not self.subscriber_positions and not self.stream_ended:
- self.stream_ended = True
- # Close the audio source generator to prevent resource leak
- with suppress(Exception):
- await self.audio_source.aclose()
-
- async def get_stream(
- self,
- output_format: AudioFormat,
- filter_params: list[str] | None = None,
- ) -> tuple[AsyncGenerator[bytes, None], float]:
- """
- Get (client specific encoded) ffmpeg stream.
-
- Returns:
- A tuple of (audio generator, actual position in seconds)
- """
- audio_gen, position = await self.subscribe_raw()
-
- async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]:
- try:
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_gen,
- input_format=self.audio_format,
- output_format=output_format,
- filter_params=filter_params,
- ):
- yield chunk
- finally:
- # Ensure audio_gen cleanup runs immediately
- with suppress(Exception):
- await audio_gen.aclose()
-
- return _stream_with_ffmpeg(), position
-
- async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]:
- """
- Generate audio chunks for a subscriber.
-
- Yields chunks from the buffer until the stream ends, reading from the source
- as needed. Automatically cleans up the subscriber on exit.
- """
- try:
- # Position already set above atomically with timestamp capture
- while True:
- # Try to get chunk from buffer
- chunk_bytes = await self._get_chunk_from_buffer(subscriber_id)
-
- # Release lock before yielding to avoid deadlock
- if chunk_bytes is not None:
- if chunk_bytes == b"":
- # End of stream marker
- break
- yield chunk_bytes
- else:
- # No chunk available, need to read from source
- # Use source_read_lock to ensure only one subscriber reads at a time
- async with self.source_read_lock:
- # Check again if buffer has grown or stream ended while waiting
- check_result = await self._check_buffer(subscriber_id)
- if check_result is True:
- # Another subscriber already read the chunk
- continue
- if check_result is False:
- # Stream ended while waiting for source lock
- break
-
- # Read next chunk from source (check_result is None)
- # Note: This may block if the audio_source does synchronous I/O
- await self._read_chunk_from_source()
-
- finally:
- await self._cleanup_subscriber(subscriber_id)
-
- async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]:
- """
- Subscribe to the raw/unaltered audio stream.
-
- Returns:
- A tuple of (audio generator, actual position in seconds).
- The position indicates where in the stream the first chunk will be from.
-
- Note:
- Callers must properly consume or cancel the returned generator to prevent
- resource leaks.
- """
- subscriber_id = uuid4()
-
- # Atomically capture starting position and register subscriber while holding lock
- async with self.buffer_lock:
- if self.chunk_buffer:
- _, starting_position = self.chunk_buffer[0]
- # Log buffer time range for debugging
- newest_ts = self.chunk_buffer[-1][1]
- oldest_relative = starting_position - self.current_position
- newest_relative = newest_ts - self.current_position
- LOGGER.debug(
- "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, "
- "current_position=%.3fs)",
- newest_ts - starting_position,
- oldest_relative,
- newest_relative,
- self.current_position,
- )
- else:
- starting_position = self.current_position
- LOGGER.debug(
- "New subscriber joining: buffer is empty, starting at current_position=%.3fs",
- self.current_position,
- )
- # Register subscriber at position 0 (start of buffer)
- self.subscriber_positions[subscriber_id] = 0
-
- # Return generator and starting position in seconds
- return self._generate(subscriber_id), starting_position
--- /dev/null
+"""
+Player Provider for the Sendspin Audio Protocol.
+
+https://github.com/Sendspin-Protocol/spec
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from .provider import SendspinProvider
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return SendspinProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ return ()
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" width="45" height="45" viewBox="0 0 45 45">
+ <g style="stroke-width:.311784">
+ <g style="stroke-width:.311784">
+ <path d="M146.952 64.268h-1a3 3 0 0 1-3 3 3 3 0 0 1-3-3h-1a4 4 0 0 0 4 4 4 4 0 0 0 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M144.952 64.268h-.86a1.14 1.14 0 0 1-1.14 1.139 1.14 1.14 0 0 1-1.139-1.14h-.861a2 2 0 0 0 2 2 2 2 0 0 0 2-2" style="fill:#000;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M143.574 62.367a4 4 0 0 1 .603-.837 3 3 0 0 0-1.224-.262 3 3 0 0 0-3 3h1a2 2 0 0 1 2-2 2 2 0 0 1 .62.099" style="fill:#000;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M142.953 60.268v-.946a4.946 4.946 0 0 0-4.946 4.946h.945a4 4 0 0 1 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ </g>
+ <g style="stroke-width:.311784">
+ <path d="M146.952 64.268h-1a3 3 0 0 1-3 3 3 3 0 0 1-3-3h-1a4 4 0 0 0 4 4 4 4 0 0 0 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M144.952 64.268h-.86a1.14 1.14 0 0 1-1.14 1.139 1.14 1.14 0 0 1-1.139-1.14h-.861a2 2 0 0 0 2 2 2 2 0 0 0 2-2" style="fill:#000;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M143.574 62.367a4 4 0 0 1 .603-.837 3 3 0 0 0-1.224-.262 3 3 0 0 0-3 3h1a2 2 0 0 1 2-2 2 2 0 0 1 .62.099" style="fill:#000;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M142.953 60.268v-.946a4.946 4.946 0 0 0-4.946 4.946h.945a4 4 0 0 1 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ </g>
+ </g>
+</svg>
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" width="45" height="45" viewBox="0 0 45 45">
+ <g style="stroke-width:.311784">
+ <g style="stroke-width:.311784">
+ <path d="M146.952 64.268h-1a3 3 0 0 1-3 3 3 3 0 0 1-3-3h-1a4 4 0 0 0 4 4 4 4 0 0 0 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M144.952 64.268h-.86a1.14 1.14 0 0 1-1.14 1.139 1.14 1.14 0 0 1-1.139-1.14h-.861a2 2 0 0 0 2 2 2 2 0 0 0 2-2" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M143.574 62.367a4 4 0 0 1 .603-.837 3 3 0 0 0-1.224-.262 3 3 0 0 0-3 3h1a2 2 0 0 1 2-2 2 2 0 0 1 .62.099" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M142.953 60.268v-.946a4.946 4.946 0 0 0-4.946 4.946h.945a4 4 0 0 1 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ </g>
+ <g style="stroke-width:.311784">
+ <path d="M146.952 64.268h-1a3 3 0 0 1-3 3 3 3 0 0 1-3-3h-1a4 4 0 0 0 4 4 4 4 0 0 0 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M144.952 64.268h-.86a1.14 1.14 0 0 1-1.14 1.139 1.14 1.14 0 0 1-1.139-1.14h-.861a2 2 0 0 0 2 2 2 2 0 0 0 2-2" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M143.574 62.367a4 4 0 0 1 .603-.837 3 3 0 0 0-1.224-.262 3 3 0 0 0-3 3h1a2 2 0 0 1 2-2 2 2 0 0 1 .62.099" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M142.953 60.268v-.946a4.946 4.946 0 0 0-4.946 4.946h.945a4 4 0 0 1 4-4" style="fill:#b125e3;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ </g>
+ </g>
+</svg>
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" width="45" height="45" viewBox="0 0 45 45">
+ <g style="fill:#fff;stroke-width:.311784">
+ <g style="fill:#fff;stroke-width:.311784">
+ <path d="M146.952 64.268h-1a3 3 0 0 1-3 3 3 3 0 0 1-3-3h-1a4 4 0 0 0 4 4 4 4 0 0 0 4-4" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M144.952 64.268h-.86a1.14 1.14 0 0 1-1.14 1.139 1.14 1.14 0 0 1-1.139-1.14h-.861a2 2 0 0 0 2 2 2 2 0 0 0 2-2" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M143.574 62.367a4 4 0 0 1 .603-.837 3 3 0 0 0-1.224-.262 3 3 0 0 0-3 3h1a2 2 0 0 1 2-2 2 2 0 0 1 .62.099" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ <path d="M142.953 60.268v-.946a4.946 4.946 0 0 0-4.946 4.946h.945a4 4 0 0 1 4-4" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(-442.635 -183.63)scale(3.20734)"/>
+ </g>
+ <g style="fill:#fff;stroke-width:.311784">
+ <path d="M146.952 64.268h-1a3 3 0 0 1-3 3 3 3 0 0 1-3-3h-1a4 4 0 0 0 4 4 4 4 0 0 0 4-4" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M144.952 64.268h-.86a1.14 1.14 0 0 1-1.14 1.139 1.14 1.14 0 0 1-1.139-1.14h-.861a2 2 0 0 0 2 2 2 2 0 0 0 2-2" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M143.574 62.367a4 4 0 0 1 .603-.837 3 3 0 0 0-1.224-.262 3 3 0 0 0-3 3h1a2 2 0 0 1 2-2 2 2 0 0 1 .62.099" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ <path d="M142.953 60.268v-.946a4.946 4.946 0 0 0-4.946 4.946h.945a4 4 0 0 1 4-4" style="fill:#fff;fill-opacity:1;stroke-width:.216377;stroke-linecap:round;stroke-linejoin:round" transform="translate(487.635 228.63)scale(-3.20734)"/>
+ </g>
+ </g>
+</svg>
--- /dev/null
+{
+ "type": "player",
+ "domain": "sendspin",
+ "stage": "alpha",
+ "name": "Sendspin",
+ "description": "Sendspin is an audio playback, control and synchronization protocol developed by the Open Home Foundation and is the native playback protocol built into Music Assistant, used for playback to supported clients like the Music Assistant Web interface, supported (mobile) clients and supported hardware",
+ "codeowners": ["@music-assistant"],
+ "requirements": ["aiosendspin==1.0.0"],
+ "builtin": true
+}
--- /dev/null
+"""Sendspin Player implementation."""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from collections.abc import AsyncGenerator, Callable
+from io import BytesIO
+from typing import TYPE_CHECKING, cast
+
+from aiosendspin.models import MediaCommand
+from aiosendspin.models.types import ArtworkSource, PlaybackStateType
+from aiosendspin.models.types import RepeatMode as SendspinRepeatMode
+from aiosendspin.server import AudioFormat as SendspinAudioFormat
+from aiosendspin.server import (
+ ClientEvent,
+ GroupCommandEvent,
+ GroupEvent,
+ GroupStateChangedEvent,
+ SendspinGroup,
+ VolumeChangedEvent,
+)
+from aiosendspin.server.client import DisconnectBehaviour
+from aiosendspin.server.events import ClientGroupChangedEvent
+from aiosendspin.server.group import (
+ GroupDeletedEvent,
+ GroupMemberAddedEvent,
+ GroupMemberRemovedEvent,
+)
+from aiosendspin.server.metadata import Metadata
+from aiosendspin.server.stream import AudioCodec, MediaStream
+from music_assistant_models.constants import PLAYER_CONTROL_NONE
+from music_assistant_models.enums import (
+ ContentType,
+ EventType,
+ ImageType,
+ PlaybackState,
+ PlayerFeature,
+ PlayerType,
+ RepeatMode,
+)
+from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.player import DeviceInfo
+from PIL import Image
+
+from music_assistant.constants import (
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ CONF_OUTPUT_CODEC,
+ INTERNAL_PCM_FORMAT,
+)
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.models.player import Player, PlayerMedia
+
+from .timed_client_stream import TimedClientStream
+
+# Supported group commands for Sendspin players
+SUPPORTED_GROUP_COMMANDS = [
+ MediaCommand.PLAY,
+ MediaCommand.PAUSE,
+ MediaCommand.STOP,
+ MediaCommand.NEXT,
+ MediaCommand.PREVIOUS,
+ MediaCommand.REPEAT_OFF,
+ MediaCommand.REPEAT_ONE,
+ MediaCommand.REPEAT_ALL,
+ MediaCommand.SHUFFLE,
+ MediaCommand.UNSHUFFLE,
+]
+
+if TYPE_CHECKING:
+ from aiosendspin.server.client import SendspinClient
+ from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
+ from music_assistant_models.event import MassEvent
+ from music_assistant_models.queue_item import QueueItem
+
+ from .provider import SendspinProvider
+
+
+class MusicAssistantMediaStream(MediaStream):
+ """MediaStream implementation for Music Assistant with per-player DSP support."""
+
+ player_instance: SendspinPlayer
+ internal_format: AudioFormat
+ output_format: AudioFormat
+
+ def __init__(
+ self,
+ *,
+ main_channel_source: AsyncGenerator[bytes, None],
+ main_channel_format: SendspinAudioFormat,
+ player_instance: SendspinPlayer,
+ internal_format: AudioFormat,
+ output_format: AudioFormat,
+ ) -> None:
+ """
+ Initialise the media stream with audio source and format for main_channel().
+
+ Args:
+ main_channel_source: Audio source generator for the main channel.
+ main_channel_format: Audio format for the main channel (includes codec).
+ player_instance: The SendspinPlayer instance for accessing mass and streams.
+ internal_format: Internal processing format (float32 for headroom).
+ output_format: Output PCM format (16-bit for player output).
+ """
+ super().__init__(
+ main_channel_source=main_channel_source,
+ main_channel_format=main_channel_format,
+ )
+ self.player_instance = player_instance
+ self.internal_format = internal_format
+ self.output_format = output_format
+
+ async def player_channel(
+ self,
+ player_id: str,
+ preferred_format: SendspinAudioFormat | None = None,
+ position_us: int = 0,
+ ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None:
+ """
+ Get a player-specific audio stream with per-player DSP.
+
+ Args:
+ player_id: Identifier for the player requesting the stream.
+ preferred_format: The player's preferred native format for the stream.
+ The implementation may return a different format; the library
+ will handle any necessary conversion.
+ position_us: Position in microseconds relative to the main_stream start.
+ Used for late-joining players to sync with the main stream.
+
+ Returns:
+ A tuple of (audio generator, audio format, actual position in microseconds)
+ or None if unavailable. If None, the main_stream is used as fallback.
+ """
+ mass = self.player_instance.mass
+ multi_client_stream = self.player_instance.timed_client_stream
+ assert multi_client_stream is not None
+
+ dsp = mass.config.get_player_dsp_config(player_id)
+ if not dsp.enabled:
+ # DSP is disabled for this player, use main_stream
+ return None
+
+ # Get per-player DSP filter parameters
+ # Convert from internal format to output format
+ filter_params = get_player_filter_params(
+ mass, player_id, self.internal_format, self.output_format
+ )
+
+ # Get the stream with position (in seconds)
+ stream_gen, actual_position = await multi_client_stream.get_stream(
+ output_format=self.output_format,
+ filter_params=filter_params,
+ )
+
+ # Convert position from seconds to microseconds for aiosendspin API
+ actual_position_us = int(actual_position * 1_000_000)
+
+ # Return actual position in microseconds relative to main_stream start
+ self.player_instance.logger.debug(
+ "Providing channel stream for player %s at position %d us",
+ player_id,
+ actual_position_us,
+ )
+ return (
+ stream_gen,
+ SendspinAudioFormat(
+ sample_rate=self.output_format.sample_rate,
+ bit_depth=self.output_format.bit_depth,
+ channels=self.output_format.channels,
+ codec=self._main_channel_format.codec,
+ ),
+ actual_position_us,
+ )
+
+
+class SendspinPlayer(Player):
+ """A sendspin audio player in Music Assistant."""
+
+ api: SendspinClient
+ unsub_event_cb: Callable[[], None]
+ unsub_group_event_cb: Callable[[], None]
+ last_sent_artwork_url: str | None = None
+ last_sent_artist_artwork_url: str | None = None
+ _playback_task: asyncio.Task[None] | None = None
+ timed_client_stream: TimedClientStream | None = None
+
+ def __init__(self, provider: SendspinProvider, player_id: str) -> None:
+ """Initialize the Player."""
+ super().__init__(provider, player_id)
+ sendspin_client = provider.server_api.get_client(player_id)
+ assert sendspin_client is not None
+ self.api = sendspin_client
+ self.api.disconnect_behaviour = DisconnectBehaviour.STOP
+ self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb)
+ self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb)
+ sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+
+ self.logger = self.provider.logger.getChild(player_id)
+ # init some static variables
+ self._attr_name = sendspin_client.name
+ self._attr_type = PlayerType.PLAYER
+ self._attr_supported_features = {
+ PlayerFeature.SET_MEMBERS,
+ PlayerFeature.MULTI_DEVICE_DSP,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.VOLUME_MUTE,
+ }
+ self._attr_can_group_with = {provider.lookup_key}
+ self._attr_power_control = PLAYER_CONTROL_NONE
+ self._attr_device_info = DeviceInfo()
+ if player_client := sendspin_client.player:
+ self._attr_volume_level = player_client.volume
+ self._attr_volume_muted = player_client.muted
+ self._attr_available = True
+ self._on_unload_callbacks.append(
+ self.mass.subscribe(
+ self._on_queue_update,
+ (EventType.QUEUE_UPDATED),
+ )
+ )
+
+ async def event_cb(self, client: SendspinClient, event: ClientEvent) -> None:
+ """Event callback registered to the sendspin server."""
+ self.logger.debug("Received PlayerEvent: %s", event)
+ match event:
+ case VolumeChangedEvent(volume=volume, muted=muted):
+ self._attr_volume_level = volume
+ self._attr_volume_muted = muted
+ self.update_state()
+ case ClientGroupChangedEvent(new_group=new_group):
+ self.unsub_group_event_cb()
+ self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
+ # Sync playback state from the new group
+ match new_group.state:
+ case PlaybackStateType.PLAYING:
+ self._attr_playback_state = PlaybackState.PLAYING
+ case PlaybackStateType.PAUSED:
+ self._attr_playback_state = PlaybackState.PAUSED
+ case PlaybackStateType.STOPPED:
+ self._attr_playback_state = PlaybackState.IDLE
+ # Update in case this is a newly created group
+ new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+ # GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this
+ # so group members are already up to date at this point
+ if self.synced_to is None:
+ # We are the leader, stop on disconnect
+ self.api.disconnect_behaviour = DisconnectBehaviour.STOP
+ else:
+ self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
+ self.update_state()
+
+ async def _handle_group_command(self, command: MediaCommand) -> None:
+ """Handle a group command from aiosendspin."""
+ queue = self.mass.player_queues.get_active_queue(self.player_id)
+ match command:
+ case MediaCommand.PLAY:
+ await self.mass.players.cmd_play(self.player_id)
+ case MediaCommand.PAUSE:
+ await self.mass.players.cmd_pause(self.player_id)
+ case MediaCommand.STOP:
+ await self.mass.players.cmd_stop(self.player_id)
+ case MediaCommand.NEXT:
+ await self.mass.players.cmd_next_track(self.player_id)
+ case MediaCommand.PREVIOUS:
+ await self.mass.players.cmd_previous_track(self.player_id)
+ case MediaCommand.REPEAT_OFF if queue:
+ self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
+ case MediaCommand.REPEAT_ONE if queue:
+ self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
+ case MediaCommand.REPEAT_ALL if queue:
+ self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
+ case MediaCommand.SHUFFLE if queue:
+ self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True)
+ case MediaCommand.UNSHUFFLE if queue:
+ self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False)
+
+ async def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None:
+ """Event callback registered to the sendspin group this player belongs to."""
+ if self.synced_to is not None:
+ # Only handle group events as the leader, except for GroupMemberRemovedEvent
+ if not isinstance(event, GroupMemberRemovedEvent):
+ return
+ self.logger.debug("Received GroupEvent: %s", event)
+
+ match event:
+ case GroupCommandEvent(command=command):
+ self.logger.debug("Group command received: %s", command)
+ await self._handle_group_command(command)
+ case GroupStateChangedEvent(state=state):
+ self.logger.debug("Group state changed to: %s", state)
+ match state:
+ case PlaybackStateType.PLAYING:
+ self._attr_playback_state = PlaybackState.PLAYING
+ case PlaybackStateType.PAUSED:
+ self._attr_playback_state = PlaybackState.PAUSED
+ case PlaybackStateType.STOPPED:
+ self._attr_playback_state = PlaybackState.IDLE
+ self._attr_elapsed_time = 0
+ self._attr_elapsed_time_last_updated = time.time()
+ self.update_state()
+ case GroupMemberAddedEvent(client_id=client_id):
+ self.logger.debug("Group member added: %s", client_id)
+ if client_id not in self._attr_group_members:
+ self._attr_group_members.append(client_id)
+ self.update_state()
+ case GroupMemberRemovedEvent(client_id=client_id):
+ self.logger.debug("Group member removed: %s", client_id)
+ if client_id == self.player_id:
+ if len(self._attr_group_members) > 0:
+ # We were just removed as a leader:
+ # 1. stop playback on the old group
+ await group.stop()
+ # 2. clear our members (since we are now alone)
+ group_members = [
+ member for member in self._attr_group_members if member != client_id
+ ]
+ self._attr_group_members = []
+ # 3. assign new leader if there are members left
+ if len(group_members) > 0 and (
+ new_leader := self.mass.players.get(group_members[0])
+ ):
+ new_leader._attr_group_members = group_members[1:]
+ new_leader.update_state()
+ self.update_state()
+ elif client_id in self._attr_group_members:
+ # Someone else left our group
+ self._attr_group_members.remove(client_id)
+ self.update_state()
+ case GroupDeletedEvent():
+ pass
+
+ async def volume_set(self, volume_level: int) -> None:
+ """Handle VOLUME_SET command on the player."""
+ if player_client := self.api.player:
+ player_client.set_volume(volume_level)
+
+ async def volume_mute(self, muted: bool) -> None:
+ """Handle VOLUME MUTE command on the player."""
+ if player_client := self.api.player:
+ if muted:
+ player_client.mute()
+ else:
+ player_client.unmute()
+
+ async def stop(self) -> None:
+ """Stop command."""
+ self.logger.debug("Received STOP command on player %s", self.display_name)
+ # We don't care if we stopped the stream or it was already stopped
+ await self.api.group.stop()
+ # Clear the playback task reference (group.stop() handles stopping the stream)
+ self._playback_task = None
+ self._attr_current_media = None
+ self.update_state()
+
+ async def play_media(self, media: PlayerMedia) -> None:
+ """Play media command."""
+ self.logger.debug(
+ "Received PLAY_MEDIA command on player %s with uri %s", self.display_name, media.uri
+ )
+
+ # Update player state optimistically
+ self._attr_current_media = media
+ self._attr_elapsed_time = 0
+ self._attr_elapsed_time_last_updated = time.time()
+ # playback_state will be set by the group state change event
+
+ # Stop previous stream in case we were already playing something
+ await self.api.group.stop()
+ # Run playback in background task to immediately return
+ self._playback_task = asyncio.create_task(self._run_playback(media))
+ self.update_state()
+
+ async def _run_playback(self, media: PlayerMedia) -> None:
+ """Run the actual playback in a background task."""
+ try:
+ pcm_format = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=48000,
+ bit_depth=16,
+ channels=2,
+ )
+ flow_pcm_format = AudioFormat(
+ content_type=INTERNAL_PCM_FORMAT.content_type,
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
+ channels=pcm_format.channels,
+ )
+
+ output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
+
+ # Convert string codec to AudioCodec enum
+ audio_codec = AudioCodec(output_codec)
+
+ # Get clean audio source in flow format (high quality internal format)
+ # Format conversion and per-player DSP will be applied via player_channel
+ audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
+
+ # Create TimedClientStream to wrap the clean audio source
+ # This distributes the audio to multiple subscribers without DSP
+ self.timed_client_stream = TimedClientStream(
+ audio_source=audio_source,
+ audio_format=flow_pcm_format,
+ )
+
+ # Setup the main channel subscription
+ # aiosendspin only really supports 16-bit for now TODO: upgrade later to 32-bit
+ main_channel_gen, main_position = await self.timed_client_stream.get_stream(
+ output_format=pcm_format,
+ filter_params=None, # TODO: this should probably still include the safety limiter
+ )
+ assert main_position == 0.0 # first subscriber, should be zero
+ media_stream = MusicAssistantMediaStream(
+ main_channel_source=main_channel_gen,
+ main_channel_format=SendspinAudioFormat(
+ sample_rate=pcm_format.sample_rate,
+ bit_depth=pcm_format.bit_depth,
+ channels=pcm_format.channels,
+ codec=audio_codec,
+ ),
+ player_instance=self,
+ internal_format=flow_pcm_format,
+ output_format=pcm_format,
+ )
+
+ stop_time = await self.api.group.play_media(media_stream)
+ await self.api.group.stop(stop_time)
+ except asyncio.CancelledError:
+ self.logger.debug("Playback cancelled for player %s", self.display_name)
+ raise
+ except Exception:
+ self.logger.exception("Error during playback for player %s", self.display_name)
+ raise
+ finally:
+ self.timed_client_stream = None
+
+ async def set_members(
+ self,
+ player_ids_to_add: list[str] | None = None,
+ player_ids_to_remove: list[str] | None = None,
+ ) -> None:
+ """Handle SET_MEMBERS command on the player."""
+ self.logger.debug(
+ "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
+ )
+ for player_id in player_ids_to_remove or []:
+ player = self.mass.players.get(player_id, True)
+ player = cast("SendspinPlayer", player) # For type checking
+ await self.api.group.remove_client(player.api)
+ for player_id in player_ids_to_add or []:
+ player = self.mass.players.get(player_id, True)
+ player = cast("SendspinPlayer", player) # For type checking
+ await self.api.group.add_client(player.api)
+ # self.group_members will be updated by the group event callback
+
+ async def _send_album_artwork(self, current_item: QueueItem) -> str | None:
+ """
+ Send album artwork to the sendspin group.
+
+ Args:
+ current_item: The current queue item.
+ """
+ artwork_url = None
+ if current_item.image is not None:
+ artwork_url = self.mass.metadata.get_image_url(current_item.image)
+
+ if artwork_url != self.last_sent_artwork_url:
+ # Image changed, resend the artwork
+ self.last_sent_artwork_url = artwork_url
+ if artwork_url is not None and current_item.media_item is not None:
+ image_data = await self.mass.metadata.get_image_data_for_item(
+ current_item.media_item
+ )
+ if image_data is not None:
+ image = await asyncio.to_thread(Image.open, BytesIO(image_data))
+ await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM)
+ else:
+ # Clear artwork if none available
+ await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM)
+
+ return artwork_url
+
+ async def _send_artist_artwork(self, current_item: QueueItem) -> None:
+ """
+ Send artist artwork to the sendspin group.
+
+ Args:
+ current_item: The current queue item.
+ """
+ # Extract primary artist if available
+ artist_artwork_url = None
+ if current_item.media_item is not None and hasattr(current_item.media_item, "artists"):
+ artists = getattr(current_item.media_item, "artists", None)
+ if artists and len(artists) > 0:
+ primary_artist = artists[0]
+ if hasattr(primary_artist, "image"):
+ artist_image = getattr(primary_artist, "image", None)
+ if artist_image is not None:
+ artist_artwork_url = self.mass.metadata.get_image_url(artist_image)
+
+ if artist_artwork_url != self.last_sent_artist_artwork_url:
+ # Artist image changed, resend the artwork
+ self.last_sent_artist_artwork_url = artist_artwork_url
+ if artist_artwork_url is not None:
+ artist_image_data = await self.mass.metadata.get_image_data_for_item(
+ primary_artist, img_type=ImageType.THUMB
+ )
+ if artist_image_data is not None:
+ artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data))
+ await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST)
+ else:
+ # Clear artist artwork if none available
+ await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST)
+
+ async def _on_queue_update(self, event: MassEvent) -> None:
+ """Extract and send current media metadata to sendspin players on queue updates."""
+ queue = self.mass.player_queues.get_active_queue(self.player_id)
+ if not queue or not queue.current_item:
+ return
+
+ current_item = queue.current_item
+
+ title = current_item.name
+ artist = None
+ album_artist = None
+ album = None
+ track = None
+ artwork_url = None
+ year = None
+
+ if (streamdetails := current_item.streamdetails) and streamdetails.stream_title:
+ # stream title/metadata from radio/live stream
+ if " - " in streamdetails.stream_title:
+ artist, title = streamdetails.stream_title.split(" - ", 1)
+ else:
+ title = streamdetails.stream_title
+ artist = ""
+ # set album to radio station name
+ album = current_item.name
+ elif media_item := current_item.media_item:
+ title = media_item.name
+ if artist_str := getattr(media_item, "artist_str", None):
+ artist = artist_str
+ if _album := getattr(media_item, "album", None):
+ album = _album.name
+ year = getattr(_album, "year", None)
+ album_artist = getattr(_album, "artist_str", None)
+ if _track_number := getattr(media_item, "track_number", None):
+ track = _track_number
+
+ # Send album and artist artwork
+ artwork_url = await self._send_album_artwork(current_item)
+ await self._send_artist_artwork(current_item)
+
+ track_duration = current_item.duration
+
+ repeat = SendspinRepeatMode.OFF
+ if queue.repeat_mode == RepeatMode.ALL:
+ repeat = SendspinRepeatMode.ALL
+ elif queue.repeat_mode == RepeatMode.ONE:
+ repeat = SendspinRepeatMode.ONE
+
+ shuffle = queue.shuffle_enabled
+
+ metadata = Metadata(
+ title=title,
+ artist=artist,
+ album_artist=album_artist,
+ album=album,
+ artwork_url=artwork_url,
+ year=year,
+ track=track,
+ track_duration=track_duration * 1000 if track_duration is not None else None,
+ track_progress=int(queue.corrected_elapsed_time * 1000),
+ playback_speed=1000,
+ repeat=repeat,
+ shuffle=shuffle,
+ )
+
+ # Send metadata to the group
+ self.api.group.set_metadata(metadata)
+
+ async def get_config_entries(
+ self,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+ ) -> list[ConfigEntry]:
+ """Return all (provider/player specific) Config Entries for the player."""
+ default_entries = await super().get_config_entries(action=action, values=values)
+ return [
+ *default_entries,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ ]
+
+ async def on_unload(self) -> None:
+ """Handle logic when the player is unloaded from the Player controller."""
+ await super().on_unload()
+ self.unsub_event_cb()
+ self.unsub_group_event_cb()
+ await self.api.disconnect()
--- /dev/null
+"""Player Provider for Sendspin."""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import TYPE_CHECKING, cast
+
+from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
+from music_assistant_models.enums import ProviderFeature
+
+from music_assistant.mass import MusicAssistant
+from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.providers.sendspin.player import SendspinPlayer
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+
+class SendspinProvider(PlayerProvider):
+ """Player Provider for Sendspin."""
+
+ server_api: SendspinServer
+ unregister_cbs: list[Callable[[], None]]
+
+ def __init__(
+ self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+ ) -> None:
+ """Initialize a new Sendspin player provider."""
+ super().__init__(mass, manifest, config)
+ self.server_api = SendspinServer(
+ self.mass.loop, mass.server_id, "Music Assistant", self.mass.http_session
+ )
+ self.unregister_cbs = [
+ self.server_api.add_event_listener(self.event_cb),
+ # For the web player
+ self.mass.webserver.register_dynamic_route(
+ "/sendspin", self.server_api.on_client_connect
+ ),
+ ]
+
+ async def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None:
+ """Event callback registered to the sendspin server."""
+ self.logger.debug("Received SendspinEvent: %s", event)
+ match event:
+ case ClientAddedEvent(client_id):
+ player = SendspinPlayer(self, client_id)
+ self.logger.debug("Client %s connected", client_id)
+ await self.mass.players.register(player)
+ case ClientRemovedEvent(client_id):
+ self.logger.debug("Client %s disconnected", client_id)
+ await self.mass.players.unregister(client_id)
+ case _:
+ self.logger.error("Unknown sendspin event: %s", event)
+
+ @property
+ def supported_features(self) -> set[ProviderFeature]:
+ """Return the features supported by this Provider."""
+ return {
+ ProviderFeature.SYNC_PLAYERS,
+ }
+
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ await super().loaded_in_mass()
+ # Start server for handling incoming Sendspin connections from clients
+ # and mDNS discovery of new clients
+ await self.server_api.start_server(
+ port=8927,
+ host=self.mass.streams.bind_ip,
+ advertise_host=cast("str", self.mass.streams.publish_ip),
+ )
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """
+ Handle unload/close of the provider.
+
+ Called when provider is deregistered (e.g. MA exiting or config reloading).
+ is_removed will be set to True when the provider is removed from the configuration.
+ """
+ # Stop the Sendspin server
+ await self.server_api.close()
+
+ for cb in self.unregister_cbs:
+ cb()
+ self.unregister_cbs = []
+ for player in self.players:
+ self.logger.debug("Unloading player %s", player.name)
+ await self.mass.players.unregister(player.player_id)
--- /dev/null
+"""
+Timestamped multi-client audio stream for position-aware playback.
+
+This module provides a multi-client streaming implementation optimized for
+aiosendspin's synchronized multi-room audio playback. Each audio chunk is
+timestamped, allowing late-joining players to start at the correct position
+for synchronized playback across multiple devices.
+"""
+
+import asyncio
+import logging
+from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from uuid import UUID, uuid4
+
+from music_assistant_models.media_items import AudioFormat
+
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+# Minimum/target buffer retention time in seconds
+# This 10s buffer is currently required since:
+# - aiosendspin currently uses a fixed 5s buffer to allow up to ~4s of network interruption
+# - ~2s allows for ffmpeg processing time and some margin
+# - ~3s are currently needed internally by aiosendspin for initial buffering
+MIN_BUFFER_DURATION = 10.0
+# Maximum buffer duration before raising an error (safety mechanism)
+MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0
+
+
+class TimedClientStream:
+ """Multi-client audio stream with timestamped chunks for synchronized playback."""
+
+ audio_source: AsyncGenerator[bytes, None]
+ """The source audio stream to read from."""
+ audio_format: AudioFormat
+ """The audio format of the source stream."""
+ chunk_buffer: deque[tuple[bytes, float]]
+ """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds)."""
+ subscriber_positions: dict[UUID, int]
+ """Subscriber positions: maps subscriber_id to position (index into chunk_buffer)."""
+ buffer_lock: asyncio.Lock
+ """Lock for buffer and shared state access."""
+ source_read_lock: asyncio.Lock
+ """Lock to serialize audio source reads."""
+ stream_ended: bool = False
+ """Track if stream has ended."""
+ current_position: float = 0.0
+ """Current position in seconds (from stream start)."""
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ ) -> None:
+ """Initialize TimedClientStream."""
+ self.audio_source = audio_source
+ self.audio_format = audio_format
+ self.chunk_buffer = deque()
+ self.subscriber_positions = {}
+ self.buffer_lock = asyncio.Lock()
+ self.source_read_lock = asyncio.Lock()
+
+ def _get_bytes_per_second(self) -> int:
+ """Get bytes per second for the audio format."""
+ return (
+ self.audio_format.sample_rate
+ * self.audio_format.channels
+ * (self.audio_format.bit_depth // 8)
+ )
+
+ def _bytes_to_seconds(self, num_bytes: int) -> float:
+ """Convert bytes to seconds based on audio format."""
+ bytes_per_second = self._get_bytes_per_second()
+ if bytes_per_second == 0:
+ return 0.0
+ return num_bytes / bytes_per_second
+
+ def _get_buffer_duration(self) -> float:
+ """Calculate total duration of buffered chunks in seconds."""
+ if not self.chunk_buffer:
+ return 0.0
+ # Duration is from first chunk timestamp to current position
+ first_chunk_timestamp = self.chunk_buffer[0][1]
+ return self.current_position - first_chunk_timestamp
+
+ def _cleanup_old_chunks(self) -> None:
+ """Remove old chunks when all subscribers read them and min duration exceeded."""
+ # Find the oldest position still needed by any subscriber
+ if self.subscriber_positions:
+ min_position = min(self.subscriber_positions.values())
+ else:
+ min_position = len(self.chunk_buffer)
+
+ # Calculate target oldest timestamp
+ # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data
+ target_oldest = self.current_position - MIN_BUFFER_DURATION
+
+ # Remove old chunks that meet both conditions:
+ # 1. Before min_position (no subscriber needs them)
+ # 2. Older than target_oldest (outside minimum retention window)
+ chunks_removed = 0
+ while chunks_removed < min_position and self.chunk_buffer:
+ _chunk_bytes, chunk_timestamp = self.chunk_buffer[0]
+ if chunk_timestamp < target_oldest:
+ self.chunk_buffer.popleft()
+ chunks_removed += 1
+ else:
+ # Stop when we reach chunks we want to keep
+ break
+
+ # Adjust all subscriber positions to account for removed chunks
+ for sub_id in self.subscriber_positions:
+ self.subscriber_positions[sub_id] -= chunks_removed
+
+ async def _read_chunk_from_source(self) -> None:
+ """Read next chunk from audio source and add to buffer."""
+ try:
+ chunk = await anext(self.audio_source)
+ async with self.buffer_lock:
+ # Calculate timestamp for this chunk
+ chunk_timestamp = self.current_position
+ chunk_duration = self._bytes_to_seconds(len(chunk))
+
+ # Append chunk with its timestamp
+ self.chunk_buffer.append((chunk, chunk_timestamp))
+
+ # Update current position
+ self.current_position += chunk_duration
+
+ # Safety check: ensure buffer doesn't grow unbounded
+ if self._get_buffer_duration() > MAX_BUFFER_DURATION:
+ msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)"
+ raise RuntimeError(msg)
+ except StopAsyncIteration:
+ # Source exhausted, add EOF marker
+ async with self.buffer_lock:
+ self.chunk_buffer.append((b"", self.current_position))
+ self.stream_ended = True
+ except Exception:
+ # Source errored or was canceled, mark stream as ended
+ async with self.buffer_lock:
+ self.stream_ended = True
+ raise
+
+ async def _check_buffer(self, subscriber_id: UUID) -> bool | None:
+ """
+ Check if buffer has grown or stream ended.
+
+ REQUIRES: Caller must hold self.source_read_lock before calling.
+
+ Returns:
+ True if should continue reading loop (chunk found in buffer),
+ False if should break (stream ended),
+ None if should proceed to read from source.
+ """
+ async with self.buffer_lock:
+ position = self.subscriber_positions[subscriber_id]
+ if position < len(self.chunk_buffer):
+ # Another subscriber already read the chunk
+ return True
+ if self.stream_ended:
+ # Stream ended while waiting for source lock
+ return False
+ return None # Continue to read from source
+
+ async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None:
+ """
+ Get next chunk from buffer for subscriber.
+
+ Returns:
+ Chunk bytes if available, None if no chunk available, or empty bytes for EOF.
+ """
+ async with self.buffer_lock:
+ position = self.subscriber_positions[subscriber_id]
+
+ # Check if we have a chunk at this position
+ if position < len(self.chunk_buffer):
+ # Chunk available in buffer
+ chunk_data, _ = self.chunk_buffer[position]
+
+ # Move to next position
+ self.subscriber_positions[subscriber_id] = position + 1
+
+ # Cleanup old chunks that no one needs
+ self._cleanup_old_chunks()
+ return chunk_data
+ if self.stream_ended:
+ # Stream ended and we've read all buffered chunks
+ return b""
+ return None
+
+ async def _cleanup_subscriber(self, subscriber_id: UUID) -> None:
+ """Clean up subscriber and close stream if no subscribers left."""
+ async with self.buffer_lock:
+ if subscriber_id in self.subscriber_positions:
+ del self.subscriber_positions[subscriber_id]
+
+ # If no subscribers left, close the stream
+ if not self.subscriber_positions and not self.stream_ended:
+ self.stream_ended = True
+ # Close the audio source generator to prevent resource leak
+ with suppress(Exception):
+ await self.audio_source.aclose()
+
+ async def get_stream(
+ self,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ ) -> tuple[AsyncGenerator[bytes, None], float]:
+ """
+ Get (client specific encoded) ffmpeg stream.
+
+ Returns:
+ A tuple of (audio generator, actual position in seconds)
+ """
+ audio_gen, position = await self.subscribe_raw()
+
+ async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]:
+ try:
+ async for chunk in get_ffmpeg_stream(
+ audio_input=audio_gen,
+ input_format=self.audio_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ ):
+ yield chunk
+ finally:
+ # Ensure audio_gen cleanup runs immediately
+ with suppress(Exception):
+ await audio_gen.aclose()
+
+ return _stream_with_ffmpeg(), position
+
+ async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]:
+ """
+ Generate audio chunks for a subscriber.
+
+ Yields chunks from the buffer until the stream ends, reading from the source
+ as needed. Automatically cleans up the subscriber on exit.
+ """
+ try:
+ # Position already set above atomically with timestamp capture
+ while True:
+ # Try to get chunk from buffer
+ chunk_bytes = await self._get_chunk_from_buffer(subscriber_id)
+
+ # Release lock before yielding to avoid deadlock
+ if chunk_bytes is not None:
+ if chunk_bytes == b"":
+ # End of stream marker
+ break
+ yield chunk_bytes
+ else:
+ # No chunk available, need to read from source
+ # Use source_read_lock to ensure only one subscriber reads at a time
+ async with self.source_read_lock:
+ # Check again if buffer has grown or stream ended while waiting
+ check_result = await self._check_buffer(subscriber_id)
+ if check_result is True:
+ # Another subscriber already read the chunk
+ continue
+ if check_result is False:
+ # Stream ended while waiting for source lock
+ break
+
+ # Read next chunk from source (check_result is None)
+ # Note: This may block if the audio_source does synchronous I/O
+ await self._read_chunk_from_source()
+
+ finally:
+ await self._cleanup_subscriber(subscriber_id)
+
+ async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]:
+ """
+ Subscribe to the raw/unaltered audio stream.
+
+ Returns:
+ A tuple of (audio generator, actual position in seconds).
+ The position indicates where in the stream the first chunk will be from.
+
+ Note:
+ Callers must properly consume or cancel the returned generator to prevent
+ resource leaks.
+ """
+ subscriber_id = uuid4()
+
+ # Atomically capture starting position and register subscriber while holding lock
+ async with self.buffer_lock:
+ if self.chunk_buffer:
+ _, starting_position = self.chunk_buffer[0]
+ # Log buffer time range for debugging
+ newest_ts = self.chunk_buffer[-1][1]
+ oldest_relative = starting_position - self.current_position
+ newest_relative = newest_ts - self.current_position
+ LOGGER.debug(
+ "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, "
+ "current_position=%.3fs)",
+ newest_ts - starting_position,
+ oldest_relative,
+ newest_relative,
+ self.current_position,
+ )
+ else:
+ starting_position = self.current_position
+ LOGGER.debug(
+ "New subscriber joining: buffer is empty, starting at current_position=%.3fs",
+ self.current_position,
+ )
+ # Register subscriber at position 0 (start of buffer)
+ self.subscriber_positions[subscriber_id] = 0
+
+ # Return generator and starting position in seconds
+ return self._generate(subscriber_id), starting_position
aiohttp-fast-zlib==0.3.0
aiojellyfin==0.14.1
aiomusiccast==0.15.0
-aioresonate==0.13.1
aiortc>=1.6.0
aiorun==2025.1.1
+aiosendspin==1.0.0
aioslimproto==3.1.1
aiosonos==0.1.9
aiosqlite==0.21.0