changed = True
# set 'onboard_done' flag if we have any (non default) provider configs
- if not self._data.get(CONF_ONBOARD_DONE):
+ if self._data.get(CONF_ONBOARD_DONE) is None:
default_providers = {x.domain for x in self.mass.get_provider_manifests() if x.builtin}
for provider_config in self._data.get(CONF_PROVIDERS, {}).values():
if provider_config["domain"] not in default_providers:
self._data[CONF_ONBOARD_DONE] = True
changed = True
break
+ # migrate slimproto --> squeezelite
+ for instance_id, provider_config in list(self._data.get(CONF_PROVIDERS, {}).items()):
+ if provider_config.get("domain") == "slimproto":
+ del self._data[CONF_PROVIDERS][instance_id]
+ new_instance_id = instance_id.replace("slimproto", "squeezelite")
+ provider_config["instance_id"] = new_instance_id
+ provider_config["domain"] = "squeezelite"
+ self._data[CONF_PROVIDERS][new_instance_id] = provider_config
+ changed = True
if changed:
await self._async_save()
+++ /dev/null
-"""Base/builtin provider with support for players using slimproto."""
-
-from __future__ import annotations
-
-import asyncio
-import logging
-import statistics
-import time
-from collections import deque
-from collections.abc import Iterator
-from dataclasses import dataclass
-from typing import TYPE_CHECKING
-
-from aiohttp import web
-from aioslimproto.client import PlayerState as SlimPlayerState
-from aioslimproto.client import SlimClient
-from aioslimproto.client import TransitionType as SlimTransition
-from aioslimproto.models import EventType as SlimEventType
-from aioslimproto.models import Preset as SlimPreset
-from aioslimproto.models import VisualisationType as SlimVisualisationType
-from aioslimproto.server import SlimServer
-from music_assistant_models.config_entries import (
- ConfigEntry,
- ConfigValueOption,
- ConfigValueType,
- PlayerConfig,
-)
-from music_assistant_models.enums import (
- ConfigEntryType,
- ContentType,
- MediaType,
- PlayerFeature,
- PlayerState,
- PlayerType,
- ProviderFeature,
- RepeatMode,
-)
-from music_assistant_models.errors import MusicAssistantError, SetupFailedError
-from music_assistant_models.media_items import AudioFormat
-from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
-
-from music_assistant.constants import (
- CONF_CROSSFADE,
- CONF_CROSSFADE_DURATION,
- CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_DEPRECATED_EQ_BASS,
- CONF_ENTRY_DEPRECATED_EQ_MID,
- CONF_ENTRY_DEPRECATED_EQ_TREBLE,
- CONF_ENTRY_HTTP_PROFILE_FORCED_2,
- CONF_ENTRY_OUTPUT_CHANNELS,
- CONF_ENTRY_OUTPUT_CODEC,
- CONF_ENTRY_SYNC_ADJUST,
- CONF_PORT,
- CONF_SYNC_ADJUST,
- DEFAULT_PCM_FORMAT,
- VERBOSE_LOG_LEVEL,
- create_sample_rates_config_entry,
-)
-from music_assistant.helpers.audio import get_ffmpeg_stream, get_player_filter_params
-from music_assistant.helpers.util import TaskManager
-from music_assistant.models.player_provider import PlayerProvider
-from music_assistant.providers.player_group import PlayerGroupProvider
-
-from .multi_client_stream import MultiClientStream
-
-if TYPE_CHECKING:
- from aioslimproto.models import SlimEvent
- from music_assistant_models.config_entries import ProviderConfig
- from music_assistant_models.provider import ProviderManifest
-
- from music_assistant import MusicAssistant
- from music_assistant.models import ProviderInstanceType
-
-
-CACHE_KEY_PREV_STATE = "slimproto_prev_state"
-
-
-STATE_MAP = {
- SlimPlayerState.BUFFERING: PlayerState.PLAYING,
- SlimPlayerState.BUFFER_READY: PlayerState.PLAYING,
- SlimPlayerState.PAUSED: PlayerState.PAUSED,
- SlimPlayerState.PLAYING: PlayerState.PLAYING,
- SlimPlayerState.STOPPED: PlayerState.IDLE,
-}
-REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
-
-# sync constants
-MIN_DEVIATION_ADJUST = 8 # 5 milliseconds
-MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
-DEVIATION_JUMP_IGNORE = 500 # ignore a sudden unrealistic jump
-MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds
-
-
-@dataclass
-class SyncPlayPoint:
- """Simple structure to describe a Sync Playpoint."""
-
- timestamp: float
- sync_master: str
- diff: int
-
-
-CONF_CLI_TELNET_PORT = "cli_telnet_port"
-CONF_CLI_JSON_PORT = "cli_json_port"
-CONF_DISCOVERY = "discovery"
-CONF_DISPLAY = "display"
-CONF_VISUALIZATION = "visualization"
-
-DEFAULT_PLAYER_VOLUME = 20
-DEFAULT_SLIMPROTO_PORT = 3483
-DEFAULT_VISUALIZATION = SlimVisualisationType.NONE
-
-
-CONF_ENTRY_DISPLAY = ConfigEntry(
- key=CONF_DISPLAY,
- type=ConfigEntryType.BOOLEAN,
- default_value=False,
- required=False,
- label="Enable display support",
- description="Enable/disable native display support on squeezebox or squeezelite32 hardware.",
- category="advanced",
-)
-CONF_ENTRY_VISUALIZATION = ConfigEntry(
- key=CONF_VISUALIZATION,
- type=ConfigEntryType.STRING,
- default_value=DEFAULT_VISUALIZATION,
- options=[
- ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value)
- for x in SlimVisualisationType
- ],
- required=False,
- label="Visualization type",
- description="The type of visualization to show on the display "
- "during playback if the device supports this.",
- category="advanced",
- depends_on=CONF_DISPLAY,
-)
-
-
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- return SlimprotoProvider(mass, manifest, config)
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- # ruff: noqa: ARG001
- return (
- ConfigEntry(
- key=CONF_CLI_TELNET_PORT,
- type=ConfigEntryType.INTEGER,
- default_value=9090,
- label="Classic Squeezebox CLI Port",
- description="Some slimproto based players require the presence of the telnet CLI "
- " to request more information. \n\n"
- "By default this CLI is hosted on port 9090 but some players also accept "
- "a different port. Set to 0 to disable this functionality.\n\n"
- "Commands allowed on this interface are very limited and just enough to satisfy "
- "player compatibility, so security risks are minimized to practically zero."
- "You may safely disable this option if you have no players that rely on this feature "
- "or you dont care about the additional metadata.",
- category="advanced",
- ),
- ConfigEntry(
- key=CONF_CLI_JSON_PORT,
- type=ConfigEntryType.INTEGER,
- default_value=9000,
- label="JSON-RPC CLI/API Port",
- description="Some slimproto based players require the presence of the JSON-RPC "
- "API from LMS to request more information. For example to fetch the album cover "
- "and other metadata. \n\n"
- "This JSON-RPC API is compatible with Logitech Media Server but not all commands "
- "are implemented. Just enough to satisfy player compatibility. \n\n"
- "By default this JSON CLI is hosted on port 9000 but most players also accept "
- "it on a different port. Set to 0 to disable this functionality.\n\n"
- "You may safely disable this option if you have no players that rely on this feature "
- "or you dont care about the additional metadata.",
- category="advanced",
- ),
- ConfigEntry(
- key=CONF_DISCOVERY,
- type=ConfigEntryType.BOOLEAN,
- default_value=True,
- label="Enable Discovery server",
- description="Broadcast discovery packets for slimproto clients to automatically "
- "discover and connect to this server. \n\n"
- "You may want to disable this feature if you are running multiple slimproto servers "
- "on your network and/or you don't want clients to auto connect to this server.",
- category="advanced",
- ),
- ConfigEntry(
- key=CONF_PORT,
- type=ConfigEntryType.INTEGER,
- default_value=DEFAULT_SLIMPROTO_PORT,
- label="Slimproto port",
- description="The TCP/UDP port to run the slimproto sockets server. "
- "The default is 3483 and using a different port is not supported by "
- "hardware squeezebox players. Only adjust this port if you want to "
- "use other slimproto based servers side by side with (squeezelite) software players.",
- category="advanced",
- ),
- )
-
-
-class SlimprotoProvider(PlayerProvider):
- """Base/builtin provider for players using the SLIM protocol (aka slimproto)."""
-
- slimproto: SlimServer
- _sync_playpoints: dict[str, deque[SyncPlayPoint]]
- _do_not_resync_before: dict[str, float]
- _multi_streams: dict[str, MultiClientStream]
-
- @property
- def supported_features(self) -> set[ProviderFeature]:
- """Return the features supported by this Provider."""
- return {ProviderFeature.SYNC_PLAYERS}
-
- async def handle_async_init(self) -> None:
- """Handle async initialization of the provider."""
- self._sync_playpoints = {}
- self._do_not_resync_before = {}
- self._multi_streams = {}
- control_port = self.config.get_value(CONF_PORT)
- telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
- json_port = self.config.get_value(CONF_CLI_JSON_PORT)
- # silence aioslimproto logger a bit
- if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
- logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
- else:
- logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
- self.slimproto = SlimServer(
- cli_port=telnet_port or None,
- cli_port_json=json_port or None,
- ip_address=self.mass.streams.publish_ip,
- name="Music Assistant",
- control_port=control_port,
- )
- # start slimproto socket server
- try:
- await self.slimproto.start()
- except OSError as err:
- raise SetupFailedError(
- "Unable to start the Slimproto server - "
- "is one of the required TCP ports already taken ?"
- ) from err
-
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await super().loaded_in_mass()
- self.slimproto.subscribe(self._client_callback)
- self.mass.streams.register_dynamic_route(
- "/slimproto/multi", self._serve_multi_client_stream
- )
- # it seems that WiiM devices do not use the json rpc port that is broadcasted
- # in the discovery info but instead they just assume that the jsonrpc endpoint
- # lives on the same server as stream URL. So we need to provide a jsonrpc.js
- # endpoint that just redirects to the jsonrpc handler within the slimproto package.
- self.mass.streams.register_dynamic_route(
- "/jsonrpc.js", self.slimproto.cli._handle_jsonrpc_client
- )
-
- async def unload(self, is_removed: bool = False) -> None:
- """Handle close/cleanup of the provider."""
- self.mass.streams.unregister_dynamic_route("/slimproto/multi")
- self.mass.streams.unregister_dynamic_route("/jsonrpc.js")
- await self.slimproto.stop()
-
- async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
- """Return all (provider/player specific) Config Entries for the given player (if any)."""
- base_entries = await super().get_player_config_entries(player_id)
- if slimclient := self.slimproto.get_player(player_id):
- max_sample_rate = int(slimclient.max_sample_rate)
- else:
- # player not (yet) connected? use default
- max_sample_rate = 48000
- # create preset entries (for players that support it)
- preset_entries = ()
- presets = []
- async for playlist in self.mass.music.playlists.iter_library_items(True):
- presets.append(ConfigValueOption(playlist.name, playlist.uri))
- async for radio in self.mass.music.radio.iter_library_items(True):
- presets.append(ConfigValueOption(radio.name, radio.uri))
- preset_count = 10
- preset_entries = tuple(
- ConfigEntry(
- key=f"preset_{index}",
- type=ConfigEntryType.STRING,
- options=presets,
- label=f"Preset {index}",
- description="Assign a playable item to the player's preset. "
- "Only supported on real squeezebox hardware or jive(lite) based emulators.",
- category="presets",
- required=False,
- )
- for index in range(1, preset_count + 1)
- )
- return (
- base_entries
- + preset_entries
- + (
- CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_DEPRECATED_EQ_BASS,
- CONF_ENTRY_DEPRECATED_EQ_MID,
- CONF_ENTRY_DEPRECATED_EQ_TREBLE,
- CONF_ENTRY_OUTPUT_CHANNELS,
- CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_OUTPUT_CODEC,
- CONF_ENTRY_SYNC_ADJUST,
- CONF_ENTRY_DISPLAY,
- CONF_ENTRY_VISUALIZATION,
- CONF_ENTRY_HTTP_PROFILE_FORCED_2,
- create_sample_rates_config_entry(
- max_sample_rate=max_sample_rate, max_bit_depth=24, safe_max_bit_depth=24
- ),
- )
- )
-
- async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
- """Call (by config manager) when the configuration of a player changes."""
- if slimplayer := self.slimproto.get_player(config.player_id):
- await self._set_preset_items(slimplayer)
- await self._set_display(slimplayer)
- await super().on_player_config_change(config, changed_keys)
-
- async def cmd_stop(self, player_id: str) -> None:
- """Send STOP command to given player."""
- # forward command to player and any connected sync members
- async with TaskManager(self.mass) as tg:
- for slimplayer in self._get_sync_clients(player_id):
- tg.create_task(slimplayer.stop())
-
- async def cmd_play(self, player_id: str) -> None:
- """Send PLAY command to given player."""
- # forward command to player and any connected sync members
- async with TaskManager(self.mass) as tg:
- for slimplayer in self._get_sync_clients(player_id):
- tg.create_task(slimplayer.play())
-
- async def play_media(
- self,
- player_id: str,
- media: PlayerMedia,
- ) -> None:
- """Handle PLAY MEDIA on given player."""
- player = self.mass.players.get(player_id)
- if player.synced_to:
- msg = "A synced player cannot receive play commands directly"
- raise RuntimeError(msg)
-
- if not player.group_childs:
- slimplayer = self.slimproto.get_player(player_id)
- # simple, single-player playback
- await self._handle_play_url(
- slimplayer,
- url=media.uri,
- media=media,
- send_flush=True,
- auto_play=False,
- )
- return
-
- # this is a syncgroup, we need to handle this with a multi client stream
- master_audio_format = AudioFormat(
- content_type=DEFAULT_PCM_FORMAT.content_type,
- sample_rate=DEFAULT_PCM_FORMAT.sample_rate,
- bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
- )
- if media.media_type == MediaType.ANNOUNCEMENT:
- # special case: stream announcement
- audio_source = self.mass.streams.get_announcement_stream(
- media.custom_data["url"],
- output_format=master_audio_format,
- use_pre_announce=media.custom_data["use_pre_announce"],
- )
- elif media.media_type == MediaType.PLUGIN_SOURCE:
- # special case: plugin source stream
- audio_source = self.mass.streams.get_plugin_source_stream(
- plugin_source_id=media.custom_data["source_id"],
- output_format=master_audio_format,
- # need to pass player_id from the PlayerMedia object
- # because this could have been a group
- player_id=media.custom_data["player_id"],
- )
- elif media.queue_id.startswith("ugp_"):
- # special case: UGP stream
- ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
- ugp_stream = ugp_provider.ugp_streams[media.queue_id]
- # Filter is later applied in MultiClientStream
- audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
- elif media.media_type == MediaType.RADIO:
- # use single item stream request for radio streams
- audio_source = self.mass.streams.get_queue_item_stream(
- queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
- pcm_format=master_audio_format,
- )
- elif media.queue_id and media.queue_item_id:
- # regular queue stream request
- audio_source = self.mass.streams.get_queue_flow_stream(
- queue=self.mass.player_queues.get(media.queue_id),
- start_queue_item=self.mass.player_queues.get_item(
- media.queue_id, media.queue_item_id
- ),
- pcm_format=master_audio_format,
- )
- else:
- # assume url or some other direct path
- # NOTE: this will fail if its an uri not playable by ffmpeg
- audio_source = get_ffmpeg_stream(
- audio_input=media.uri,
- input_format=AudioFormat(ContentType.try_parse(media.uri)),
- output_format=master_audio_format,
- )
- # start the stream task
- self._multi_streams[player_id] = stream = MultiClientStream(
- audio_source=audio_source, audio_format=master_audio_format
- )
- base_url = f"{self.mass.streams.base_url}/slimproto/multi?player_id={player_id}&fmt=flac"
-
- # forward to downstream play_media commands
- async with TaskManager(self.mass) as tg:
- for slimplayer in self._get_sync_clients(player_id):
- url = f"{base_url}&child_player_id={slimplayer.player_id}"
- stream.expected_clients += 1
- tg.create_task(
- self._handle_play_url(
- slimplayer,
- url=url,
- media=media,
- send_flush=True,
- auto_play=False,
- )
- )
-
- async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
- """Handle enqueuing of the next queue item on the player."""
- if not (slimplayer := self.slimproto.get_player(player_id)):
- return
- await self._handle_play_url(
- slimplayer,
- url=media.uri,
- media=media,
- enqueue=True,
- send_flush=False,
- auto_play=True,
- )
-
- async def _handle_play_url(
- self,
- slimplayer: SlimClient,
- url: str,
- media: PlayerMedia,
- enqueue: bool = False,
- send_flush: bool = True,
- auto_play: bool = False,
- ) -> None:
- """Handle playback of an url on slimproto player(s)."""
- player_id = slimplayer.player_id
- if crossfade := await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE):
- transition_duration = await self.mass.config.get_player_config_value(
- player_id, CONF_CROSSFADE_DURATION
- )
- else:
- transition_duration = 0
-
- metadata = {
- "item_id": media.uri,
- "title": media.title,
- "album": media.album,
- "artist": media.artist,
- "image_url": media.image_url,
- "duration": media.duration,
- "queue_id": media.queue_id,
- "queue_item_id": media.queue_item_id,
- }
- if queue := self.mass.player_queues.get(media.queue_id):
- slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
- slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
- await slimplayer.play_url(
- url=url,
- mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
- metadata=metadata,
- enqueue=enqueue,
- send_flush=send_flush,
- transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
- transition_duration=transition_duration,
- # if autoplay=False playback will not start automatically
- # instead 'buffer ready' will be called when the buffer is full
- # to coordinate a start of multiple synced players
- autostart=auto_play,
- )
- # if queue is set to single track repeat,
- # immediately set this track as the next
- # this prevents race conditions with super short audio clips (on single repeat)
- # https://github.com/music-assistant/hass-music-assistant/issues/2059
- if queue and queue.repeat_mode == RepeatMode.ONE:
- self.mass.call_later(
- 0.2,
- slimplayer.play_url(
- url=url,
- mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
- metadata=metadata,
- enqueue=True,
- send_flush=False,
- transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
- transition_duration=transition_duration,
- autostart=True,
- ),
- )
-
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
- # forward command to player and any connected sync members
- async with TaskManager(self.mass) as tg:
- for slimplayer in self._get_sync_clients(player_id):
- tg.create_task(slimplayer.pause())
-
- async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player."""
- if slimplayer := self.slimproto.get_player(player_id):
- await slimplayer.power(powered)
- # store last state in cache
- await self.mass.cache.set(
- player_id, (powered, slimplayer.volume_level), base_key=CACHE_KEY_PREV_STATE
- )
-
- async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send VOLUME_SET command to given player."""
- if slimplayer := self.slimproto.get_player(player_id):
- await slimplayer.volume_set(volume_level)
- # store last state in cache
- await self.mass.cache.set(
- player_id, (slimplayer.powered, volume_level), base_key=CACHE_KEY_PREV_STATE
- )
-
- async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- """Send VOLUME MUTE command to given player."""
- if slimplayer := self.slimproto.get_player(player_id):
- await slimplayer.mute(muted)
-
- async def cmd_group(self, player_id: str, target_player: str) -> None:
- """Handle GROUP command for given player."""
- child_player = self.mass.players.get(player_id)
- assert child_player # guard
- parent_player = self.mass.players.get(target_player)
- assert parent_player # guard
- if parent_player.synced_to:
- raise RuntimeError("Parent player is already synced!")
- if child_player.synced_to and child_player.synced_to != target_player:
- raise RuntimeError("Player is already synced to another player")
- # always make sure that the parent player is part of the sync group
- parent_player.group_childs.append(parent_player.player_id)
- parent_player.group_childs.append(child_player.player_id)
- child_player.synced_to = parent_player.player_id
- # check if we should (re)start or join a stream session
- # TODO: support late joining of a client into an existing stream session
- # so it doesn't need to be restarted anymore.
- active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
- if active_queue.state == PlayerState.PLAYING:
- # playback needs to be restarted to form a new multi client stream session
- # this could potentially be called by multiple players at the exact same time
- # so we debounce the resync a bit here with a timer
- self.mass.call_later(
- 1,
- self.mass.player_queues.resume,
- active_queue.queue_id,
- fade_in=False,
- task_id=f"resume_{active_queue.queue_id}",
- )
- else:
- # make sure that the player manager gets an update
- self.mass.players.update(child_player.player_id, skip_forward=True)
- self.mass.players.update(parent_player.player_id, skip_forward=True)
-
- async def cmd_ungroup(self, player_id: str) -> None:
- """Handle UNGROUP command for given player.
-
- Remove the given player from any (sync)groups it currently is grouped to.
-
- - player_id: player_id of the player to handle the command.
- """
- player = self.mass.players.get(player_id, raise_unavailable=True)
- if player.synced_to:
- group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
- if player_id in group_leader.group_childs:
- group_leader.group_childs.remove(player_id)
- player.synced_to = None
- if slimclient := self.slimproto.get_player(player_id):
- await slimclient.stop()
- # make sure that the player manager gets an update
- self.mass.players.update(player.player_id, skip_forward=True)
- self.mass.players.update(group_leader.player_id, skip_forward=True)
-
- def _client_callback(
- self,
- event: SlimEvent,
- ) -> None:
- if self.mass.closing:
- return
-
- if event.type == SlimEventType.PLAYER_DISCONNECTED:
- if mass_player := self.mass.players.get(event.player_id):
- mass_player.available = False
- self.mass.players.update(mass_player.player_id)
- return
-
- if not (slimplayer := self.slimproto.get_player(event.player_id)):
- return
-
- if event.type == SlimEventType.PLAYER_CONNECTED:
- self.mass.create_task(self._handle_connected(slimplayer))
- return
-
- if event.type == SlimEventType.PLAYER_BUFFER_READY:
- self.mass.create_task(self._handle_buffer_ready(slimplayer))
- return
-
- if event.type == SlimEventType.PLAYER_HEARTBEAT:
- self._handle_player_heartbeat(slimplayer)
- return
-
- if event.type in (SlimEventType.PLAYER_BTN_EVENT, SlimEventType.PLAYER_CLI_EVENT):
- self.mass.create_task(self._handle_player_cli_event(slimplayer, event))
- return
-
- # forward player update to MA player controller
- self.mass.create_task(self._handle_player_update(slimplayer))
-
- async def _handle_player_update(self, slimplayer: SlimClient) -> None:
- """Process SlimClient update/add to Player controller."""
- player_id = slimplayer.player_id
- player = self.mass.players.get(player_id, raise_unavailable=False)
- if not player:
- # player does not yet exist, create it
- player = Player(
- player_id=player_id,
- provider=self.instance_id,
- type=PlayerType.PLAYER,
- name=slimplayer.name,
- available=True,
- powered=slimplayer.powered,
- device_info=DeviceInfo(
- model=slimplayer.device_model,
- ip_address=slimplayer.device_address,
- manufacturer=slimplayer.device_type,
- ),
- supported_features={
- PlayerFeature.POWER,
- PlayerFeature.SET_MEMBERS,
- PlayerFeature.MULTI_DEVICE_DSP,
- PlayerFeature.VOLUME_SET,
- PlayerFeature.PAUSE,
- PlayerFeature.VOLUME_MUTE,
- PlayerFeature.ENQUEUE,
- },
- can_group_with={self.instance_id},
- )
- await self.mass.players.register_or_update(player)
-
- # update player state on player events
- player.available = True
- if slimplayer.current_media and (metadata := slimplayer.current_media.metadata):
- player.current_media = PlayerMedia(
- uri=metadata.get("item_id"),
- title=metadata.get("title"),
- album=metadata.get("album"),
- artist=metadata.get("artist"),
- image_url=metadata.get("image_url"),
- duration=metadata.get("duration"),
- queue_id=metadata.get("queue_id"),
- queue_item_id=metadata.get("queue_item_id"),
- )
- else:
- player.current_media = None
- player.active_source = player.player_id
- player.name = slimplayer.name
- player.powered = slimplayer.powered
- player.state = STATE_MAP[slimplayer.state]
- player.volume_level = slimplayer.volume_level
- player.volume_muted = slimplayer.muted
- self.mass.players.update(player_id)
-
- def _handle_player_heartbeat(self, slimplayer: SlimClient) -> None:
- """Process SlimClient elapsed_time update."""
- if slimplayer.state == SlimPlayerState.STOPPED:
- # ignore server heartbeats when stopped
- return
-
- # elapsed time change on the player will be auto picked up
- # by the player manager.
- if not (player := self.mass.players.get(slimplayer.player_id)):
- # race condition?!
- return
- player.elapsed_time = slimplayer.elapsed_seconds
- player.elapsed_time_last_updated = time.time()
-
- # handle sync
- if player.synced_to:
- self._handle_client_sync(slimplayer)
-
- async def _handle_player_cli_event(self, slimplayer: SlimClient, event: SlimEvent) -> None:
- """Process CLI Event."""
- if not event.data:
- return
- queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
- if event.data.startswith("button preset_") and event.data.endswith(".single"):
- preset_id = event.data.split("preset_")[1].split(".")[0]
- preset_index = int(preset_id) - 1
- if len(slimplayer.presets) >= preset_index + 1:
- preset = slimplayer.presets[preset_index]
- await self.mass.player_queues.play_media(queue.queue_id, preset.uri)
- elif event.data == "button repeat":
- if queue.repeat_mode == RepeatMode.OFF:
- repeat_mode = RepeatMode.ONE
- elif queue.repeat_mode == RepeatMode.ONE:
- repeat_mode = RepeatMode.ALL
- else:
- repeat_mode = RepeatMode.OFF
- self.mass.player_queues.set_repeat(queue.queue_id, repeat_mode)
- slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
- slimplayer.signal_update()
- elif event.data == "button shuffle":
- self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
- slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
- slimplayer.signal_update()
- elif event.data in ("button jump_fwd", "button fwd"):
- await self.mass.player_queues.next(queue.queue_id)
- elif event.data in ("button jump_rew", "button rew"):
- await self.mass.player_queues.previous(queue.queue_id)
- elif event.data.startswith("time "):
- # seek request
- _, param = event.data.split(" ", 1)
- if param.isnumeric():
- await self.mass.player_queues.seek(queue.queue_id, int(param))
- self.logger.debug("CLI Event: %s", event.data)
-
- def _handle_client_sync(self, slimplayer: SlimClient) -> None:
- """Synchronize audio of a sync slimplayer."""
- player = self.mass.players.get(slimplayer.player_id)
- sync_master_id = player.synced_to
- if not sync_master_id:
- # we only correct sync members, not the sync master itself
- return
- if not (sync_master := self.slimproto.get_player(sync_master_id)):
- return # just here as a guard as bad things can happen
-
- if sync_master.state != SlimPlayerState.PLAYING:
- return
- if slimplayer.state != SlimPlayerState.PLAYING:
- return
- if slimplayer.player_id not in self._sync_playpoints:
- return
-
- # we collect a few playpoints of the player to determine
- # average lag/drift so we can adjust accordingly
- sync_playpoints = self._sync_playpoints[slimplayer.player_id]
-
- now = time.time()
- if now < self._do_not_resync_before[slimplayer.player_id]:
- return
-
- last_playpoint = sync_playpoints[-1] if sync_playpoints else None
- if last_playpoint and (now - last_playpoint.timestamp) > 10:
- # last playpoint is too old, invalidate
- sync_playpoints.clear()
- if last_playpoint and last_playpoint.sync_master != sync_master.player_id:
- # this should not happen, but just in case
- sync_playpoints.clear()
-
- diff = int(
- self._get_corrected_elapsed_milliseconds(sync_master)
- - self._get_corrected_elapsed_milliseconds(slimplayer)
- )
-
- # ignore unexpected spikes
- if (
- sync_playpoints
- and abs(statistics.fmean(x.diff for x in sync_playpoints)) > DEVIATION_JUMP_IGNORE
- ):
- return
-
- # we can now append the current playpoint to our list
- sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff))
-
- min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
- if len(sync_playpoints) < min_req_playpoints:
- return
-
- # get the average diff
- avg_diff = statistics.fmean(x.diff for x in sync_playpoints)
- delta = int(abs(avg_diff))
-
- if delta < MIN_DEVIATION_ADJUST:
- return
-
- # resync the player by skipping ahead or pause for x amount of (milli)seconds
- sync_playpoints.clear()
- self._do_not_resync_before[player.player_id] = now + 5
- if avg_diff > MAX_SKIP_AHEAD_MS:
- # player lagging behind more than MAX_SKIP_AHEAD_MS,
- # we need to correct the sync_master
- self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
- self.mass.create_task(sync_master.pause_for(delta))
- elif avg_diff > 0:
- # handle player lagging behind, fix with skip_ahead
- self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
- self.mass.create_task(slimplayer.skip_over(delta))
- else:
- # handle player is drifting too far ahead, use pause_for to adjust
- self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
- self.mass.create_task(slimplayer.pause_for(delta))
-
- async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
- """Handle buffer ready event, player has buffered a (new) track.
-
- Only used when autoplay=0 for coordinated start of synced players.
- """
- player = self.mass.players.get(slimplayer.player_id)
- if player.synced_to:
- # unpause of sync child is handled by sync master
- return
- if not player.group_childs:
- # not a sync group, continue
- await slimplayer.unpause_at(slimplayer.jiffies)
- return
- count = 0
- while count < 40:
- childs_total = 0
- childs_ready = 0
- await asyncio.sleep(0.2)
- for sync_child in self._get_sync_clients(player.player_id):
- childs_total += 1
- if sync_child.state == SlimPlayerState.BUFFER_READY:
- childs_ready += 1
- if childs_total == childs_ready:
- break
-
- # all child's ready (or timeout) - start play
- async with TaskManager(self.mass) as tg:
- for _client in self._get_sync_clients(player.player_id):
- self._sync_playpoints.setdefault(
- _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
- ).clear()
- # NOTE: Officially you should do an unpause_at based on the player timestamp
- # but I did not have any good results with that.
- # Instead just start playback on all players and let the sync logic work out
- # the delays etc.
- self._do_not_resync_before[_client.player_id] = time.time() + 1
- tg.create_task(_client.pause_for(200))
-
- async def _handle_connected(self, slimplayer: SlimClient) -> None:
- """Handle a slimplayer connected event."""
- player_id = slimplayer.player_id
- self.logger.info("Player %s connected", slimplayer.name or player_id)
- # set presets and display
- await self._set_preset_items(slimplayer)
- await self._set_display(slimplayer)
- # update all attributes
- await self._handle_player_update(slimplayer)
- # restore volume and power state
- if last_state := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_STATE):
- init_power = last_state[0]
- init_volume = last_state[1]
- else:
- init_volume = DEFAULT_PLAYER_VOLUME
- init_power = False
- await slimplayer.power(init_power)
- await slimplayer.stop()
- await slimplayer.volume_set(init_volume)
-
- def _get_sync_clients(self, player_id: str) -> Iterator[SlimClient]:
- """Get all sync clients for a player."""
- player = self.mass.players.get(player_id)
- # we need to return the player itself too
- group_child_ids = {player_id}
- group_child_ids.update(player.group_childs)
- for child_id in group_child_ids:
- if slimplayer := self.slimproto.get_player(child_id):
- yield slimplayer
-
- def _get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
- """Return corrected elapsed milliseconds."""
- sync_delay = self.mass.config.get_raw_player_config_value(
- slimplayer.player_id, CONF_SYNC_ADJUST, 0
- )
- return slimplayer.elapsed_milliseconds - sync_delay
-
- async def _set_preset_items(self, slimplayer: SlimClient) -> None:
- """Set the presets for a player."""
- preset_items: list[SlimPreset] = []
- for preset_index in range(1, 11):
- if preset_conf := self.mass.config.get_raw_player_config_value(
- slimplayer.player_id, f"preset_{preset_index}"
- ):
- try:
- media_item = await self.mass.music.get_item_by_uri(preset_conf)
- preset_items.append(
- SlimPreset(
- uri=media_item.uri,
- text=media_item.name,
- icon=self.mass.metadata.get_image_url(media_item.image),
- )
- )
- except MusicAssistantError:
- # non-existing media item or some other edge case
- preset_items.append(
- SlimPreset(
- uri=f"preset_{preset_index}",
- text=f"ERROR <preset {preset_index}>",
- icon="",
- )
- )
- else:
- break
- slimplayer.presets = preset_items
-
- async def _set_display(self, slimplayer: SlimClient) -> None:
- """Set the display config for a player."""
- display_enabled = self.mass.config.get_raw_player_config_value(
- slimplayer.player_id,
- CONF_ENTRY_DISPLAY.key,
- CONF_ENTRY_DISPLAY.default_value,
- )
- visualization = self.mass.config.get_raw_player_config_value(
- slimplayer.player_id,
- CONF_ENTRY_VISUALIZATION.key,
- CONF_ENTRY_VISUALIZATION.default_value,
- )
- await slimplayer.configure_display(
- visualisation=SlimVisualisationType(visualization), disabled=not display_enabled
- )
-
- async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
- """Serve the multi-client flow stream audio to a player."""
- player_id = request.query.get("player_id")
- fmt = request.query.get("fmt")
- child_player_id = request.query.get("child_player_id")
-
- if not self.mass.players.get(player_id):
- raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
-
- if not (child_player := self.mass.players.get(child_player_id)):
- raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
-
- if not (stream := self._multi_streams.get(player_id, None)) or stream.done:
- raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
-
- resp = web.StreamResponse(
- status=200,
- reason="OK",
- headers={
- "Content-Type": f"audio/{fmt}",
- },
- )
- await resp.prepare(request)
-
- # return early if this is not a GET request
- if request.method != "GET":
- return resp
-
- # all checks passed, start streaming!
- self.logger.debug(
- "Start serving multi-client flow audio stream to %s",
- child_player.display_name,
- )
- output_format = AudioFormat(content_type=ContentType.try_parse(fmt))
- async for chunk in stream.get_stream(
- output_format=output_format,
- filter_params=get_player_filter_params(
- self.mass, child_player_id, stream.audio_format, output_format
- )
- if child_player_id
- else None,
- ):
- try:
- await resp.write(chunk)
- except (BrokenPipeError, ConnectionResetError, ConnectionError):
- # race condition
- break
-
- return resp
+++ /dev/null
-<svg viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
-<path d="M5.49875 4.68702C5.49875 4.30759 5.79351 4 6.15711 4H13.8379C14.2015 4 14.4963 4.30759 14.4963 4.68702V16.6527C14.4963 16.7104 14.4931 16.7677 14.4869 16.8244C14.355 18.0406 12.8466 19 11.5885 19C10.1621 19 8.6808 18.2926 8.6808 16.6527C8.6808 15.0128 9.99751 14.1908 11.3691 14.1908C12.018 14.1908 12.6531 14.3811 13.1796 14.6935V8.23664C13.1796 7.92045 12.9339 7.66412 12.6309 7.66412H7.36409C7.06109 7.66412 6.81546 7.92045 6.81546 8.23664V16.6527C6.81546 17.9491 5.22444 19 3.90773 19C2.4813 19 1 18.2926 1 16.6527C1 15.0128 2.31671 14.1908 3.68828 14.1908C4.33718 14.1908 4.97227 14.3811 5.49875 14.6935V4.68702Z" fill="#2BAAA6"/>
-<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5028 7.51765C16.8893 7.16236 17.4787 7.20133 17.8191 7.6047C18.7211 8.67335 19.2693 10.0783 19.2693 11.6145C19.2693 13.1507 18.7211 14.5557 17.8191 15.6243C17.4787 16.0277 16.8893 16.0667 16.5028 15.7114C16.1163 15.3561 16.0789 14.7411 16.4194 14.3377C17.0329 13.6108 17.404 12.6591 17.404 11.6145C17.404 10.5699 17.0329 9.61818 16.4194 8.89133C16.0789 8.48796 16.1163 7.87294 16.5028 7.51765Z" fill="#2BAAA6"/>
-<path fill-rule="evenodd" clip-rule="evenodd" d="M19.2531 4.87572C19.638 4.51854 20.2275 4.55463 20.5698 4.95632C22.081 6.72989 23 9.06177 23 11.6145C23 14.1672 22.081 16.4991 20.5698 18.2727C20.2275 18.6744 19.638 18.7105 19.2531 18.3533C18.8681 17.9961 18.8336 17.3809 19.1758 16.9792C20.3965 15.5466 21.1347 13.6702 21.1347 11.6145C21.1347 9.55882 20.3965 7.68239 19.1758 6.24978C18.8336 5.84809 18.8681 5.2329 19.2531 4.87572Z" fill="#2BAAA6"/>
-</svg>
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<svg
- viewBox="0 0 51.2 51.2"
- fill="none"
- version="1.1"
- id="svg3"
- sodipodi:docname="icon_monochrome.svg"
- width="512"
- height="512"
- inkscape:version="1.3.2 (091e20e, 2023-11-25, custom)"
- xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
- xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
- xmlns="http://www.w3.org/2000/svg"
- xmlns:svg="http://www.w3.org/2000/svg">
- <defs
- id="defs3" />
- <sodipodi:namedview
- id="namedview3"
- pagecolor="#ffffff"
- bordercolor="#000000"
- borderopacity="0.25"
- inkscape:showpageshadow="2"
- inkscape:pageopacity="0.0"
- inkscape:pagecheckerboard="0"
- inkscape:deskcolor="#d1d1d1"
- inkscape:zoom="1.7324219"
- inkscape:cx="254.26832"
- inkscape:cy="256"
- inkscape:window-width="1920"
- inkscape:window-height="1129"
- inkscape:window-x="-8"
- inkscape:window-y="-8"
- inkscape:window-maximized="1"
- inkscape:current-layer="svg3" />
- <g
- id="g3"
- transform="matrix(2.3276212,0,0,2.3276212,-2.3276212,-0.9104848)"
- style="fill:#ffffff;fill-opacity:1">
- <path
- d="M 5.49875,4.68702 C 5.49875,4.30759 5.79351,4 6.15711,4 h 7.68079 c 0.3636,0 0.6584,0.30759 0.6584,0.68702 V 16.6527 c 0,0.0577 -0.0032,0.115 -0.0094,0.1717 C 14.355,18.0406 12.8466,19 11.5885,19 10.1621,19 8.6808,18.2926 8.6808,16.6527 c 0,-1.6399 1.31671,-2.4619 2.6883,-2.4619 0.6489,0 1.284,0.1903 1.8105,0.5027 V 8.23664 c 0,-0.31619 -0.2457,-0.57252 -0.5487,-0.57252 H 7.36409 c -0.303,0 -0.54863,0.25633 -0.54863,0.57252 V 16.6527 C 6.81546,17.9491 5.22444,19 3.90773,19 2.4813,19 1,18.2926 1,16.6527 c 0,-1.6399 1.31671,-2.4619 2.68828,-2.4619 0.6489,0 1.28399,0.1903 1.81047,0.5027 z"
- fill="#2baaa6"
- id="path1"
- style="fill:#ffffff;fill-opacity:1" />
- <path
- fill-rule="evenodd"
- clip-rule="evenodd"
- d="m 16.5028,7.51765 c 0.3865,-0.35529 0.9759,-0.31632 1.3163,0.08705 0.902,1.06865 1.4502,2.4736 1.4502,4.0098 0,1.5362 -0.5482,2.9412 -1.4502,4.0098 -0.3404,0.4034 -0.9298,0.4424 -1.3163,0.0871 -0.3865,-0.3553 -0.4239,-0.9703 -0.0834,-1.3737 0.6135,-0.7269 0.9846,-1.6786 0.9846,-2.7232 0,-1.0446 -0.3711,-1.99632 -0.9846,-2.72317 -0.3405,-0.40337 -0.3031,-1.01839 0.0834,-1.37368 z"
- fill="#2baaa6"
- id="path2"
- style="fill:#ffffff;fill-opacity:1" />
- <path
- fill-rule="evenodd"
- clip-rule="evenodd"
- d="M 19.2531,4.87572 C 19.638,4.51854 20.2275,4.55463 20.5698,4.95632 22.081,6.72989 23,9.06177 23,11.6145 c 0,2.5527 -0.919,4.8846 -2.4302,6.6582 -0.3423,0.4017 -0.9318,0.4378 -1.3167,0.0806 -0.385,-0.3572 -0.4195,-0.9724 -0.0773,-1.3741 1.2207,-1.4326 1.9589,-3.309 1.9589,-5.3647 0,-2.05568 -0.7382,-3.93211 -1.9589,-5.36472 C 18.8336,5.84809 18.8681,5.2329 19.2531,4.87572 Z"
- fill="#2baaa6"
- id="path3"
- style="fill:#ffffff;fill-opacity:1" />
- </g>
-</svg>
+++ /dev/null
-{
- "type": "player",
- "domain": "slimproto",
- "name": "Slimproto (Squeezebox players)",
- "description": "Support for slimproto based players (e.g. squeezebox, squeezelite).",
- "codeowners": ["@music-assistant"],
- "requirements": ["aioslimproto==3.1.0"],
- "documentation": "https://music-assistant.io/player-support/slimproto/",
- "multi_instance": false,
- "builtin": false
-}
+++ /dev/null
-"""Implementation of a simple multi-client stream task/job."""
-
-import asyncio
-import logging
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-
-from music_assistant_models.media_items import AudioFormat
-
-from music_assistant.helpers.audio import get_ffmpeg_stream
-from music_assistant.helpers.util import empty_queue
-
-LOGGER = logging.getLogger(__name__)
-
-
-class MultiClientStream:
- """Implementation of a simple multi-client (audio) stream task/job."""
-
- def __init__(
- self,
- audio_source: AsyncGenerator[bytes, None],
- audio_format: AudioFormat,
- expected_clients: int = 0,
- ) -> None:
- """Initialize MultiClientStream."""
- self.audio_source = audio_source
- self.audio_format = audio_format
- self.subscribers: list[asyncio.Queue] = []
- self.expected_clients = expected_clients
- self.task = asyncio.create_task(self._runner())
-
- @property
- def done(self) -> bool:
- """Return if this stream is already done."""
- return self.task.done()
-
- async def stop(self) -> None:
- """Stop/cancel the stream."""
- if self.done:
- return
- self.task.cancel()
- with suppress(asyncio.CancelledError):
- await self.task
- for sub_queue in list(self.subscribers):
- empty_queue(sub_queue)
-
- async def get_stream(
- self,
- output_format: AudioFormat,
- filter_params: list[str] | None = None,
- ) -> AsyncGenerator[bytes, None]:
- """Get (client specific encoded) ffmpeg stream."""
- async for chunk in get_ffmpeg_stream(
- audio_input=self.subscribe_raw(),
- input_format=self.audio_format,
- output_format=output_format,
- filter_params=filter_params,
- ):
- yield chunk
-
- async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
- """Subscribe to the raw/unaltered audio stream."""
- try:
- queue = asyncio.Queue(2)
- self.subscribers.append(queue)
- while True:
- chunk = await queue.get()
- if chunk == b"":
- break
- yield chunk
- finally:
- with suppress(ValueError):
- self.subscribers.remove(queue)
-
- async def _runner(self) -> None:
- """Run the stream for the given audio source."""
- expected_clients = self.expected_clients or 1
- # wait for first/all subscriber
- count = 0
- while count < 50:
- await asyncio.sleep(0.1)
- count += 1
- if len(self.subscribers) >= expected_clients:
- break
- LOGGER.debug(
- "Starting multi-client stream with %s/%s clients",
- len(self.subscribers),
- self.expected_clients,
- )
- async for chunk in self.audio_source:
- fail_count = 0
- while len(self.subscribers) == 0:
- await asyncio.sleep(0.1)
- fail_count += 1
- if fail_count > 50:
- LOGGER.warning("No clients connected, stopping stream")
- return
- await asyncio.gather(
- *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
- )
- # EOF: send empty chunk
- await asyncio.gather(*[sub.put(b"") for sub in self.subscribers], return_exceptions=True)
--- /dev/null
+"""Base/builtin provider with support for players using slimproto."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import statistics
+import time
+from collections import deque
+from collections.abc import Iterator
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from aiohttp import web
+from aioslimproto.client import PlayerState as SlimPlayerState
+from aioslimproto.client import SlimClient
+from aioslimproto.client import TransitionType as SlimTransition
+from aioslimproto.models import EventType as SlimEventType
+from aioslimproto.models import Preset as SlimPreset
+from aioslimproto.models import VisualisationType as SlimVisualisationType
+from aioslimproto.server import SlimServer
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+ PlayerConfig,
+)
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ MediaType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+ ProviderFeature,
+ RepeatMode,
+)
+from music_assistant_models.errors import MusicAssistantError, SetupFailedError
+from music_assistant_models.media_items import AudioFormat
+from music_assistant_models.player import DeviceInfo, Player, PlayerMedia
+
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_CROSSFADE_DURATION,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_DEPRECATED_EQ_BASS,
+ CONF_ENTRY_DEPRECATED_EQ_MID,
+ CONF_ENTRY_DEPRECATED_EQ_TREBLE,
+ CONF_ENTRY_HTTP_PROFILE_FORCED_2,
+ CONF_ENTRY_OUTPUT_CHANNELS,
+ CONF_ENTRY_OUTPUT_CODEC,
+ CONF_ENTRY_SYNC_ADJUST,
+ CONF_PORT,
+ CONF_SYNC_ADJUST,
+ DEFAULT_PCM_FORMAT,
+ VERBOSE_LOG_LEVEL,
+ create_sample_rates_config_entry,
+)
+from music_assistant.helpers.audio import get_ffmpeg_stream, get_player_filter_params
+from music_assistant.helpers.util import TaskManager
+from music_assistant.models.player_provider import PlayerProvider
+from music_assistant.providers.player_group import PlayerGroupProvider
+
+from .multi_client_stream import MultiClientStream
+
+if TYPE_CHECKING:
+ from aioslimproto.models import SlimEvent
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+CACHE_KEY_PREV_STATE = "slimproto_prev_state"
+
+
+STATE_MAP = {
+ SlimPlayerState.BUFFERING: PlayerState.PLAYING,
+ SlimPlayerState.BUFFER_READY: PlayerState.PLAYING,
+ SlimPlayerState.PAUSED: PlayerState.PAUSED,
+ SlimPlayerState.PLAYING: PlayerState.PLAYING,
+ SlimPlayerState.STOPPED: PlayerState.IDLE,
+}
+REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
+
+# sync constants
+MIN_DEVIATION_ADJUST = 8 # 5 milliseconds
+MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
+DEVIATION_JUMP_IGNORE = 500 # ignore a sudden unrealistic jump
+MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds
+
+
+@dataclass
+class SyncPlayPoint:
+ """Simple structure to describe a Sync Playpoint."""
+
+ timestamp: float
+ sync_master: str
+ diff: int
+
+
+CONF_CLI_TELNET_PORT = "cli_telnet_port"
+CONF_CLI_JSON_PORT = "cli_json_port"
+CONF_DISCOVERY = "discovery"
+CONF_DISPLAY = "display"
+CONF_VISUALIZATION = "visualization"
+
+DEFAULT_PLAYER_VOLUME = 20
+DEFAULT_SLIMPROTO_PORT = 3483
+DEFAULT_VISUALIZATION = SlimVisualisationType.NONE
+
+
+CONF_ENTRY_DISPLAY = ConfigEntry(
+ key=CONF_DISPLAY,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ required=False,
+ label="Enable display support",
+ description="Enable/disable native display support on squeezebox or squeezelite32 hardware.",
+ category="advanced",
+)
+CONF_ENTRY_VISUALIZATION = ConfigEntry(
+ key=CONF_VISUALIZATION,
+ type=ConfigEntryType.STRING,
+ default_value=DEFAULT_VISUALIZATION,
+ options=[
+ ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value)
+ for x in SlimVisualisationType
+ ],
+ required=False,
+ label="Visualization type",
+ description="The type of visualization to show on the display "
+ "during playback if the device supports this.",
+ category="advanced",
+ depends_on=CONF_DISPLAY,
+)
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return SlimprotoProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ return (
+ ConfigEntry(
+ key=CONF_CLI_TELNET_PORT,
+ type=ConfigEntryType.INTEGER,
+ default_value=9090,
+ label="Classic Squeezebox CLI Port",
+ description="Some slimproto based players require the presence of the telnet CLI "
+ " to request more information. \n\n"
+ "By default this CLI is hosted on port 9090 but some players also accept "
+ "a different port. Set to 0 to disable this functionality.\n\n"
+ "Commands allowed on this interface are very limited and just enough to satisfy "
+ "player compatibility, so security risks are minimized to practically zero."
+ "You may safely disable this option if you have no players that rely on this feature "
+ "or you dont care about the additional metadata.",
+ category="advanced",
+ ),
+ ConfigEntry(
+ key=CONF_CLI_JSON_PORT,
+ type=ConfigEntryType.INTEGER,
+ default_value=9000,
+ label="JSON-RPC CLI/API Port",
+ description="Some slimproto based players require the presence of the JSON-RPC "
+ "API from LMS to request more information. For example to fetch the album cover "
+ "and other metadata. \n\n"
+ "This JSON-RPC API is compatible with Logitech Media Server but not all commands "
+ "are implemented. Just enough to satisfy player compatibility. \n\n"
+ "By default this JSON CLI is hosted on port 9000 but most players also accept "
+ "it on a different port. Set to 0 to disable this functionality.\n\n"
+ "You may safely disable this option if you have no players that rely on this feature "
+ "or you dont care about the additional metadata.",
+ category="advanced",
+ ),
+ ConfigEntry(
+ key=CONF_DISCOVERY,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=True,
+ label="Enable Discovery server",
+ description="Broadcast discovery packets for slimproto clients to automatically "
+ "discover and connect to this server. \n\n"
+ "You may want to disable this feature if you are running multiple slimproto servers "
+ "on your network and/or you don't want clients to auto connect to this server.",
+ category="advanced",
+ ),
+ ConfigEntry(
+ key=CONF_PORT,
+ type=ConfigEntryType.INTEGER,
+ default_value=DEFAULT_SLIMPROTO_PORT,
+ label="Slimproto port",
+ description="The TCP/UDP port to run the slimproto sockets server. "
+ "The default is 3483 and using a different port is not supported by "
+ "hardware squeezebox players. Only adjust this port if you want to "
+ "use other slimproto based servers side by side with (squeezelite) software players.",
+ category="advanced",
+ ),
+ )
+
+
+class SlimprotoProvider(PlayerProvider):
+ """Base/builtin provider for players using the SLIM protocol (aka slimproto)."""
+
+ slimproto: SlimServer
+ _sync_playpoints: dict[str, deque[SyncPlayPoint]]
+ _do_not_resync_before: dict[str, float]
+ _multi_streams: dict[str, MultiClientStream]
+
+ @property
+ def supported_features(self) -> set[ProviderFeature]:
+ """Return the features supported by this Provider."""
+ return {ProviderFeature.SYNC_PLAYERS}
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ self._sync_playpoints = {}
+ self._do_not_resync_before = {}
+ self._multi_streams = {}
+ control_port = self.config.get_value(CONF_PORT)
+ telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
+ json_port = self.config.get_value(CONF_CLI_JSON_PORT)
+ # silence aioslimproto logger a bit
+ if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+ logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
+ else:
+ logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
+ self.slimproto = SlimServer(
+ cli_port=telnet_port or None,
+ cli_port_json=json_port or None,
+ ip_address=self.mass.streams.publish_ip,
+ name="Music Assistant",
+ control_port=control_port,
+ )
+ # start slimproto socket server
+ try:
+ await self.slimproto.start()
+ except OSError as err:
+ raise SetupFailedError(
+ "Unable to start the Slimproto server - "
+ "is one of the required TCP ports already taken ?"
+ ) from err
+
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ await super().loaded_in_mass()
+ self.slimproto.subscribe(self._client_callback)
+ self.mass.streams.register_dynamic_route(
+ "/slimproto/multi", self._serve_multi_client_stream
+ )
+ # it seems that WiiM devices do not use the json rpc port that is broadcasted
+ # in the discovery info but instead they just assume that the jsonrpc endpoint
+ # lives on the same server as stream URL. So we need to provide a jsonrpc.js
+ # endpoint that just redirects to the jsonrpc handler within the slimproto package.
+ self.mass.streams.register_dynamic_route(
+ "/jsonrpc.js", self.slimproto.cli._handle_jsonrpc_client
+ )
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """Handle close/cleanup of the provider."""
+ self.mass.streams.unregister_dynamic_route("/slimproto/multi")
+ self.mass.streams.unregister_dynamic_route("/jsonrpc.js")
+ await self.slimproto.stop()
+
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ base_entries = await super().get_player_config_entries(player_id)
+ if slimclient := self.slimproto.get_player(player_id):
+ max_sample_rate = int(slimclient.max_sample_rate)
+ else:
+ # player not (yet) connected? use default
+ max_sample_rate = 48000
+ # create preset entries (for players that support it)
+ preset_entries = ()
+ presets = []
+ async for playlist in self.mass.music.playlists.iter_library_items(True):
+ presets.append(ConfigValueOption(playlist.name, playlist.uri))
+ async for radio in self.mass.music.radio.iter_library_items(True):
+ presets.append(ConfigValueOption(radio.name, radio.uri))
+ preset_count = 10
+ preset_entries = tuple(
+ ConfigEntry(
+ key=f"preset_{index}",
+ type=ConfigEntryType.STRING,
+ options=presets,
+ label=f"Preset {index}",
+ description="Assign a playable item to the player's preset. "
+ "Only supported on real squeezebox hardware or jive(lite) based emulators.",
+ category="presets",
+ required=False,
+ )
+ for index in range(1, preset_count + 1)
+ )
+ return (
+ base_entries
+ + preset_entries
+ + (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_DEPRECATED_EQ_BASS,
+ CONF_ENTRY_DEPRECATED_EQ_MID,
+ CONF_ENTRY_DEPRECATED_EQ_TREBLE,
+ CONF_ENTRY_OUTPUT_CHANNELS,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_OUTPUT_CODEC,
+ CONF_ENTRY_SYNC_ADJUST,
+ CONF_ENTRY_DISPLAY,
+ CONF_ENTRY_VISUALIZATION,
+ CONF_ENTRY_HTTP_PROFILE_FORCED_2,
+ create_sample_rates_config_entry(
+ max_sample_rate=max_sample_rate, max_bit_depth=24, safe_max_bit_depth=24
+ ),
+ )
+ )
+
+ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ """Call (by config manager) when the configuration of a player changes."""
+ if slimplayer := self.slimproto.get_player(config.player_id):
+ await self._set_preset_items(slimplayer)
+ await self._set_display(slimplayer)
+ await super().on_player_config_change(config, changed_keys)
+
+ async def cmd_stop(self, player_id: str) -> None:
+ """Send STOP command to given player."""
+ # forward command to player and any connected sync members
+ async with TaskManager(self.mass) as tg:
+ for slimplayer in self._get_sync_clients(player_id):
+ tg.create_task(slimplayer.stop())
+
+ async def cmd_play(self, player_id: str) -> None:
+ """Send PLAY command to given player."""
+ # forward command to player and any connected sync members
+ async with TaskManager(self.mass) as tg:
+ for slimplayer in self._get_sync_clients(player_id):
+ tg.create_task(slimplayer.play())
+
+ async def play_media(
+ self,
+ player_id: str,
+ media: PlayerMedia,
+ ) -> None:
+ """Handle PLAY MEDIA on given player."""
+ player = self.mass.players.get(player_id)
+ if player.synced_to:
+ msg = "A synced player cannot receive play commands directly"
+ raise RuntimeError(msg)
+
+ if not player.group_childs:
+ slimplayer = self.slimproto.get_player(player_id)
+ # simple, single-player playback
+ await self._handle_play_url(
+ slimplayer,
+ url=media.uri,
+ media=media,
+ send_flush=True,
+ auto_play=False,
+ )
+ return
+
+ # this is a syncgroup, we need to handle this with a multi client stream
+ master_audio_format = AudioFormat(
+ content_type=DEFAULT_PCM_FORMAT.content_type,
+ sample_rate=DEFAULT_PCM_FORMAT.sample_rate,
+ bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
+ )
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=master_audio_format,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.media_type == MediaType.PLUGIN_SOURCE:
+ # special case: plugin source stream
+ audio_source = self.mass.streams.get_plugin_source_stream(
+ plugin_source_id=media.custom_data["source_id"],
+ output_format=master_audio_format,
+ # need to pass player_id from the PlayerMedia object
+ # because this could have been a group
+ player_id=media.custom_data["player_id"],
+ )
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+ ugp_stream = ugp_provider.ugp_streams[media.queue_id]
+ # Filter is later applied in MultiClientStream
+ audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
+ elif media.media_type == MediaType.RADIO:
+ # use single item stream request for radio streams
+ audio_source = self.mass.streams.get_queue_item_stream(
+ queue_item=self.mass.player_queues.get_item(media.queue_id, media.queue_item_id),
+ pcm_format=master_audio_format,
+ )
+ elif media.queue_id and media.queue_item_id:
+ # regular queue stream request
+ audio_source = self.mass.streams.get_queue_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=master_audio_format,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=master_audio_format,
+ )
+ # start the stream task
+ self._multi_streams[player_id] = stream = MultiClientStream(
+ audio_source=audio_source, audio_format=master_audio_format
+ )
+ base_url = f"{self.mass.streams.base_url}/slimproto/multi?player_id={player_id}&fmt=flac"
+
+ # forward to downstream play_media commands
+ async with TaskManager(self.mass) as tg:
+ for slimplayer in self._get_sync_clients(player_id):
+ url = f"{base_url}&child_player_id={slimplayer.player_id}"
+ stream.expected_clients += 1
+ tg.create_task(
+ self._handle_play_url(
+ slimplayer,
+ url=url,
+ media=media,
+ send_flush=True,
+ auto_play=False,
+ )
+ )
+
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of the next queue item on the player."""
+ if not (slimplayer := self.slimproto.get_player(player_id)):
+ return
+ await self._handle_play_url(
+ slimplayer,
+ url=media.uri,
+ media=media,
+ enqueue=True,
+ send_flush=False,
+ auto_play=True,
+ )
+
+ async def _handle_play_url(
+ self,
+ slimplayer: SlimClient,
+ url: str,
+ media: PlayerMedia,
+ enqueue: bool = False,
+ send_flush: bool = True,
+ auto_play: bool = False,
+ ) -> None:
+ """Handle playback of an url on slimproto player(s)."""
+ player_id = slimplayer.player_id
+ if crossfade := await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE):
+ transition_duration = await self.mass.config.get_player_config_value(
+ player_id, CONF_CROSSFADE_DURATION
+ )
+ else:
+ transition_duration = 0
+
+ metadata = {
+ "item_id": media.uri,
+ "title": media.title,
+ "album": media.album,
+ "artist": media.artist,
+ "image_url": media.image_url,
+ "duration": media.duration,
+ "queue_id": media.queue_id,
+ "queue_item_id": media.queue_item_id,
+ }
+ if queue := self.mass.player_queues.get(media.queue_id):
+ slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
+ slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
+ await slimplayer.play_url(
+ url=url,
+ mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
+ metadata=metadata,
+ enqueue=enqueue,
+ send_flush=send_flush,
+ transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
+ transition_duration=transition_duration,
+ # if autoplay=False playback will not start automatically
+ # instead 'buffer ready' will be called when the buffer is full
+ # to coordinate a start of multiple synced players
+ autostart=auto_play,
+ )
+ # if queue is set to single track repeat,
+ # immediately set this track as the next
+ # this prevents race conditions with super short audio clips (on single repeat)
+ # https://github.com/music-assistant/hass-music-assistant/issues/2059
+ if queue and queue.repeat_mode == RepeatMode.ONE:
+ self.mass.call_later(
+ 0.2,
+ slimplayer.play_url(
+ url=url,
+ mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
+ metadata=metadata,
+ enqueue=True,
+ send_flush=False,
+ transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
+ transition_duration=transition_duration,
+ autostart=True,
+ ),
+ )
+
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player."""
+ # forward command to player and any connected sync members
+ async with TaskManager(self.mass) as tg:
+ for slimplayer in self._get_sync_clients(player_id):
+ tg.create_task(slimplayer.pause())
+
+ async def cmd_power(self, player_id: str, powered: bool) -> None:
+ """Send POWER command to given player."""
+ if slimplayer := self.slimproto.get_player(player_id):
+ await slimplayer.power(powered)
+ # store last state in cache
+ await self.mass.cache.set(
+ player_id, (powered, slimplayer.volume_level), base_key=CACHE_KEY_PREV_STATE
+ )
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ if slimplayer := self.slimproto.get_player(player_id):
+ await slimplayer.volume_set(volume_level)
+ # store last state in cache
+ await self.mass.cache.set(
+ player_id, (slimplayer.powered, volume_level), base_key=CACHE_KEY_PREV_STATE
+ )
+
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
+ if slimplayer := self.slimproto.get_player(player_id):
+ await slimplayer.mute(muted)
+
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player."""
+ child_player = self.mass.players.get(player_id)
+ assert child_player # guard
+ parent_player = self.mass.players.get(target_player)
+ assert parent_player # guard
+ if parent_player.synced_to:
+ raise RuntimeError("Parent player is already synced!")
+ if child_player.synced_to and child_player.synced_to != target_player:
+ raise RuntimeError("Player is already synced to another player")
+ # always make sure that the parent player is part of the sync group
+ parent_player.group_childs.append(parent_player.player_id)
+ parent_player.group_childs.append(child_player.player_id)
+ child_player.synced_to = parent_player.player_id
+ # check if we should (re)start or join a stream session
+ # TODO: support late joining of a client into an existing stream session
+ # so it doesn't need to be restarted anymore.
+ active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
+ if active_queue.state == PlayerState.PLAYING:
+ # playback needs to be restarted to form a new multi client stream session
+ # this could potentially be called by multiple players at the exact same time
+ # so we debounce the resync a bit here with a timer
+ self.mass.call_later(
+ 1,
+ self.mass.player_queues.resume,
+ active_queue.queue_id,
+ fade_in=False,
+ task_id=f"resume_{active_queue.queue_id}",
+ )
+ else:
+ # make sure that the player manager gets an update
+ self.mass.players.update(child_player.player_id, skip_forward=True)
+ self.mass.players.update(parent_player.player_id, skip_forward=True)
+
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
+
+ Remove the given player from any (sync)groups it currently is grouped to.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ player = self.mass.players.get(player_id, raise_unavailable=True)
+ if player.synced_to:
+ group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
+ if player_id in group_leader.group_childs:
+ group_leader.group_childs.remove(player_id)
+ player.synced_to = None
+ if slimclient := self.slimproto.get_player(player_id):
+ await slimclient.stop()
+ # make sure that the player manager gets an update
+ self.mass.players.update(player.player_id, skip_forward=True)
+ self.mass.players.update(group_leader.player_id, skip_forward=True)
+
+ def _client_callback(
+ self,
+ event: SlimEvent,
+ ) -> None:
+ if self.mass.closing:
+ return
+
+ if event.type == SlimEventType.PLAYER_DISCONNECTED:
+ if mass_player := self.mass.players.get(event.player_id):
+ mass_player.available = False
+ self.mass.players.update(mass_player.player_id)
+ return
+
+ if not (slimplayer := self.slimproto.get_player(event.player_id)):
+ return
+
+ if event.type == SlimEventType.PLAYER_CONNECTED:
+ self.mass.create_task(self._handle_connected(slimplayer))
+ return
+
+ if event.type == SlimEventType.PLAYER_BUFFER_READY:
+ self.mass.create_task(self._handle_buffer_ready(slimplayer))
+ return
+
+ if event.type == SlimEventType.PLAYER_HEARTBEAT:
+ self._handle_player_heartbeat(slimplayer)
+ return
+
+ if event.type in (SlimEventType.PLAYER_BTN_EVENT, SlimEventType.PLAYER_CLI_EVENT):
+ self.mass.create_task(self._handle_player_cli_event(slimplayer, event))
+ return
+
+ # forward player update to MA player controller
+ self.mass.create_task(self._handle_player_update(slimplayer))
+
+ async def _handle_player_update(self, slimplayer: SlimClient) -> None:
+ """Process SlimClient update/add to Player controller."""
+ player_id = slimplayer.player_id
+ player = self.mass.players.get(player_id, raise_unavailable=False)
+ if not player:
+ # player does not yet exist, create it
+ player = Player(
+ player_id=player_id,
+ provider=self.instance_id,
+ type=PlayerType.PLAYER,
+ name=slimplayer.name,
+ available=True,
+ powered=slimplayer.powered,
+ device_info=DeviceInfo(
+ model=slimplayer.device_model,
+ ip_address=slimplayer.device_address,
+ manufacturer=slimplayer.device_type,
+ ),
+ supported_features={
+ PlayerFeature.POWER,
+ PlayerFeature.SET_MEMBERS,
+ PlayerFeature.MULTI_DEVICE_DSP,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.PAUSE,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.ENQUEUE,
+ },
+ can_group_with={self.instance_id},
+ )
+ await self.mass.players.register_or_update(player)
+
+ # update player state on player events
+ player.available = True
+ if slimplayer.current_media and (metadata := slimplayer.current_media.metadata):
+ player.current_media = PlayerMedia(
+ uri=metadata.get("item_id"),
+ title=metadata.get("title"),
+ album=metadata.get("album"),
+ artist=metadata.get("artist"),
+ image_url=metadata.get("image_url"),
+ duration=metadata.get("duration"),
+ queue_id=metadata.get("queue_id"),
+ queue_item_id=metadata.get("queue_item_id"),
+ )
+ else:
+ player.current_media = None
+ player.active_source = player.player_id
+ player.name = slimplayer.name
+ player.powered = slimplayer.powered
+ player.state = STATE_MAP[slimplayer.state]
+ player.volume_level = slimplayer.volume_level
+ player.volume_muted = slimplayer.muted
+ self.mass.players.update(player_id)
+
+ def _handle_player_heartbeat(self, slimplayer: SlimClient) -> None:
+ """Process SlimClient elapsed_time update."""
+ if slimplayer.state == SlimPlayerState.STOPPED:
+ # ignore server heartbeats when stopped
+ return
+
+ # elapsed time change on the player will be auto picked up
+ # by the player manager.
+ if not (player := self.mass.players.get(slimplayer.player_id)):
+ # race condition?!
+ return
+ player.elapsed_time = slimplayer.elapsed_seconds
+ player.elapsed_time_last_updated = time.time()
+
+ # handle sync
+ if player.synced_to:
+ self._handle_client_sync(slimplayer)
+
+ async def _handle_player_cli_event(self, slimplayer: SlimClient, event: SlimEvent) -> None:
+ """Process CLI Event."""
+ if not event.data:
+ return
+ queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
+ if event.data.startswith("button preset_") and event.data.endswith(".single"):
+ preset_id = event.data.split("preset_")[1].split(".")[0]
+ preset_index = int(preset_id) - 1
+ if len(slimplayer.presets) >= preset_index + 1:
+ preset = slimplayer.presets[preset_index]
+ await self.mass.player_queues.play_media(queue.queue_id, preset.uri)
+ elif event.data == "button repeat":
+ if queue.repeat_mode == RepeatMode.OFF:
+ repeat_mode = RepeatMode.ONE
+ elif queue.repeat_mode == RepeatMode.ONE:
+ repeat_mode = RepeatMode.ALL
+ else:
+ repeat_mode = RepeatMode.OFF
+ self.mass.player_queues.set_repeat(queue.queue_id, repeat_mode)
+ slimplayer.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
+ slimplayer.signal_update()
+ elif event.data == "button shuffle":
+ self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
+ slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
+ slimplayer.signal_update()
+ elif event.data in ("button jump_fwd", "button fwd"):
+ await self.mass.player_queues.next(queue.queue_id)
+ elif event.data in ("button jump_rew", "button rew"):
+ await self.mass.player_queues.previous(queue.queue_id)
+ elif event.data.startswith("time "):
+ # seek request
+ _, param = event.data.split(" ", 1)
+ if param.isnumeric():
+ await self.mass.player_queues.seek(queue.queue_id, int(param))
+ self.logger.debug("CLI Event: %s", event.data)
+
+ def _handle_client_sync(self, slimplayer: SlimClient) -> None:
+ """Synchronize audio of a sync slimplayer."""
+ player = self.mass.players.get(slimplayer.player_id)
+ sync_master_id = player.synced_to
+ if not sync_master_id:
+ # we only correct sync members, not the sync master itself
+ return
+ if not (sync_master := self.slimproto.get_player(sync_master_id)):
+ return # just here as a guard as bad things can happen
+
+ if sync_master.state != SlimPlayerState.PLAYING:
+ return
+ if slimplayer.state != SlimPlayerState.PLAYING:
+ return
+ if slimplayer.player_id not in self._sync_playpoints:
+ return
+
+ # we collect a few playpoints of the player to determine
+ # average lag/drift so we can adjust accordingly
+ sync_playpoints = self._sync_playpoints[slimplayer.player_id]
+
+ now = time.time()
+ if now < self._do_not_resync_before[slimplayer.player_id]:
+ return
+
+ last_playpoint = sync_playpoints[-1] if sync_playpoints else None
+ if last_playpoint and (now - last_playpoint.timestamp) > 10:
+ # last playpoint is too old, invalidate
+ sync_playpoints.clear()
+ if last_playpoint and last_playpoint.sync_master != sync_master.player_id:
+ # this should not happen, but just in case
+ sync_playpoints.clear()
+
+ diff = int(
+ self._get_corrected_elapsed_milliseconds(sync_master)
+ - self._get_corrected_elapsed_milliseconds(slimplayer)
+ )
+
+ # ignore unexpected spikes
+ if (
+ sync_playpoints
+ and abs(statistics.fmean(x.diff for x in sync_playpoints)) > DEVIATION_JUMP_IGNORE
+ ):
+ return
+
+ # we can now append the current playpoint to our list
+ sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff))
+
+ min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
+ if len(sync_playpoints) < min_req_playpoints:
+ return
+
+ # get the average diff
+ avg_diff = statistics.fmean(x.diff for x in sync_playpoints)
+ delta = int(abs(avg_diff))
+
+ if delta < MIN_DEVIATION_ADJUST:
+ return
+
+ # resync the player by skipping ahead or pause for x amount of (milli)seconds
+ sync_playpoints.clear()
+ self._do_not_resync_before[player.player_id] = now + 5
+ if avg_diff > MAX_SKIP_AHEAD_MS:
+ # player lagging behind more than MAX_SKIP_AHEAD_MS,
+ # we need to correct the sync_master
+ self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
+ self.mass.create_task(sync_master.pause_for(delta))
+ elif avg_diff > 0:
+ # handle player lagging behind, fix with skip_ahead
+ self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta)
+ self.mass.create_task(slimplayer.skip_over(delta))
+ else:
+ # handle player is drifting too far ahead, use pause_for to adjust
+ self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta)
+ self.mass.create_task(slimplayer.pause_for(delta))
+
+ async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None:
+ """Handle buffer ready event, player has buffered a (new) track.
+
+ Only used when autoplay=0 for coordinated start of synced players.
+ """
+ player = self.mass.players.get(slimplayer.player_id)
+ if player.synced_to:
+ # unpause of sync child is handled by sync master
+ return
+ if not player.group_childs:
+ # not a sync group, continue
+ await slimplayer.unpause_at(slimplayer.jiffies)
+ return
+ count = 0
+ while count < 40:
+ childs_total = 0
+ childs_ready = 0
+ await asyncio.sleep(0.2)
+ for sync_child in self._get_sync_clients(player.player_id):
+ childs_total += 1
+ if sync_child.state == SlimPlayerState.BUFFER_READY:
+ childs_ready += 1
+ if childs_total == childs_ready:
+ break
+
+ # all child's ready (or timeout) - start play
+ async with TaskManager(self.mass) as tg:
+ for _client in self._get_sync_clients(player.player_id):
+ self._sync_playpoints.setdefault(
+ _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS)
+ ).clear()
+ # NOTE: Officially you should do an unpause_at based on the player timestamp
+ # but I did not have any good results with that.
+ # Instead just start playback on all players and let the sync logic work out
+ # the delays etc.
+ self._do_not_resync_before[_client.player_id] = time.time() + 1
+ tg.create_task(_client.pause_for(200))
+
+ async def _handle_connected(self, slimplayer: SlimClient) -> None:
+ """Handle a slimplayer connected event."""
+ player_id = slimplayer.player_id
+ self.logger.info("Player %s connected", slimplayer.name or player_id)
+ # set presets and display
+ await self._set_preset_items(slimplayer)
+ await self._set_display(slimplayer)
+ # update all attributes
+ await self._handle_player_update(slimplayer)
+ # restore volume and power state
+ if last_state := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_STATE):
+ init_power = last_state[0]
+ init_volume = last_state[1]
+ else:
+ init_volume = DEFAULT_PLAYER_VOLUME
+ init_power = False
+ await slimplayer.power(init_power)
+ await slimplayer.stop()
+ await slimplayer.volume_set(init_volume)
+
+ def _get_sync_clients(self, player_id: str) -> Iterator[SlimClient]:
+ """Get all sync clients for a player."""
+ player = self.mass.players.get(player_id)
+ # we need to return the player itself too
+ group_child_ids = {player_id}
+ group_child_ids.update(player.group_childs)
+ for child_id in group_child_ids:
+ if slimplayer := self.slimproto.get_player(child_id):
+ yield slimplayer
+
+ def _get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
+ """Return corrected elapsed milliseconds."""
+ sync_delay = self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, CONF_SYNC_ADJUST, 0
+ )
+ return slimplayer.elapsed_milliseconds - sync_delay
+
+ async def _set_preset_items(self, slimplayer: SlimClient) -> None:
+ """Set the presets for a player."""
+ preset_items: list[SlimPreset] = []
+ for preset_index in range(1, 11):
+ if preset_conf := self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id, f"preset_{preset_index}"
+ ):
+ try:
+ media_item = await self.mass.music.get_item_by_uri(preset_conf)
+ preset_items.append(
+ SlimPreset(
+ uri=media_item.uri,
+ text=media_item.name,
+ icon=self.mass.metadata.get_image_url(media_item.image),
+ )
+ )
+ except MusicAssistantError:
+ # non-existing media item or some other edge case
+ preset_items.append(
+ SlimPreset(
+ uri=f"preset_{preset_index}",
+ text=f"ERROR <preset {preset_index}>",
+ icon="",
+ )
+ )
+ else:
+ break
+ slimplayer.presets = preset_items
+
+ async def _set_display(self, slimplayer: SlimClient) -> None:
+ """Set the display config for a player."""
+ display_enabled = self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id,
+ CONF_ENTRY_DISPLAY.key,
+ CONF_ENTRY_DISPLAY.default_value,
+ )
+ visualization = self.mass.config.get_raw_player_config_value(
+ slimplayer.player_id,
+ CONF_ENTRY_VISUALIZATION.key,
+ CONF_ENTRY_VISUALIZATION.default_value,
+ )
+ await slimplayer.configure_display(
+ visualisation=SlimVisualisationType(visualization), disabled=not display_enabled
+ )
+
+ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
+ """Serve the multi-client flow stream audio to a player."""
+ player_id = request.query.get("player_id")
+ fmt = request.query.get("fmt")
+ child_player_id = request.query.get("child_player_id")
+
+ if not self.mass.players.get(player_id):
+ raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
+
+ if not (child_player := self.mass.players.get(child_player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
+
+ if not (stream := self._multi_streams.get(player_id, None)) or stream.done:
+ raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
+
+ resp = web.StreamResponse(
+ status=200,
+ reason="OK",
+ headers={
+ "Content-Type": f"audio/{fmt}",
+ },
+ )
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving multi-client flow audio stream to %s",
+ child_player.display_name,
+ )
+ output_format = AudioFormat(content_type=ContentType.try_parse(fmt))
+ async for chunk in stream.get_stream(
+ output_format=output_format,
+ filter_params=get_player_filter_params(
+ self.mass, child_player_id, stream.audio_format, output_format
+ )
+ if child_player_id
+ else None,
+ ):
+ try:
+ await resp.write(chunk)
+ except (BrokenPipeError, ConnectionResetError, ConnectionError):
+ # race condition
+ break
+
+ return resp
--- /dev/null
+<svg viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
+<path d="M5.49875 4.68702C5.49875 4.30759 5.79351 4 6.15711 4H13.8379C14.2015 4 14.4963 4.30759 14.4963 4.68702V16.6527C14.4963 16.7104 14.4931 16.7677 14.4869 16.8244C14.355 18.0406 12.8466 19 11.5885 19C10.1621 19 8.6808 18.2926 8.6808 16.6527C8.6808 15.0128 9.99751 14.1908 11.3691 14.1908C12.018 14.1908 12.6531 14.3811 13.1796 14.6935V8.23664C13.1796 7.92045 12.9339 7.66412 12.6309 7.66412H7.36409C7.06109 7.66412 6.81546 7.92045 6.81546 8.23664V16.6527C6.81546 17.9491 5.22444 19 3.90773 19C2.4813 19 1 18.2926 1 16.6527C1 15.0128 2.31671 14.1908 3.68828 14.1908C4.33718 14.1908 4.97227 14.3811 5.49875 14.6935V4.68702Z" fill="#2BAAA6"/>
+<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5028 7.51765C16.8893 7.16236 17.4787 7.20133 17.8191 7.6047C18.7211 8.67335 19.2693 10.0783 19.2693 11.6145C19.2693 13.1507 18.7211 14.5557 17.8191 15.6243C17.4787 16.0277 16.8893 16.0667 16.5028 15.7114C16.1163 15.3561 16.0789 14.7411 16.4194 14.3377C17.0329 13.6108 17.404 12.6591 17.404 11.6145C17.404 10.5699 17.0329 9.61818 16.4194 8.89133C16.0789 8.48796 16.1163 7.87294 16.5028 7.51765Z" fill="#2BAAA6"/>
+<path fill-rule="evenodd" clip-rule="evenodd" d="M19.2531 4.87572C19.638 4.51854 20.2275 4.55463 20.5698 4.95632C22.081 6.72989 23 9.06177 23 11.6145C23 14.1672 22.081 16.4991 20.5698 18.2727C20.2275 18.6744 19.638 18.7105 19.2531 18.3533C18.8681 17.9961 18.8336 17.3809 19.1758 16.9792C20.3965 15.5466 21.1347 13.6702 21.1347 11.6145C21.1347 9.55882 20.3965 7.68239 19.1758 6.24978C18.8336 5.84809 18.8681 5.2329 19.2531 4.87572Z" fill="#2BAAA6"/>
+</svg>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<svg
+ viewBox="0 0 51.2 51.2"
+ fill="none"
+ version="1.1"
+ id="svg3"
+ sodipodi:docname="icon_monochrome.svg"
+ width="512"
+ height="512"
+ inkscape:version="1.3.2 (091e20e, 2023-11-25, custom)"
+ xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
+ xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:svg="http://www.w3.org/2000/svg">
+ <defs
+ id="defs3" />
+ <sodipodi:namedview
+ id="namedview3"
+ pagecolor="#ffffff"
+ bordercolor="#000000"
+ borderopacity="0.25"
+ inkscape:showpageshadow="2"
+ inkscape:pageopacity="0.0"
+ inkscape:pagecheckerboard="0"
+ inkscape:deskcolor="#d1d1d1"
+ inkscape:zoom="1.7324219"
+ inkscape:cx="254.26832"
+ inkscape:cy="256"
+ inkscape:window-width="1920"
+ inkscape:window-height="1129"
+ inkscape:window-x="-8"
+ inkscape:window-y="-8"
+ inkscape:window-maximized="1"
+ inkscape:current-layer="svg3" />
+ <g
+ id="g3"
+ transform="matrix(2.3276212,0,0,2.3276212,-2.3276212,-0.9104848)"
+ style="fill:#ffffff;fill-opacity:1">
+ <path
+ d="M 5.49875,4.68702 C 5.49875,4.30759 5.79351,4 6.15711,4 h 7.68079 c 0.3636,0 0.6584,0.30759 0.6584,0.68702 V 16.6527 c 0,0.0577 -0.0032,0.115 -0.0094,0.1717 C 14.355,18.0406 12.8466,19 11.5885,19 10.1621,19 8.6808,18.2926 8.6808,16.6527 c 0,-1.6399 1.31671,-2.4619 2.6883,-2.4619 0.6489,0 1.284,0.1903 1.8105,0.5027 V 8.23664 c 0,-0.31619 -0.2457,-0.57252 -0.5487,-0.57252 H 7.36409 c -0.303,0 -0.54863,0.25633 -0.54863,0.57252 V 16.6527 C 6.81546,17.9491 5.22444,19 3.90773,19 2.4813,19 1,18.2926 1,16.6527 c 0,-1.6399 1.31671,-2.4619 2.68828,-2.4619 0.6489,0 1.28399,0.1903 1.81047,0.5027 z"
+ fill="#2baaa6"
+ id="path1"
+ style="fill:#ffffff;fill-opacity:1" />
+ <path
+ fill-rule="evenodd"
+ clip-rule="evenodd"
+ d="m 16.5028,7.51765 c 0.3865,-0.35529 0.9759,-0.31632 1.3163,0.08705 0.902,1.06865 1.4502,2.4736 1.4502,4.0098 0,1.5362 -0.5482,2.9412 -1.4502,4.0098 -0.3404,0.4034 -0.9298,0.4424 -1.3163,0.0871 -0.3865,-0.3553 -0.4239,-0.9703 -0.0834,-1.3737 0.6135,-0.7269 0.9846,-1.6786 0.9846,-2.7232 0,-1.0446 -0.3711,-1.99632 -0.9846,-2.72317 -0.3405,-0.40337 -0.3031,-1.01839 0.0834,-1.37368 z"
+ fill="#2baaa6"
+ id="path2"
+ style="fill:#ffffff;fill-opacity:1" />
+ <path
+ fill-rule="evenodd"
+ clip-rule="evenodd"
+ d="M 19.2531,4.87572 C 19.638,4.51854 20.2275,4.55463 20.5698,4.95632 22.081,6.72989 23,9.06177 23,11.6145 c 0,2.5527 -0.919,4.8846 -2.4302,6.6582 -0.3423,0.4017 -0.9318,0.4378 -1.3167,0.0806 -0.385,-0.3572 -0.4195,-0.9724 -0.0773,-1.3741 1.2207,-1.4326 1.9589,-3.309 1.9589,-5.3647 0,-2.05568 -0.7382,-3.93211 -1.9589,-5.36472 C 18.8336,5.84809 18.8681,5.2329 19.2531,4.87572 Z"
+ fill="#2baaa6"
+ id="path3"
+ style="fill:#ffffff;fill-opacity:1" />
+ </g>
+</svg>
--- /dev/null
+{
+ "type": "player",
+ "domain": "squeezelite",
+ "name": "Squeezelite (slimproto players)",
+ "description": "Support for Squeezelite, a software-based player implementing the slimproto protocol, which was originally designed for the Squeezebox hardware players. Other players and/or original Squeezebox hardware might also work with this provider, but without any guarantees/support.",
+ "codeowners": ["@music-assistant"],
+ "requirements": ["aioslimproto==3.1.0"],
+ "documentation": "https://music-assistant.io/player-support/squeezelite/",
+ "multi_instance": false,
+ "builtin": false
+}
--- /dev/null
+"""Implementation of a simple multi-client stream task/job."""
+
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+
+from music_assistant_models.media_items import AudioFormat
+
+from music_assistant.helpers.audio import get_ffmpeg_stream
+from music_assistant.helpers.util import empty_queue
+
+LOGGER = logging.getLogger(__name__)
+
+
+class MultiClientStream:
+ """Implementation of a simple multi-client (audio) stream task/job."""
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ expected_clients: int = 0,
+ ) -> None:
+ """Initialize MultiClientStream."""
+ self.audio_source = audio_source
+ self.audio_format = audio_format
+ self.subscribers: list[asyncio.Queue] = []
+ self.expected_clients = expected_clients
+ self.task = asyncio.create_task(self._runner())
+
+ @property
+ def done(self) -> bool:
+ """Return if this stream is already done."""
+ return self.task.done()
+
+ async def stop(self) -> None:
+ """Stop/cancel the stream."""
+ if self.done:
+ return
+ self.task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self.task
+ for sub_queue in list(self.subscribers):
+ empty_queue(sub_queue)
+
+ async def get_stream(
+ self,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ ) -> AsyncGenerator[bytes, None]:
+ """Get (client specific encoded) ffmpeg stream."""
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.subscribe_raw(),
+ input_format=self.audio_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ ):
+ yield chunk
+
+ async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
+ """Subscribe to the raw/unaltered audio stream."""
+ try:
+ queue = asyncio.Queue(2)
+ self.subscribers.append(queue)
+ while True:
+ chunk = await queue.get()
+ if chunk == b"":
+ break
+ yield chunk
+ finally:
+ with suppress(ValueError):
+ self.subscribers.remove(queue)
+
+ async def _runner(self) -> None:
+ """Run the stream for the given audio source."""
+ expected_clients = self.expected_clients or 1
+ # wait for first/all subscriber
+ count = 0
+ while count < 50:
+ await asyncio.sleep(0.1)
+ count += 1
+ if len(self.subscribers) >= expected_clients:
+ break
+ LOGGER.debug(
+ "Starting multi-client stream with %s/%s clients",
+ len(self.subscribers),
+ self.expected_clients,
+ )
+ async for chunk in self.audio_source:
+ fail_count = 0
+ while len(self.subscribers) == 0:
+ await asyncio.sleep(0.1)
+ fail_count += 1
+ if fail_count > 50:
+ LOGGER.warning("No clients connected, stopping stream")
+ return
+ await asyncio.gather(
+ *[sub.put(chunk) for sub in self.subscribers], return_exceptions=True
+ )
+ # EOF: send empty chunk
+ await asyncio.gather(*[sub.put(b"") for sub in self.subscribers], return_exceptions=True)
'^music_assistant/providers/podcastfeed/.*$',
'^music_assistant/providers/qobuz/.*$',
'^music_assistant/providers/siriusxm/.*$',
- '^music_assistant/providers/slimproto/.*$',
+ '^music_assistant/providers/squeezelite/.*$',
'^music_assistant/providers/sonos/.*$',
'^music_assistant/providers/soundcloud/.*$',
'^music_assistant/providers/snapcast/.*$',