"""All constants for Music Assistant."""
-__version__ = "0.0.52"
+__version__ = "0.0.53"
REQUIRED_PYTHON_VER = "3.7"
# configuration keys/attributes
import time
from typing import AsyncGenerator, List, Optional
-LOGGER = logging.getLogger("AsyncProcess")
+LOGGER = logging.getLogger("mass.helpers")
class AsyncProcess(object):
await self.__queue_out.get()
self.__queue_out.task_done()
await self.__proc_task
- LOGGER.debug("[%s] Context manager closed", self._id)
return True
async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
def __run_proc(self):
"""Run process in executor."""
try:
- LOGGER.debug(
- "[%s] Starting process with args: %s", self._id, str(self._process_args)
- )
proc = subprocess.Popen(
self._process_args,
shell=self._enable_shell,
if proc.poll() is None:
proc.terminate()
proc.communicate()
- LOGGER.debug("[%s] process finished", self._id)
def __write_stdin(self, _stdin):
"""Put chunks from queue to stdin."""
DEFAULT_BASE_CONFIG_ENTRIES = {
CONF_KEY_BASE_WEBSERVER: [
+ ConfigEntry(
+ entry_key="__name__",
+ entry_type=ConfigEntryType.LABEL,
+ label=CONF_KEY_BASE_WEBSERVER,
+ hidden=True,
+ ),
ConfigEntry(
entry_key=CONF_HTTP_PORT,
entry_type=ConfigEntryType.INT,
entry_key=CONF_EXTERNAL_URL,
entry_type=ConfigEntryType.STRING,
default_value=f"http://{get_external_ip()}:8095",
- label="External url (fqdn)",
+ label=CONF_EXTERNAL_URL,
description="desc_external_url",
),
],
CONF_KEY_BASE_SECURITY: [
+ ConfigEntry(
+ entry_key="__name__",
+ entry_type=ConfigEntryType.LABEL,
+ label=CONF_KEY_BASE_SECURITY,
+ hidden=True,
+ ),
ConfigEntry(
entry_key=CONF_USERNAME,
entry_type=ConfigEntryType.STRING,
return self._player_states.get(player_id)
@callback
- def get_player(self, player_id: str) -> PlayerState:
+ def get_player(self, player_id: str) -> Player:
"""Return Player by player_id or None if player does not exist."""
player_state = self._player_states.get(player_id)
if player_state:
import pyloudnorm
import soundfile
from aiofile import AIOFile, Reader
-from music_assistant.constants import EVENT_STREAM_ENDED, EVENT_STREAM_STARTED
+from music_assistant.constants import (
+ CONF_MAX_SAMPLE_RATE,
+ EVENT_STREAM_ENDED,
+ EVENT_STREAM_STARTED,
+)
from music_assistant.helpers.encryption import (
async_decrypt_bytes,
async_decrypt_string,
if resample:
args += ["rate", "-v", str(resample)]
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] started using args: %s",
- streamdetails.provider,
- streamdetails.item_id,
- " ".join(args),
- )
async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
cancelled = False
async def fill_buffer():
"""Forward audio chunks to sox stdin."""
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] fill_buffer started",
- streamdetails.provider,
- streamdetails.item_id,
- )
# feed audio data into sox stdin for processing
async for chunk in self.async_get_media_stream(streamdetails):
- if self.mass.exit or cancelled:
+ if self.mass.exit or cancelled or not chunk:
break
await sox_proc.write(chunk)
await sox_proc.write_eof()
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] fill_buffer finished",
- streamdetails.provider,
- streamdetails.item_id,
- )
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# yield chunks from stdout
cancelled = True
fill_buffer_task.cancel()
LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] cancelled",
- streamdetails.provider,
- streamdetails.item_id,
- )
- raise exc
- else:
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] finished",
+ "[async_get_sox_stream] [%s/%s] cancelled: %s",
streamdetails.provider,
streamdetails.item_id,
+ str(exc),
)
async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
- chunk_size = 571392 # 74,7% of pcm
+ chunk_size = 512000
- args = ["sox", "-t", "s32", "-c", "2", "-r", "96000", "-", "-t", "flac", "-"]
+ player_conf = self.mass.config.get_player_config(player_id)
+ sample_rate = player_conf.get(CONF_MAX_SAMPLE_RATE, 96000)
+
+ args = [
+ "sox",
+ "-t",
+ "s32",
+ "-c",
+ "2",
+ "-r",
+ str(sample_rate),
+ "-",
+ "-t",
+ "flac",
+ "-",
+ ]
async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] started using args: %s",
- player_id,
- " ".join(args),
- )
-
# feed stdin with pcm samples
cancelled = False
async def fill_buffer():
"""Feed audio data into sox stdin for processing."""
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] fill buffer started", player_id
- )
- async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
- if self.mass.exit or cancelled:
+ async for chunk in self.async_queue_stream_pcm(
+ player_id, sample_rate, 32
+ ):
+ if self.mass.exit or cancelled or not chunk:
break
await sox_proc.write(chunk)
# write eof when no more data
await sox_proc.write_eof()
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] fill buffer finished", player_id
- )
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
try:
cancelled = True
fill_buffer_task.cancel()
LOGGER.debug(
- "[async_queue_stream_flac] [%s] cancelled",
- player_id,
- )
- raise exc
- else:
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] finished",
- player_id,
+ "[async_queue_stream_flac] [%s] cancelled: %s", player_id, str(exc)
)
async def async_queue_stream_pcm(
) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in PCM raw audio."""
player_queue = self.mass.players.get_player_queue(player_id)
- queue_conf = self.mass.config.get_player_config(player_id)
- fade_length = try_parse_int(queue_conf["crossfade_duration"])
- pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)]
- sample_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second
- if fade_length:
- buffer_size = sample_size * fade_length
- else:
- buffer_size = sample_size * 10
LOGGER.info("Start Queue Stream for player %s ", player_id)
else:
queue_track = player_queue.next_item
if not queue_track:
- LOGGER.debug("no (more) tracks left in queue")
+ LOGGER.info("no (more) tracks left in queue")
break
+
+ # get crossfade details
+ fade_length = player_queue.crossfade_duration
+ pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)]
+ sample_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second
+ buffer_size = sample_size * fade_length if fade_length else sample_size * 10
+
# get streamdetails
streamdetails = await self.mass.music.async_get_stream_details(
queue_track, player_id
# signal start of stream event
self.mass.signal_event(EVENT_STREAM_STARTED, streamdetails)
- LOGGER.debug(
- "[async_get_media_stream] [%s/%s] started, using %s",
- streamdetails.provider,
- streamdetails.item_id,
- stream_type,
- )
if stream_type == StreamType.CACHE:
async for chunk in async_yield_chunks(audio_data, chunk_size):
# send analyze job to background worker
if not stream_type == StreamType.CACHE:
self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
- LOGGER.debug(
- "[async_get_media_stream] [%s/%s] Finished",
- streamdetails.provider,
- streamdetails.item_id,
- )
def __get_player_sox_options(
self, player_id: str, streamdetails: StreamDetails
entry_key: str
entry_type: ConfigEntryType
- default_value: Any = None
+ default_value: Any = ""
values: List[Any] = field(default_factory=list) # select from list of values
range: Tuple[Any] = () # select values within range
label: str = "" # a friendly name for the setting
from typing import List, Optional, Tuple
from music_assistant.constants import (
+ CONF_CROSSFADE_DURATION,
EVENT_QUEUE_ITEMS_UPDATED,
EVENT_QUEUE_TIME_UPDATED,
EVENT_QUEUE_UPDATED,
self._items = []
self._shuffle_enabled = False
self._repeat_enabled = False
- self._crossfade_enabled = False
self._cur_index = 0
self._cur_item_time = 0
self._last_item = None
self.mass.add_job(self.async_update_state())
self.mass.add_job(self.__async_save_state())
- @property
- def crossfade_enabled(self) -> bool:
- """Return if crossfade is enabled for this player's queue."""
- return self._crossfade_enabled
-
@property
def cur_index(self) -> OptionalInt:
"""
else not self.supports_queue
)
+ @property
+ def crossfade_duration(self) -> int:
+ """Return crossfade duration (if enabled)."""
+ player_settings = self.mass.config.get_player_config(self.player_id)
+ if player_settings:
+ return player_settings.get(CONF_CROSSFADE_DURATION, 0)
+ return 0
+
+ @property
+ def crossfade_enabled(self) -> bool:
+ """Return bool if crossfade is enabled."""
+ return self.crossfade_duration > 0
+
@property
def supports_queue(self) -> bool:
"""Return if this player supports native queue."""
async def async_next(self) -> None:
"""Play the next track in the queue."""
- self._crossfade_enabled = (
- self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
- )
if self.cur_index is None:
return
if self.use_queue_stream:
async def async_previous(self) -> None:
"""Play the previous track in the queue."""
- self._crossfade_enabled = (
- self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
- )
if self.cur_index is None:
return
if self.use_queue_stream:
async def async_resume(self) -> None:
"""Resume previous queue."""
- self._crossfade_enabled = (
- self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
- )
if self.items:
prev_index = self.cur_index
if self.use_queue_stream or not self.supports_queue:
async def async_play_index(self, index: int) -> None:
"""Play item at index X in queue."""
- self._crossfade_enabled = (
- self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
- )
if not isinstance(index, int):
index = self.__index_by_id(index)
if not len(self.items) > index:
async def async_load(self, queue_items: List[QueueItem]) -> None:
"""Load (overwrite) queue with new items."""
- self._crossfade_enabled = (
- self.mass.config.player_settings[self.player_id]["crossfade_duration"] > 0
- )
for index, item in enumerate(queue_items):
item.sort_index = index
if self._shuffle_enabled:
async def async_start_queue_stream(self) -> None:
"""Call when queue_streamer starts playing the queue stream."""
self._last_queue_startindex = self._next_queue_startindex
-
self._cur_item_time = 0
return self.get_item(self._next_queue_startindex)
+++ /dev/null
-"""Local player provider."""
-import asyncio
-import logging
-import signal
-import subprocess
-from typing import List
-
-from music_assistant.models.config_entry import ConfigEntry
-from music_assistant.models.player import DeviceInfo, PlaybackState, Player
-from music_assistant.models.provider import PlayerProvider
-
-PROV_ID = "builtin_player"
-PROV_NAME = "Built-in (local) player"
-LOGGER = logging.getLogger(PROV_ID)
-
-
-async def async_setup(mass):
- """Perform async setup of this Plugin/Provider."""
- prov = BuiltinPlayerProvider()
- await mass.async_register_provider(prov)
-
-
-class BuiltinPlayerProvider(PlayerProvider):
- """Demo PlayerProvider which provides a single local player."""
-
- @property
- def id(self) -> str:
- """Return provider ID for this provider."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return provider Name for this provider."""
- return PROV_NAME
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return Config Entries for this provider."""
- return []
-
- async def async_on_start(self) -> bool:
- """Handle initialization of the provider based on config."""
- player = BuiltinPlayer("local_player", "Built-in player on the server")
- self.mass.add_job(self.mass.players.async_add_player(player))
- return True
-
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit."""
- for player in self.players:
- await player.async_cmd_stop()
-
-
-class BuiltinPlayer(Player):
- """Representation of a BuiltinPlayer."""
-
- def __init__(self, player_id: str, name: str) -> None:
- """Initialize the built-in player."""
- self._player_id = player_id
- self._name = name
- self._powered = False
- self._elapsed_time = 0
- self._state = PlaybackState.Stopped
- self._current_uri = ""
- self._volume_level = 100
- self._muted = False
- self._sox = None
- self._progress_task = None
-
- @property
- def player_id(self) -> str:
- """Return player id of this player."""
- return self._player_id
-
- @property
- def provider_id(self) -> str:
- """Return provider id of this player."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return name of the player."""
- return self._name
-
- @property
- def powered(self) -> bool:
- """Return current power state of player."""
- return self._powered
-
- @property
- def elapsed_time(self) -> float:
- """Return elapsed_time of current playing uri in seconds."""
- return self._elapsed_time
-
- @property
- def state(self) -> PlaybackState:
- """Return current PlaybackState of player."""
- return self._state
-
- @property
- def available(self) -> bool:
- """Return current availablity of player."""
- return True
-
- @property
- def current_uri(self) -> str:
- """Return currently loaded uri of player (if any)."""
- return self._current_uri
-
- @property
- def volume_level(self) -> int:
- """Return current volume level of player (scale 0..100)."""
- return self._volume_level
-
- @property
- def muted(self) -> bool:
- """Return current mute state of player."""
- return self._muted
-
- @property
- def is_group_player(self) -> bool:
- """Return True if this player is a group player."""
- return False
-
- @property
- def device_info(self) -> DeviceInfo:
- """Return the device info for this player."""
- return DeviceInfo(
- model="Demo", address="http://demo:12345", manufacturer=PROV_NAME
- )
-
- # SERVICE CALLS / PLAYER COMMANDS
-
- async def async_cmd_play_uri(self, uri: str):
- """Play the specified uri/url on the player."""
- if self._sox:
- await self.async_cmd_stop()
- self._current_uri = uri
- self._sox = subprocess.Popen(["play", "-t", "flac", "-q", uri])
- self._state = PlaybackState.Playing
- self._powered = True
- self.update_state()
-
- async def report_progress():
- """Report fake progress while sox is playing."""
- LOGGER.info("Playback started on player %s", self.name)
- self._elapsed_time = 0
- while self._sox and not self._sox.poll():
- await asyncio.sleep(1)
- self._elapsed_time += 1
- self.update_state()
- LOGGER.info("Playback stopped on player %s", self.name)
- self._elapsed_time = 0
- self._state = PlaybackState.Stopped
- self.update_state()
-
- if self._progress_task:
- self._progress_task.cancel()
- self._progress_task = self.mass.add_job(report_progress)
-
- async def async_cmd_stop(self) -> None:
- """Send STOP command to player."""
- if self._sox:
- self._sox.terminate()
- self._sox = None
- self._state = PlaybackState.Stopped
- self.update_state()
-
- async def async_cmd_play(self) -> None:
- """Send PLAY command to player."""
- if self._sox:
- self._sox.send_signal(signal.SIGCONT)
- self._state = PlaybackState.Playing
- self.update_state()
-
- async def async_cmd_pause(self):
- """Send PAUSE command to given player."""
- if self._sox:
- self._sox.send_signal(signal.SIGSTOP)
- self._state = PlaybackState.Paused
- self.update_state()
-
- async def async_cmd_power_on(self) -> None:
- """Send POWER ON command to player."""
- self._powered = True
- self.update_state()
-
- async def async_cmd_power_off(self) -> None:
- """Send POWER OFF command to player."""
- await self.async_cmd_stop()
- self._powered = False
- self.update_state()
-
- async def async_cmd_volume_set(self, volume_level: int) -> None:
- """
- Send volume level command to given player.
-
- :param volume_level: volume level to set (0..100).
- """
- self._volume_level = volume_level
- self.update_state()
-
- async def async_cmd_volume_mute(self, is_muted=False):
- """
- Send volume MUTE command to given player.
-
- :param is_muted: bool with new mute state.
- """
- self._muted = is_muted
- self.update_state()
+++ /dev/null
-{
- "nl": {
- "Built-in (local) player": "Ingebouwde speler van de server"
- }
-}
\ No newline at end of file
+++ /dev/null
-"""Group player provider: enables grouping of all playertypes."""
-
-import asyncio
-import logging
-from typing import List
-
-from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
-from music_assistant.models.player import DeviceInfo, PlaybackState, Player
-from music_assistant.models.provider import PlayerProvider
-
-PROV_ID = "group_player"
-PROV_NAME = "Group player creator"
-LOGGER = logging.getLogger(PROV_ID)
-
-CONF_PLAYER_COUNT = "group_player_count"
-CONF_PLAYERS = "group_player_players"
-CONF_MASTER = "group_player_master"
-
-CONFIG_ENTRIES = [
- ConfigEntry(
- entry_key=CONF_PLAYER_COUNT,
- entry_type=ConfigEntryType.INT,
- description=CONF_PLAYER_COUNT,
- default_value=1,
- range=(0, 10),
- )
-]
-
-
-async def async_setup(mass):
- """Perform async setup of this Plugin/Provider."""
- prov = GroupPlayerProvider()
- await mass.async_register_provider(prov)
-
-
-class GroupPlayerProvider(PlayerProvider):
- """PlayerProvider which allows users to group players."""
-
- @property
- def id(self) -> str:
- """Return provider ID for this provider."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return provider Name for this provider."""
- return PROV_NAME
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return Config Entries for this provider."""
- return CONFIG_ENTRIES
-
- async def async_on_start(self) -> bool:
- """Handle initialization of the provider based on config."""
- conf = self.mass.config.player_providers[PROV_ID]
- for index in range(conf[CONF_PLAYER_COUNT]):
- player = GroupPlayer(self.mass, index)
- self.mass.add_job(self.mass.players.async_add_player(player))
- return True
-
- async def async_on_stop(self):
- """Handle correct close/cleanup of the provider on exit. Called on shutdown."""
- for player in self.players:
- await player.async_cmd_stop()
-
-
-class GroupPlayer(Player):
- """Model for a group player."""
-
- def __init__(self, mass: MusicAssistantType, player_index: int):
- """Initialize."""
- self.mass = mass
- self._player_index = player_index
- self._player_id = f"group_player_{player_index}"
- self._provider_id = PROV_ID
- self._name = f"Group Player {player_index}"
- self._powered = False
- self._state = PlaybackState.Stopped
- self._available = True
- self._current_uri = ""
- self._volume_level = 0
- self._muted = False
- self.connected_clients = {}
- self.stream_task = None
- self.sync_task = None
- self._config_entries = self.__get_config_entries()
- self._group_childs = self.__get_group_childs()
-
- @property
- def player_id(self) -> str:
- """Return player id of this player."""
- return self._player_id
-
- @property
- def provider_id(self) -> str:
- """Return provider id of this player."""
- return self._provider_id
-
- @property
- def name(self) -> str:
- """Return name of the player."""
- return self._name
-
- @property
- def powered(self) -> bool:
- """Return current power state of player."""
- return self._powered
-
- @property
- def state(self) -> PlaybackState:
- """Return current PlaybackState of player."""
- return self._state
-
- @property
- def available(self) -> bool:
- """Return current availablity of player."""
- return True
-
- @property
- def current_uri(self) -> str:
- """Return currently loaded uri of player (if any)."""
- return self._current_uri
-
- @property
- def volume_level(self) -> int:
- """Return current volume level of player (scale 0..100)."""
- return self._volume_level
-
- @property
- def muted(self) -> bool:
- """Return current mute state of player."""
- return self._muted
-
- @property
- def elapsed_time(self):
- """Return elapsed timefor first child player."""
- if self.state in [PlaybackState.Playing, PlaybackState.Paused]:
- for player_id in self.group_childs:
- player = self.mass.players.get_player(player_id)
- if player:
- return player.elapsed_time
- return 0
-
- @property
- def should_poll(self):
- """Return True if this player should be polled for state."""
- return True
-
- @property
- def is_group_player(self) -> bool:
- """Return True if this player is a group player."""
- return True
-
- @property
- def group_childs(self):
- """Return group childs of this group player."""
- return self._group_childs
-
- @property
- def device_info(self) -> DeviceInfo:
- """Return deviceinfo."""
- return DeviceInfo(
- model="Group Player",
- manufacturer=PROV_ID,
- )
-
- @property
- def config_entries(self):
- """Return config entries for this group player."""
- return self._config_entries
-
- async def async_on_update(self) -> None:
- """Call when player is periodically polled by the player manager (should_poll=True)."""
- self._config_entries = self.__get_config_entries()
- self._group_childs = self.__get_group_childs()
- self.update_state()
-
- def __get_group_childs(self):
- """Return group childs of this group player."""
- player_conf = self.mass.config.get_player_config(self.player_id)
- if player_conf and player_conf.get(CONF_PLAYERS):
- return player_conf[CONF_PLAYERS]
- return []
-
- def __get_config_entries(self):
- """Return config entries for this group player."""
- all_players = [
- {"text": item.name, "value": item.player_id}
- for item in self.mass.players.player_states
- if item.player_id is not self._player_id
- ]
- selected_players_ids = self.mass.config.get_player_config(self.player_id).get(
- CONF_PLAYERS, []
- )
- # selected_players_ids = []
- selected_players = []
- for player_id in selected_players_ids:
- player_state = self.mass.players.get_player_state(player_id)
- if player_state:
- selected_players.append(
- {"text": player_state.name, "value": player_state.player_id}
- )
- default_master = ""
- if selected_players:
- default_master = selected_players[0]["value"]
- return [
- ConfigEntry(
- entry_key=CONF_PLAYERS,
- entry_type=ConfigEntryType.STRING,
- default_value=[],
- values=all_players,
- label=CONF_PLAYERS,
- description="group_player_players_desc",
- multi_value=True,
- ),
- ConfigEntry(
- entry_key=CONF_MASTER,
- entry_type=ConfigEntryType.STRING,
- default_value=default_master,
- values=selected_players,
- label=CONF_MASTER,
- description="group_player_master_desc",
- multi_value=False,
- depends_on=CONF_PLAYERS,
- ),
- ]
-
- # SERVICE CALLS / PLAYER COMMANDS
-
- async def async_cmd_play_uri(self, uri: str):
- """Play the specified uri/url on the player."""
- await self.async_cmd_stop()
- self._current_uri = uri
- self._state = PlaybackState.Playing
- self._powered = True
- # forward this command to each child player
- # TODO: Only start playing on powered players ?
- # Monitor if a child turns on and join it to the sync ?
- for child_player_id in self.group_childs:
- child_player = self.mass.players.get_player(child_player_id)
- if child_player:
- queue_stream_uri = f"{self.mass.web.internal_url}/stream/group/{self.player_id}?player_id={child_player_id}"
- await child_player.async_cmd_play_uri(queue_stream_uri)
- self.update_state()
- self.stream_task = self.mass.add_job(self.async_queue_stream_task())
-
- async def async_cmd_stop(self) -> None:
- """Send STOP command to player."""
- self._state = PlaybackState.Stopped
- if self.stream_task:
- # cancel existing stream task if any
- self.stream_task.cancel()
- self.connected_clients = {}
- await asyncio.sleep(0.5)
- if self.sync_task:
- self.sync_task.cancel()
- # forward this command to each child player
- # TODO: Only forward to powered child players
- for child_player_id in self.group_childs:
- child_player = self.mass.players.get_player(child_player_id)
- if child_player:
- await child_player.async_cmd_stop()
- self.update_state()
-
- async def async_cmd_play(self) -> None:
- """Send PLAY command to player."""
- if not self.state == PlaybackState.Paused:
- return
- # forward this command to each child player
- for child_player_id in self.group_childs:
- child_player = self.mass.players.get_player(child_player_id)
- if child_player:
- await child_player.async_cmd_play()
- self._state = PlaybackState.Playing
- self.update_state()
-
- async def async_cmd_pause(self):
- """Send PAUSE command to player."""
- # forward this command to each child player
- for child_player_id in self.group_childs:
- child_player = self.mass.players.get_player(child_player_id)
- if child_player:
- await child_player.async_cmd_pause()
- self._state = PlaybackState.Paused
- self.update_state()
-
- async def async_cmd_power_on(self) -> None:
- """Send POWER ON command to player."""
- self._powered = True
- self.update_state()
-
- async def async_cmd_power_off(self) -> None:
- """Send POWER OFF command to player."""
- await self.async_cmd_stop()
- self._powered = False
- self.update_state()
-
- async def async_cmd_volume_set(self, volume_level: int) -> None:
- """
- Send volume level command to player.
-
- :param volume_level: volume level to set (0..100).
- """
- # this is already handled by the player manager
-
- async def async_cmd_volume_mute(self, is_muted=False):
- """
- Send volume MUTE command to given player.
-
- :param is_muted: bool with new mute state.
- """
- for child_player_id in self.group_childs:
- self.mass.players.async_cmd_volume_mute(child_player_id)
- self.muted = is_muted
-
- async def subscribe_stream_client(self, child_player_id):
- """Handle streaming to all players of a group. Highly experimental."""
-
- # each connected client gets its own Queue to which audio chunks (flac) are sent
- try:
- # report this client as connected
- queue = asyncio.Queue()
- self.connected_clients[child_player_id] = queue
- LOGGER.debug(
- "[%s] child player connected: %s",
- self.player_id,
- child_player_id,
- )
- # yield flac chunks from stdout to the http streamresponse
- while True:
- chunk = await queue.get()
- yield chunk
- queue.task_done()
- if not chunk:
- break
- except (GeneratorExit, Exception): # pylint: disable=broad-except
- LOGGER.warning(
- "[%s] child player aborted stream: %s", self.player_id, child_player_id
- )
- self.connected_clients.pop(child_player_id, None)
- else:
- self.connected_clients.pop(child_player_id, None)
- LOGGER.debug(
- "[%s] child player completed streaming: %s",
- self.player_id,
- child_player_id,
- )
-
- async def async_queue_stream_task(self):
- """Handle streaming queue to connected child players."""
- ticks = 0
- while ticks < 60 and len(self.connected_clients) != len(self.group_childs):
- # TODO: Support situation where not all clients of the group are powered
- await asyncio.sleep(0.1)
- ticks += 1
- if not self.connected_clients:
- LOGGER.warning("no clients!")
- return
- LOGGER.debug(
- "start queue stream with %s connected clients", len(self.connected_clients)
- )
- self.sync_task = asyncio.create_task(self.__synchronize_players())
-
- async for audio_chunk in self.mass.streams.async_queue_stream_flac(
- self.player_id
- ):
-
- # make sure we still have clients connected
- if not self.connected_clients:
- LOGGER.warning("no more clients!")
- return
-
- # send the audio chunk to all connected players
- tasks = []
- for _queue in self.connected_clients.values():
- tasks.append(self.mass.add_job(_queue.put(audio_chunk)))
- # wait for clients to consume the data
- await asyncio.wait(tasks)
-
- if not self.connected_clients:
- LOGGER.warning("no more clients!")
- return
- self.sync_task.cancel()
-
- async def __synchronize_players(self):
- """Handle drifting/lagging by monitoring progress and compare to master player."""
-
- master_player_id = self.mass.config.player_settings[self.player_id].get(
- CONF_MASTER
- )
- master_player = self.mass.players.get_player(master_player_id)
- if not master_player:
- LOGGER.warning("Synchronization of playback aborted: no master player.")
- return
- LOGGER.debug(
- "Synchronize playback of group using master player %s", master_player.name
- )
-
- # wait until master is playing
- while master_player.state != PlaybackState.Playing:
- await asyncio.sleep(0.1)
- await asyncio.sleep(0.5)
-
- prev_lags = {}
- prev_drifts = {}
-
- while self.connected_clients:
-
- # check every 0.5 seconds for player sync
- await asyncio.sleep(0.5)
-
- for child_player_id in self.connected_clients:
-
- if child_player_id == master_player_id:
- continue
- child_player = self.mass.players.get_player(child_player_id)
-
- if (
- not child_player
- or child_player.state != PlaybackState.Playing
- or child_player.elapsed_milliseconds is None
- ):
- continue
-
- if child_player_id not in prev_lags:
- prev_lags[child_player_id] = []
- if child_player_id not in prev_drifts:
- prev_drifts[child_player_id] = []
-
- # calculate lag (player is too slow in relation to the master)
- lag = (
- master_player.elapsed_milliseconds
- - child_player.elapsed_milliseconds
- )
- prev_lags[child_player_id].append(lag)
- if len(prev_lags[child_player_id]) == 5:
- # if we have 5 samples calclate the average lag
- avg_lag = sum(prev_lags[child_player_id]) / len(
- prev_lags[child_player_id]
- )
- prev_lags[child_player_id] = []
- if avg_lag > 25:
- LOGGER.debug(
- "child player %s is lagging behind with %s milliseconds",
- child_player_id,
- avg_lag,
- )
- # we correct the lag by pausing the master player for a very short time
- await master_player.async_cmd_pause()
- # sending the command takes some time, account for that too
- if avg_lag > 20:
- sleep_time = avg_lag - 20
- await asyncio.sleep(sleep_time / 1000)
- asyncio.create_task(master_player.async_cmd_play())
- break # no more processing this round if we've just corrected a lag
-
- # calculate drift (player is going faster in relation to the master)
- drift = (
- child_player.elapsed_milliseconds
- - master_player.elapsed_milliseconds
- )
- prev_drifts[child_player_id].append(drift)
- if len(prev_drifts[child_player_id]) == 5:
- # if we have 5 samples calculate the average drift
- avg_drift = sum(prev_drifts[child_player_id]) / len(
- prev_drifts[child_player_id]
- )
- prev_drifts[child_player_id] = []
-
- if avg_drift > 25:
- LOGGER.debug(
- "child player %s is drifting ahead with %s milliseconds",
- child_player_id,
- avg_drift,
- )
- # we correct the drift by pausing the player for a very short time
- # this is not the best approach but works with all playertypes
- # temporary solution until I find something better like sending more/less pcm chunks
- await child_player.async_cmd_pause()
- # sending the command takes some time, account for that too
- if avg_drift > 20:
- sleep_time = drift - 20
- await asyncio.sleep(sleep_time / 1000)
- await child_player.async_cmd_play()
- break # no more processing this round if we've just corrected a lag
+++ /dev/null
-{
- "en": {
- "Universal Group Players": "Universal Group Players",
- "group_player_count": "Number of group players",
- "group_player_count_desc": "Select how many Universal group players should be created.",
- "group_player_players": "Players in group",
- "group_player_players_desc": "Select the players that should be part of this group.",
- "group_player_master": "Group master",
- "group_player_master_desc": "Select the player that should act as group master."
- },
- "nl": {
- "Universal Group Players": "Universele groep spelers",
- "group_player_count": "Aantal groep spelers",
- "group_player_count_desc": "Selecteer hoeveel groep spelers er aangemaakt moeten worden.",
- "group_player_players": "Groepsspelers",
- "group_player_players_desc": "Selecteer de spelers die deel uitmaken van deze groep.",
- "group_player_master": "Groepsbeheerder",
- "group_player_master_desc": "Selecteer de speler die dient als groepsbeheerder."
- }
-}
\ No newline at end of file
--- /dev/null
+"""Builtin player provider."""
+import asyncio
+import logging
+import signal
+import subprocess
+import time
+from typing import List
+
+from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.helpers.util import get_hostname, run_periodic
+from music_assistant.models.config_entry import ConfigEntry
+from music_assistant.models.player import (
+ DeviceInfo,
+ PlaybackState,
+ Player,
+ PlayerFeature,
+)
+from music_assistant.models.provider import PlayerProvider
+
+PROV_ID = "mass"
+PROV_NAME = "Music Assistant"
+LOGGER = logging.getLogger(PROV_ID)
+
+CONFIG_ENTRIES = []
+PLAYER_CONFIG_ENTRIES = []
+PLAYER_FEATURES = []
+
+EVENT_WEBPLAYER_CMD = "webplayer command"
+EVENT_WEBPLAYER_STATE = "webplayer state"
+EVENT_WEBPLAYER_REGISTER = "webplayer register"
+
+
+async def async_setup(mass):
+ """Perform async setup of this Plugin/Provider."""
+ prov = MassPlayerProvider()
+ await mass.async_register_provider(prov)
+
+
+class MassPlayerProvider(PlayerProvider):
+ """
+ Built-in PlayerProvider.
+
+ Provides a single headless local player on the server using SoX.
+ Provides virtual players in the frontend using websockets.
+ """
+
+ @property
+ def id(self) -> str:
+ """Return provider ID for this provider."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return provider Name for this provider."""
+ return PROV_NAME
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return Config Entries for this provider."""
+ return []
+
+ async def async_on_start(self) -> bool:
+ """Handle initialization of the provider based on config."""
+ # add local sox player on the server
+ player = BuiltinLocalPlayer("server_player", f"Server: {get_hostname()}")
+ self.mass.add_job(self.mass.players.async_add_player(player))
+ # listen for websockets events to dynamically create players
+ self.mass.add_event_listener(
+ self.async_handle_mass_event,
+ [EVENT_WEBPLAYER_STATE, EVENT_WEBPLAYER_REGISTER],
+ )
+ self.mass.add_job(self.async_check_players())
+ return True
+
+ async def async_on_stop(self):
+ """Handle correct close/cleanup of the provider on exit."""
+ for player in self.players:
+ await player.async_cmd_stop()
+
+ async def async_handle_mass_event(self, msg, msg_details):
+ """Handle received event for the webplayer component."""
+ player = self.mass.players.get_player(msg_details["player_id"])
+ if not player:
+ # register new player
+ player = WebsocketsPlayer(
+ self.mass, msg_details["player_id"], msg_details["name"]
+ )
+ await self.mass.players.async_add_player(player)
+ await player.handle_player_state(msg_details)
+
+ @run_periodic(30)
+ async def async_check_players(self) -> None:
+ """Invalidate players that did not send a heartbeat message in a while."""
+ cur_time = time.time()
+ offline_players = []
+ for player in self.players:
+ if not isinstance(player, WebsocketsPlayer):
+ continue
+ if cur_time - player.last_message > 30:
+ offline_players.append(player.player_id)
+ for player_id in offline_players:
+ await self.mass.players.async_remove_player(player_id)
+
+ async def __async_handle_player_state(self, data):
+ """Handle state event from player."""
+ player_id = data["player_id"]
+ player = self.mass.players.get_player(player_id)
+ if "volume_level" in data:
+ player.volume_level = data["volume_level"]
+ if "muted" in data:
+ player.muted = data["muted"]
+ if "state" in data:
+ player.state = PlaybackState(data["state"])
+ if "cur_time" in data:
+ player.elapsed_time = data["elapsed_time"]
+ if "current_uri" in data:
+ player.current_uri = data["current_uri"]
+ if "powered" in data:
+ player.powered = data["powered"]
+ if "name" in data:
+ player.name = data["name"]
+ player.last_message = time.time()
+ player.update_state()
+
+
+class BuiltinLocalPlayer(Player):
+ """Representation of a local player on the server using SoX."""
+
+ def __init__(self, player_id: str, name: str) -> None:
+ """Initialize the built-in player."""
+ self._player_id = player_id
+ self._name = name
+ self._powered = False
+ self._elapsed_time = 0
+ self._state = PlaybackState.Stopped
+ self._current_uri = ""
+ self._volume_level = 100
+ self._muted = False
+ self._sox = None
+ self._progress_task = None
+
+ @property
+ def player_id(self) -> str:
+ """Return player id of this player."""
+ return self._player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ return self._name
+
+ @property
+ def powered(self) -> bool:
+ """Return current power state of player."""
+ return self._powered
+
+ @property
+ def elapsed_time(self) -> float:
+ """Return elapsed_time of current playing uri in seconds."""
+ return self._elapsed_time
+
+ @property
+ def state(self) -> PlaybackState:
+ """Return current PlaybackState of player."""
+ return self._state
+
+ @property
+ def available(self) -> bool:
+ """Return current availablity of player."""
+ return True
+
+ @property
+ def current_uri(self) -> str:
+ """Return currently loaded uri of player (if any)."""
+ return self._current_uri
+
+ @property
+ def volume_level(self) -> int:
+ """Return current volume level of player (scale 0..100)."""
+ return self._volume_level
+
+ @property
+ def muted(self) -> bool:
+ """Return current mute state of player."""
+ return self._muted
+
+ @property
+ def is_group_player(self) -> bool:
+ """Return True if this player is a group player."""
+ return False
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return the device info for this player."""
+ return DeviceInfo(
+ model="Demo", address="http://demo:12345", manufacturer=PROV_NAME
+ )
+
+ # SERVICE CALLS / PLAYER COMMANDS
+
+ async def async_cmd_play_uri(self, uri: str):
+ """Play the specified uri/url on the player."""
+ if self._sox:
+ await self.async_cmd_stop()
+ self._current_uri = uri
+ self._sox = subprocess.Popen(["play", "-t", "flac", "-q", uri])
+ self._state = PlaybackState.Playing
+ self._powered = True
+ self.update_state()
+
+ async def report_progress():
+ """Report fake progress while sox is playing."""
+ LOGGER.info("Playback started on player %s", self.name)
+ self._elapsed_time = 0
+ while self._sox and not self._sox.poll():
+ await asyncio.sleep(1)
+ self._elapsed_time += 1
+ self.update_state()
+ LOGGER.info("Playback stopped on player %s", self.name)
+ self._elapsed_time = 0
+ self._state = PlaybackState.Stopped
+ self.update_state()
+
+ if self._progress_task:
+ self._progress_task.cancel()
+ self._progress_task = self.mass.add_job(report_progress)
+
+ async def async_cmd_stop(self) -> None:
+ """Send STOP command to player."""
+ if self._sox:
+ self._sox.terminate()
+ self._sox = None
+ self._state = PlaybackState.Stopped
+ self.update_state()
+
+ async def async_cmd_play(self) -> None:
+ """Send PLAY command to player."""
+ if self._sox:
+ self._sox.send_signal(signal.SIGCONT)
+ self._state = PlaybackState.Playing
+ self.update_state()
+
+ async def async_cmd_pause(self):
+ """Send PAUSE command to given player."""
+ if self._sox:
+ self._sox.send_signal(signal.SIGSTOP)
+ self._state = PlaybackState.Paused
+ self.update_state()
+
+ async def async_cmd_power_on(self) -> None:
+ """Send POWER ON command to player."""
+ self._powered = True
+ self.update_state()
+
+ async def async_cmd_power_off(self) -> None:
+ """Send POWER OFF command to player."""
+ await self.async_cmd_stop()
+ self._powered = False
+ self.update_state()
+
+ async def async_cmd_volume_set(self, volume_level: int) -> None:
+ """
+ Send volume level command to given player.
+
+ :param volume_level: volume level to set (0..100).
+ """
+ self._volume_level = volume_level
+ self.update_state()
+
+ async def async_cmd_volume_mute(self, is_muted=False):
+ """
+ Send volume MUTE command to given player.
+
+ :param is_muted: bool with new mute state.
+ """
+ self._muted = is_muted
+ self.update_state()
+
+
+class WebsocketsPlayer(Player):
+ """
+ Implementation of a player using pure HTML/javascript.
+
+ Used in the front-end.
+ Communication is handled through the websocket connection
+ and our internal event bus.
+ """
+
+ def __init__(self, mass: MusicAssistantType, player_id: str, player_name: str):
+ """Initialize the webplayer."""
+ self._player_id = player_id
+ self._player_name = player_name
+ self._powered = True
+ self._elapsed_time = 0
+ self._state = PlaybackState.Stopped
+ self._current_uri = ""
+ self._volume_level = 100
+ self._muted = False
+ self.last_message = time.time()
+
+ async def handle_player_state(self, data: dict):
+ """Handle state event from player."""
+ if "volume_level" in data:
+ self._volume_level = data["volume_level"]
+ if "muted" in data:
+ self._muted = data["muted"]
+ if "state" in data:
+ self._state = PlaybackState(data["state"])
+ if "cur_time" in data:
+ self._elapsed_time = data["elapsed_time"]
+ if "current_uri" in data:
+ self._current_uri = data["current_uri"]
+ if "powered" in data:
+ self._powered = data["powered"]
+ if "name" in data:
+ self._player_name = data["name"]
+ self.last_message = time.time()
+ self.update_state()
+
+ @property
+ def player_id(self) -> str:
+ """Return player id of this player."""
+ return self._player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ return self._player_name
+
+ @property
+ def powered(self) -> bool:
+ """Return current power state of player."""
+ return self._powered
+
+ @property
+ def elapsed_time(self) -> int:
+ """Return elapsed time of current playing media in seconds."""
+ return self._elapsed_time
+
+ @property
+ def state(self) -> PlaybackState:
+ """Return current PlaybackState of player."""
+ return self._state
+
+ @property
+ def current_uri(self) -> str:
+ """Return currently loaded uri of player (if any)."""
+ return self._current_uri
+
+ @property
+ def volume_level(self) -> int:
+ """Return current volume level of player (scale 0..100)."""
+ return self._volume_level
+
+ @property
+ def muted(self) -> bool:
+ """Return current mute state of player."""
+ return self._muted
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return the device info for this player."""
+ return DeviceInfo()
+
+ @property
+ def should_poll(self) -> bool:
+ """Return True if this player should be polled for state updates."""
+ return False
+
+ @property
+ def features(self) -> List[PlayerFeature]:
+ """Return list of features this player supports."""
+ return PLAYER_FEATURES
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return player specific config entries (if any)."""
+ return PLAYER_CONFIG_ENTRIES
+
+ async def async_cmd_play_uri(self, uri: str) -> None:
+ """
+ Play the specified uri/url on the player.
+
+ :param uri: uri/url to send to the player.
+ """
+ data = {"player_id": self.player_id, "cmd": "play_uri", "uri": uri}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_stop(self) -> None:
+ """Send STOP command to player."""
+ data = {"player_id": self.player_id, "cmd": "stop"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_play(self) -> None:
+ """Send PLAY command to player."""
+ data = {"player_id": self.player_id, "cmd": "play"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_pause(self) -> None:
+ """Send PAUSE command to player."""
+ data = {"player_id": self.player_id, "cmd": "pause"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_power_on(self) -> None:
+ """Send POWER ON command to player."""
+ data = {"player_id": self.player_id, "cmd": "power_on"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_power_off(self) -> None:
+ """Send POWER OFF command to player."""
+ data = {"player_id": self.player_id, "cmd": "power_off"}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_volume_set(self, volume_level: int) -> None:
+ """
+ Send volume level command to player.
+
+ :param volume_level: volume level to set (0..100).
+ """
+ data = {
+ "player_id": self.player_id,
+ "cmd": "volume_set",
+ "volume_level": volume_level,
+ }
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
+
+ async def async_cmd_volume_mute(self, is_muted: bool = False) -> None:
+ """
+ Send volume MUTE command to given player.
+
+ :param is_muted: bool with new mute state.
+ """
+ data = {"player_id": self.player_id, "cmd": "volume_mute", "is_muted": is_muted}
+ self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
--- /dev/null
+"""Group player provider: enables grouping of all playertypes."""
+
+import asyncio
+import logging
+from typing import List
+
+from music_assistant.helpers.typing import MusicAssistantType
+from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
+from music_assistant.models.player import DeviceInfo, PlaybackState, Player
+from music_assistant.models.provider import PlayerProvider
+
+PROV_ID = "universal_group"
+PROV_NAME = "Universal Group player"
+LOGGER = logging.getLogger(PROV_ID)
+
+CONF_PLAYER_COUNT = "group_player_count"
+CONF_PLAYERS = "group_player_players"
+CONF_MASTER = "group_player_master"
+
+CONFIG_ENTRIES = [
+ ConfigEntry(
+ entry_key=CONF_PLAYER_COUNT,
+ entry_type=ConfigEntryType.INT,
+ description=CONF_PLAYER_COUNT,
+ default_value=1,
+ range=(0, 10),
+ )
+]
+
+
+async def async_setup(mass):
+ """Perform async setup of this Plugin/Provider."""
+ prov = GroupPlayerProvider()
+ await mass.async_register_provider(prov)
+
+
+class GroupPlayerProvider(PlayerProvider):
+ """PlayerProvider which allows users to group players."""
+
+ @property
+ def id(self) -> str:
+ """Return provider ID for this provider."""
+ return PROV_ID
+
+ @property
+ def name(self) -> str:
+ """Return provider Name for this provider."""
+ return PROV_NAME
+
+ @property
+ def config_entries(self) -> List[ConfigEntry]:
+ """Return Config Entries for this provider."""
+ return CONFIG_ENTRIES
+
+ async def async_on_start(self) -> bool:
+ """Handle initialization of the provider based on config."""
+ conf = self.mass.config.player_providers[PROV_ID]
+ for index in range(conf[CONF_PLAYER_COUNT]):
+ player = GroupPlayer(self.mass, index)
+ self.mass.add_job(self.mass.players.async_add_player(player))
+ return True
+
+ async def async_on_stop(self):
+ """Handle correct close/cleanup of the provider on exit. Called on shutdown."""
+ for player in self.players:
+ await player.async_cmd_stop()
+
+
+class GroupPlayer(Player):
+ """Model for a group player."""
+
+ def __init__(self, mass: MusicAssistantType, player_index: int):
+ """Initialize."""
+ self.mass = mass
+ self._player_index = player_index
+ self._player_id = f"{PROV_ID}_{player_index}"
+ self._provider_id = PROV_ID
+ self._name = f"{PROV_NAME} {player_index}"
+ self._powered = False
+ self._state = PlaybackState.Stopped
+ self._available = True
+ self._current_uri = ""
+ self._volume_level = 0
+ self._muted = False
+ self.connected_clients = {}
+ self.stream_task = None
+ self.sync_task = None
+ self._config_entries = self.__get_config_entries()
+ self._group_childs = self.__get_group_childs()
+
+ @property
+ def player_id(self) -> str:
+ """Return player id of this player."""
+ return self._player_id
+
+ @property
+ def provider_id(self) -> str:
+ """Return provider id of this player."""
+ return self._provider_id
+
+ @property
+ def name(self) -> str:
+ """Return name of the player."""
+ return self._name
+
+ @property
+ def powered(self) -> bool:
+ """Return current power state of player."""
+ return self._powered
+
+ @property
+ def state(self) -> PlaybackState:
+ """Return current PlaybackState of player."""
+ return self._state
+
+ @property
+ def available(self) -> bool:
+ """Return current availablity of player."""
+ return True
+
+ @property
+ def current_uri(self) -> str:
+ """Return currently loaded uri of player (if any)."""
+ return self._current_uri
+
+ @property
+ def volume_level(self) -> int:
+ """Return current volume level of player (scale 0..100)."""
+ return self._volume_level
+
+ @property
+ def muted(self) -> bool:
+ """Return current mute state of player."""
+ return self._muted
+
+ @property
+ def elapsed_time(self):
+ """Return elapsed time for first child player."""
+ if self.state in [PlaybackState.Playing, PlaybackState.Paused]:
+ for player_id in self.group_childs:
+ player = self.mass.players.get_player(player_id)
+ if player:
+ return player.elapsed_time
+ return 0
+
+ @property
+ def should_poll(self):
+ """Return True if this player should be polled for state."""
+ return True
+
+ @property
+ def is_group_player(self) -> bool:
+ """Return True if this player is a group player."""
+ return True
+
+ @property
+ def group_childs(self):
+ """Return group childs of this group player."""
+ return self._group_childs
+
+ @property
+ def device_info(self) -> DeviceInfo:
+ """Return deviceinfo."""
+ return DeviceInfo(
+ model="Group Player",
+ manufacturer=PROV_ID,
+ )
+
+ @property
+ def config_entries(self):
+ """Return config entries for this group player."""
+ return self._config_entries
+
+ async def async_on_update(self) -> None:
+ """Call when player is periodically polled by the player manager (should_poll=True)."""
+ self._config_entries = self.__get_config_entries()
+ self._group_childs = self.__get_group_childs()
+ self.update_state()
+
+ def __get_group_childs(self):
+ """Return group childs of this group player."""
+ player_conf = self.mass.config.get_player_config(self.player_id)
+ if player_conf and player_conf.get(CONF_PLAYERS):
+ return player_conf[CONF_PLAYERS]
+ return []
+
+ def __get_config_entries(self):
+ """Return config entries for this group player."""
+ all_players = [
+ {"text": item.name, "value": item.player_id}
+ for item in self.mass.players.player_states
+ if item.player_id is not self._player_id
+ ]
+ selected_players_ids = self.mass.config.get_player_config(self.player_id).get(
+ CONF_PLAYERS, []
+ )
+ # selected_players_ids = []
+ selected_players = []
+ for player_id in selected_players_ids:
+ player_state = self.mass.players.get_player_state(player_id)
+ if player_state:
+ selected_players.append(
+ {"text": player_state.name, "value": player_state.player_id}
+ )
+ default_master = ""
+ if selected_players:
+ default_master = selected_players[0]["value"]
+ return [
+ ConfigEntry(
+ entry_key=CONF_PLAYERS,
+ entry_type=ConfigEntryType.STRING,
+ default_value=[],
+ values=all_players,
+ label=CONF_PLAYERS,
+ description="group_player_players_desc",
+ multi_value=True,
+ ),
+ ConfigEntry(
+ entry_key=CONF_MASTER,
+ entry_type=ConfigEntryType.STRING,
+ default_value=default_master,
+ values=selected_players,
+ label=CONF_MASTER,
+ description="group_player_master_desc",
+ multi_value=False,
+ depends_on=CONF_PLAYERS,
+ ),
+ ]
+
+ # SERVICE CALLS / PLAYER COMMANDS
+
+ async def async_cmd_play_uri(self, uri: str):
+ """Play the specified uri/url on the player."""
+ await self.async_cmd_stop()
+ self._current_uri = uri
+ self._state = PlaybackState.Playing
+ self._powered = True
+ # forward this command to each child player
+ # TODO: Only start playing on powered players ?
+ # Monitor if a child turns on and join it to the sync ?
+ for child_player_id in self.group_childs:
+ child_player = self.mass.players.get_player(child_player_id)
+ if child_player:
+ queue_stream_uri = f"{self.mass.web.internal_url}/stream/group/{self.player_id}?player_id={child_player_id}"
+ await child_player.async_cmd_play_uri(queue_stream_uri)
+ self.update_state()
+ self.stream_task = self.mass.add_job(self.async_queue_stream_task())
+
+ async def async_cmd_stop(self) -> None:
+ """Send STOP command to player."""
+ self._state = PlaybackState.Stopped
+ if self.stream_task:
+ # cancel existing stream task if any
+ self.stream_task.cancel()
+ self.connected_clients = {}
+ await asyncio.sleep(0.5)
+ if self.sync_task:
+ self.sync_task.cancel()
+ # forward this command to each child player
+ # TODO: Only forward to powered child players
+ for child_player_id in self.group_childs:
+ child_player = self.mass.players.get_player(child_player_id)
+ if child_player:
+ await child_player.async_cmd_stop()
+ self.update_state()
+
+ async def async_cmd_play(self) -> None:
+ """Send PLAY command to player."""
+ if not self.state == PlaybackState.Paused:
+ return
+ # forward this command to each child player
+ for child_player_id in self.group_childs:
+ child_player = self.mass.players.get_player(child_player_id)
+ if child_player:
+ await child_player.async_cmd_play()
+ self._state = PlaybackState.Playing
+ self.update_state()
+
+ async def async_cmd_pause(self):
+ """Send PAUSE command to player."""
+ # forward this command to each child player
+ for child_player_id in self.group_childs:
+ child_player = self.mass.players.get_player(child_player_id)
+ if child_player:
+ await child_player.async_cmd_pause()
+ self._state = PlaybackState.Paused
+ self.update_state()
+
+ async def async_cmd_power_on(self) -> None:
+ """Send POWER ON command to player."""
+ self._powered = True
+ self.update_state()
+
+ async def async_cmd_power_off(self) -> None:
+ """Send POWER OFF command to player."""
+ await self.async_cmd_stop()
+ self._powered = False
+ self.update_state()
+
+ async def async_cmd_volume_set(self, volume_level: int) -> None:
+ """
+ Send volume level command to player.
+
+ :param volume_level: volume level to set (0..100).
+ """
+ # this is already handled by the player manager
+
+ async def async_cmd_volume_mute(self, is_muted=False):
+ """
+ Send volume MUTE command to given player.
+
+ :param is_muted: bool with new mute state.
+ """
+ for child_player_id in self.group_childs:
+ self.mass.players.async_cmd_volume_mute(child_player_id)
+ self.muted = is_muted
+
+ async def subscribe_stream_client(self, child_player_id):
+ """Handle streaming to all players of a group. Highly experimental."""
+
+ # each connected client gets its own Queue to which audio chunks (flac) are sent
+ try:
+ # report this client as connected
+ queue = asyncio.Queue()
+ self.connected_clients[child_player_id] = queue
+ LOGGER.debug(
+ "[%s] child player connected: %s",
+ self.player_id,
+ child_player_id,
+ )
+ # yield flac chunks from stdout to the http streamresponse
+ while True:
+ chunk = await queue.get()
+ yield chunk
+ queue.task_done()
+ if not chunk:
+ break
+ except (GeneratorExit, Exception): # pylint: disable=broad-except
+ LOGGER.warning(
+ "[%s] child player aborted stream: %s", self.player_id, child_player_id
+ )
+ self.connected_clients.pop(child_player_id, None)
+ else:
+ self.connected_clients.pop(child_player_id, None)
+ LOGGER.debug(
+ "[%s] child player completed streaming: %s",
+ self.player_id,
+ child_player_id,
+ )
+
+ async def async_queue_stream_task(self):
+ """Handle streaming queue to connected child players."""
+ ticks = 0
+ while ticks < 60 and len(self.connected_clients) != len(self.group_childs):
+ # TODO: Support situation where not all clients of the group are powered
+ await asyncio.sleep(0.1)
+ ticks += 1
+ if not self.connected_clients:
+ LOGGER.warning("no clients!")
+ return
+ LOGGER.debug(
+ "start queue stream with %s connected clients", len(self.connected_clients)
+ )
+ self.sync_task = asyncio.create_task(self.__synchronize_players())
+
+ async for audio_chunk in self.mass.streams.async_queue_stream_flac(
+ self.player_id
+ ):
+
+ # make sure we still have clients connected
+ if not self.connected_clients:
+ LOGGER.warning("no more clients!")
+ return
+
+ # send the audio chunk to all connected players
+ tasks = []
+ for _queue in self.connected_clients.values():
+ tasks.append(self.mass.add_job(_queue.put(audio_chunk)))
+ # wait for clients to consume the data
+ await asyncio.wait(tasks)
+
+ if not self.connected_clients:
+ LOGGER.warning("no more clients!")
+ return
+ self.sync_task.cancel()
+
+ async def __synchronize_players(self):
+ """Handle drifting/lagging by monitoring progress and compare to master player."""
+
+ master_player_id = self.mass.config.player_settings[self.player_id].get(
+ CONF_MASTER
+ )
+ master_player = self.mass.players.get_player(master_player_id)
+ if not master_player:
+ LOGGER.warning("Synchronization of playback aborted: no master player.")
+ return
+ LOGGER.debug(
+ "Synchronize playback of group using master player %s", master_player.name
+ )
+
+ # wait until master is playing
+ while master_player.state != PlaybackState.Playing:
+ await asyncio.sleep(0.1)
+ await asyncio.sleep(0.5)
+
+ prev_lags = {}
+ prev_drifts = {}
+
+ while self.connected_clients:
+
+ # check every 0.5 seconds for player sync
+ await asyncio.sleep(0.5)
+
+ for child_player_id in self.connected_clients:
+
+ if child_player_id == master_player_id:
+ continue
+ child_player = self.mass.players.get_player(child_player_id)
+
+ if (
+ not child_player
+ or child_player.state != PlaybackState.Playing
+ or child_player.elapsed_milliseconds is None
+ ):
+ continue
+
+ if child_player_id not in prev_lags:
+ prev_lags[child_player_id] = []
+ if child_player_id not in prev_drifts:
+ prev_drifts[child_player_id] = []
+
+ # calculate lag (player is too slow in relation to the master)
+ lag = (
+ master_player.elapsed_milliseconds
+ - child_player.elapsed_milliseconds
+ )
+ prev_lags[child_player_id].append(lag)
+ if len(prev_lags[child_player_id]) == 5:
+ # if we have 5 samples calclate the average lag
+ avg_lag = sum(prev_lags[child_player_id]) / len(
+ prev_lags[child_player_id]
+ )
+ prev_lags[child_player_id] = []
+ if avg_lag > 25:
+ LOGGER.debug(
+ "child player %s is lagging behind with %s milliseconds",
+ child_player_id,
+ avg_lag,
+ )
+ # we correct the lag by pausing the master player for a very short time
+ await master_player.async_cmd_pause()
+ # sending the command takes some time, account for that too
+ if avg_lag > 20:
+ sleep_time = avg_lag - 20
+ await asyncio.sleep(sleep_time / 1000)
+ asyncio.create_task(master_player.async_cmd_play())
+ break # no more processing this round if we've just corrected a lag
+
+ # calculate drift (player is going faster in relation to the master)
+ drift = (
+ child_player.elapsed_milliseconds
+ - master_player.elapsed_milliseconds
+ )
+ prev_drifts[child_player_id].append(drift)
+ if len(prev_drifts[child_player_id]) == 5:
+ # if we have 5 samples calculate the average drift
+ avg_drift = sum(prev_drifts[child_player_id]) / len(
+ prev_drifts[child_player_id]
+ )
+ prev_drifts[child_player_id] = []
+
+ if avg_drift > 25:
+ LOGGER.debug(
+ "child player %s is drifting ahead with %s milliseconds",
+ child_player_id,
+ avg_drift,
+ )
+ # we correct the drift by pausing the player for a very short time
+ # this is not the best approach but works with all playertypes
+ # temporary solution until I find something better like sending more/less pcm chunks
+ await child_player.async_cmd_pause()
+ # sending the command takes some time, account for that too
+ if avg_drift > 20:
+ sleep_time = drift - 20
+ await asyncio.sleep(sleep_time / 1000)
+ await child_player.async_cmd_play()
+ break # no more processing this round if we've just corrected a lag
--- /dev/null
+{
+ "en": {
+ "Universal Group player": "Universal Group Player",
+ "group_player_count": "Number of group players",
+ "group_player_count_desc": "Select how many Universal group players should be created.",
+ "group_player_players": "Players in group",
+ "group_player_players_desc": "Select the players that should be part of this group.",
+ "group_player_master": "Group master",
+ "group_player_master_desc": "Select the player that should act as group master."
+ },
+ "nl": {
+ "Universal Group player": "Universele groep speler",
+ "group_player_count": "Aantal groep spelers",
+ "group_player_count_desc": "Selecteer hoeveel groep spelers er aangemaakt moeten worden.",
+ "group_player_players": "Groepsspelers",
+ "group_player_players_desc": "Selecteer de spelers die deel uitmaken van deze groep.",
+ "group_player_master": "Groepsbeheerder",
+ "group_player_master_desc": "Selecteer de speler die dient als groepsbeheerder."
+ }
+}
\ No newline at end of file
+++ /dev/null
-"""Webplayer support."""
-import logging
-import time
-from typing import List
-
-from music_assistant.helpers.typing import MusicAssistantType
-from music_assistant.helpers.util import run_periodic
-from music_assistant.models.config_entry import ConfigEntry
-from music_assistant.models.player import (
- DeviceInfo,
- PlaybackState,
- Player,
- PlayerFeature,
-)
-from music_assistant.models.provider import PlayerProvider
-
-PROV_ID = "webplayer"
-PROV_NAME = "WebPlayer"
-LOGGER = logging.getLogger(PROV_ID)
-
-CONFIG_ENTRIES = []
-PLAYER_CONFIG_ENTRIES = []
-PLAYER_FEATURES = []
-
-EVENT_WEBPLAYER_CMD = "webplayer command"
-EVENT_WEBPLAYER_STATE = "webplayer state"
-EVENT_WEBPLAYER_REGISTER = "webplayer register"
-
-
-async def async_setup(mass):
- """Perform async setup of this Plugin/Provider."""
- prov = WebPlayerProvider()
- await mass.async_register_provider(prov)
-
-
-class WebPlayerProvider(PlayerProvider):
- """
- Implementation of a player using pure HTML/javascript.
-
- Used in the front-end.
- Communication is handled through the websocket connection
- and our internal event bus.
- """
-
- _players = {}
-
- ### Provider specific implementation #####
-
- @property
- def id(self) -> str:
- """Return provider ID for this provider."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return provider Name for this provider."""
- return PROV_NAME
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return Config Entries for this provider."""
- return CONFIG_ENTRIES
-
- async def async_on_start(self) -> bool:
- """Handle initialization of the provider based on config."""
- self.mass.add_event_listener(
- self.async_handle_mass_event,
- [EVENT_WEBPLAYER_STATE, EVENT_WEBPLAYER_REGISTER],
- )
- self.mass.add_job(self.async_check_players())
-
- async def async_handle_mass_event(self, msg, msg_details):
- """Handle received event for the webplayer component."""
- player = self.mass.players.get_player(msg_details["player_id"])
- if not player:
- # register new player
- player = WebPlayer(self.mass, msg_details["player_id"], msg_details["name"])
- await self.mass.players.async_add_player(player)
- await player.handle_player_state(msg_details)
-
- @run_periodic(30)
- async def async_check_players(self) -> None:
- """Invalidate players that did not send a heartbeat message in a while."""
- cur_time = time.time()
- offline_players = []
- for player in self.players:
- if cur_time - player.last_message > 30:
- offline_players.append(player.player_id)
- for player_id in offline_players:
- await self.mass.players.async_remove_player(player_id)
-
- async def __async_handle_player_state(self, data):
- """Handle state event from player."""
- player_id = data["player_id"]
- player = self._players[player_id]
- if "volume_level" in data:
- player.volume_level = data["volume_level"]
- if "muted" in data:
- player.muted = data["muted"]
- if "state" in data:
- player.state = PlaybackState(data["state"])
- if "cur_time" in data:
- player.elapsed_time = data["elapsed_time"]
- if "current_uri" in data:
- player.current_uri = data["current_uri"]
- if "powered" in data:
- player.powered = data["powered"]
- if "name" in data:
- player.name = data["name"]
- player.last_message = time.time()
- self.mass.add_job(self.mass.players.async_update_player(player))
-
-
-class WebPlayer(Player):
- """Definition of a webplayer."""
-
- def __init__(self, mass: MusicAssistantType, player_id: str, player_name: str):
- """Initialize the webplayer."""
- self._player_id = player_id
- self._player_name = player_name
- self._powered = True
- self._elapsed_time = 0
- self._state = PlaybackState.Stopped
- self._current_uri = ""
- self._volume_level = 100
- self._muted = False
- self.last_message = time.time()
-
- async def handle_player_state(self, data: dict):
- """Handle state event from player."""
- if "volume_level" in data:
- self._volume_level = data["volume_level"]
- if "muted" in data:
- self._muted = data["muted"]
- if "state" in data:
- self._state = PlaybackState(data["state"])
- if "cur_time" in data:
- self._elapsed_time = data["elapsed_time"]
- if "current_uri" in data:
- self._current_uri = data["current_uri"]
- if "powered" in data:
- self._powered = data["powered"]
- if "name" in data:
- self._player_name = data["name"]
- self.last_message = time.time()
- self.update_state()
-
- @property
- def player_id(self) -> str:
- """Return player id of this player."""
- return self._player_id
-
- @property
- def provider_id(self) -> str:
- """Return provider id of this player."""
- return PROV_ID
-
- @property
- def name(self) -> str:
- """Return name of the player."""
- return self._player_name
-
- @property
- def powered(self) -> bool:
- """Return current power state of player."""
- return self._powered
-
- @property
- def elapsed_time(self) -> int:
- """Return elapsed time of current playing media in seconds."""
- return self._elapsed_time
-
- @property
- def state(self) -> PlaybackState:
- """Return current PlaybackState of player."""
- return self._state
-
- @property
- def current_uri(self) -> str:
- """Return currently loaded uri of player (if any)."""
- return self._current_uri
-
- @property
- def volume_level(self) -> int:
- """Return current volume level of player (scale 0..100)."""
- return self._volume_level
-
- @property
- def muted(self) -> bool:
- """Return current mute state of player."""
- return self._muted
-
- @property
- def device_info(self) -> DeviceInfo:
- """Return the device info for this player."""
- return DeviceInfo()
-
- @property
- def should_poll(self) -> bool:
- """Return True if this player should be polled for state updates."""
- return False
-
- @property
- def features(self) -> List[PlayerFeature]:
- """Return list of features this player supports."""
- return PLAYER_FEATURES
-
- @property
- def config_entries(self) -> List[ConfigEntry]:
- """Return player specific config entries (if any)."""
- return PLAYER_CONFIG_ENTRIES
-
- async def async_cmd_play_uri(self, uri: str) -> None:
- """
- Play the specified uri/url on the player.
-
- :param uri: uri/url to send to the player.
- """
- data = {"player_id": self.player_id, "cmd": "play_uri", "uri": uri}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_stop(self) -> None:
- """Send STOP command to player."""
- data = {"player_id": self.player_id, "cmd": "stop"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_play(self) -> None:
- """Send PLAY command to player."""
- data = {"player_id": self.player_id, "cmd": "play"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_pause(self) -> None:
- """Send PAUSE command to player."""
- data = {"player_id": self.player_id, "cmd": "pause"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_power_on(self) -> None:
- """Send POWER ON command to player."""
- data = {"player_id": self.player_id, "cmd": "power_on"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_power_off(self) -> None:
- """Send POWER OFF command to player."""
- data = {"player_id": self.player_id, "cmd": "power_off"}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_volume_set(self, volume_level: int) -> None:
- """
- Send volume level command to player.
-
- :param volume_level: volume level to set (0..100).
- """
- data = {
- "player_id": self.player_id,
- "cmd": "volume_set",
- "volume_level": volume_level,
- }
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
-
- async def async_cmd_volume_mute(self, is_muted: bool = False) -> None:
- """
- Send volume MUTE command to given player.
-
- :param is_muted: bool with new mute state.
- """
- data = {"player_id": self.player_id, "cmd": "volume_mute", "is_muted": is_muted}
- self.mass.signal_event(EVENT_WEBPLAYER_CMD, data)
"https_port": "HTTPS Port",
"ssl_certificate": "SSL Certificate file location",
"ssl_key": "Path to certificate key file",
+ "external_url": "External URL",
+ "group_delay": "Correction of groupdelay",
+ "web": "Webserver",
+ "security": "Security",
"desc_sample_rate": "Set the maximum sample rate this player can handle.",
"desc_volume_normalisation": "Enable R128 volume normalisation to play music at an equally loud volume.",
"desc_ssl_key": "Supply the full path to the file containing the private key.",
"desc_external_url": "Supply the full URL how this Music Assistant instance can be accessed from outside. Make sure this matches the common name of the certificate.",
"desc_base_username": "Username to access this Music Assistant server.",
- "desc_base_password": "A password to protect this Music Assistant server. Can be left blank but this is extremely dangerous if this server is reachable from outside."
+ "desc_base_password": "A password to protect this Music Assistant server. Can be left blank but this is extremely dangerous if this server is reachable from outside.",
+ "desc_group_delay": "Only used on grouped playback. Adjust the delay of the grouped playback on this player"
},
"nl": {
"enabled": "Ingeschakeld",
"https_port": "HTTPS Port",
"ssl_certificate": "SSL Certificaat bestandslocatie",
"ssl_key": "Pad naar het certificaat key bestand",
+ "external_url": "External URL",
+ "web": "Webserver",
+ "security": "Beveiliging",
+ "group_delay": "Correctie van groepsvertraging",
"desc_sample_rate": "Stel de maximale sample rate in die deze speler aankan.",
"desc_volume_normalisation": "R128 volume normalisatie inschakelen om muziek altijd op een gelijk volume af te spelen.",
"desc_ssl_key": "Geef het pad om naar het bestand met de private key.",
"desc_external_url": "Geef de URL waarop deze Music Assistant server extern te benaderen is. Zorg dat dit overeenomst met het certificaat.",
"desc_base_username": "Gebruikersnaam waarmee deze server beveiligd moet worden.",
- "desc_base_password": "Wachtwoord waarmee deze server beveiligd moet worden. Mag worden leeggelaten maar dit is extreem gevaarlijk indien je besluit de server extern toegankelijk te maken."
+ "desc_base_password": "Wachtwoord waarmee deze server beveiligd moet worden. Mag worden leeggelaten maar dit is extreem gevaarlijk indien je besluit de server extern toegankelijk te maken.",
+ "desc_group_delay": "Gebruikt bij afspelen in groep. Pas de vertraging aan voor deze player."
}
}