"players/queue/repeat", queue_id=queue_id, repeat_mode=repeat_mode
)
- async def queue_command_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None:
- """Configure crossfade mode on the the queue."""
- await self.client.send_command(
- "players/queue/crossfade", queue_id=queue_id, crossfade_enabled=crossfade_enabled
- )
-
async def play_media(
self,
queue_id: str,
from music_assistant.common.models.enums import ProviderType
from music_assistant.constants import (
CONF_AUTO_PLAY,
+ CONF_CROSSFADE,
CONF_CROSSFADE_DURATION,
CONF_EQ_BASS,
CONF_EQ_MID,
CONF_HIDE_GROUP_CHILDS,
CONF_LOG_LEVEL,
CONF_OUTPUT_CHANNELS,
- CONF_OUTPUT_CODEC,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
SECURE_STRING_SUBSTITUTE,
# some reusable player config entries
-CONF_ENTRY_OUTPUT_CODEC = ConfigEntry(
- key=CONF_OUTPUT_CODEC,
- type=ConfigEntryType.STRING,
- label="Output codec",
- options=[
- ConfigValueOption("FLAC (lossless, compact file size)", "flac"),
- ConfigValueOption("AAC (lossy, superior quality)", "aac"),
- ConfigValueOption("MP3 (lossy, average quality)", "mp3"),
- ConfigValueOption("WAV (lossless, huge file size)", "wav"),
- ConfigValueOption("PCM (lossless, huge file size)", "pcm"),
- ],
- default_value="flac",
- description="Define the codec that is sent to the player when streaming audio. "
- "By default Music Assistant prefers FLAC because it is lossless, has a "
- "respectable filesize and is supported by most player devices. "
- "Change this setting only if needed for your device/environment.",
- advanced=True,
-)
-
CONF_ENTRY_FLOW_MODE = ConfigEntry(
key=CONF_FLOW_MODE,
type=ConfigEntryType.BOOLEAN,
label="Enable queue flow mode",
default_value=False,
description='Enable "flow" mode where all queue tracks are sent as a continuous '
- "audio stream. Use for players that do not natively support gapless and/or "
+ "audio stream. \nUse for players that do not natively support gapless and/or "
"crossfading or if the player has trouble transitioning between tracks.",
advanced=False,
)
+
CONF_ENTRY_AUTO_PLAY = ConfigEntry(
key=CONF_AUTO_PLAY,
type=ConfigEntryType.BOOLEAN,
advanced=False,
)
+CONF_ENTRY_CROSSFADE = ConfigEntry(
+ key=CONF_CROSSFADE,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enable crossfade",
+ default_value=False,
+ description="Enable a crossfade transition between (queue) tracks.",
+ advanced=False,
+)
+
CONF_ENTRY_CROSSFADE_DURATION = ConfigEntry(
key=CONF_CROSSFADE_DURATION,
type=ConfigEntryType.INTEGER,
- range=(1, 20),
+ range=(1, 10),
default_value=8,
label="Crossfade duration",
description="Duration in seconds of the crossfade between tracks (if enabled)",
- depends_on=CONF_FLOW_MODE,
+ depends_on=CONF_CROSSFADE,
advanced=True,
)
-
-
-DEFAULT_PLAYER_CONFIG_ENTRIES = (
- CONF_ENTRY_VOLUME_NORMALIZATION,
- CONF_ENTRY_FLOW_MODE,
- CONF_ENTRY_AUTO_PLAY,
- CONF_ENTRY_OUTPUT_CODEC,
- CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
- CONF_ENTRY_EQ_BASS,
- CONF_ENTRY_EQ_MID,
- CONF_ENTRY_EQ_TREBLE,
- CONF_ENTRY_OUTPUT_CHANNELS,
- CONF_ENTRY_CROSSFADE_DURATION,
-)
SYNC = "sync"
ACCURATE_TIME = "accurate_time"
SEEK = "seek"
- SET_MEMBERS = "set_members"
- QUEUE = "queue"
+ ENQUEUE_NEXT = "enqueue_next"
CROSSFADE = "crossfade"
elapsed_time: float = 0
elapsed_time_last_updated: float = time.time()
- current_url: str | None = None
state: PlayerState = PlayerState.IDLE
volume_level: int = 100
# otherwise it will be set to the own player_id
active_source: str = ""
+ # current_item_id: return item_id/uri of the current active/loaded item on the player
+ # this may be a MA queue_item_id, url, uri or some provider specific string
+ current_item_id: str | None = None
+
# can_sync_with: return tuple of player_ids that can be synced to/with this player
# usually this is just a list of all player_ids within the playerprovider
can_sync_with: tuple[str, ...] = field(default=tuple())
shuffle_enabled: bool = False
repeat_mode: RepeatMode = RepeatMode.OFF
- crossfade_enabled: bool = True
# current_index: index that is active (e.g. being played) by the player
current_index: int | None = None
# index_in_buffer: index that has been preloaded/buffered by the player
radio_source: list[MediaItemType] = field(default_factory=list)
announcement_in_progress: bool = False
flow_mode: bool = False
+ # flow_mode_start_index: index of the first item of the flow stream
+ flow_mode_start_index: int = 0
+ next_track_enqueued: bool = False
@property
def corrected_elapsed_time(self) -> float:
CONF_FLOW_MODE: Final[str] = "flow_mode"
CONF_LOG_LEVEL: Final[str] = "log_level"
CONF_HIDE_GROUP_CHILDS: Final[str] = "hide_group_childs"
-CONF_OUTPUT_CODEC: Final[str] = "output_codec"
CONF_GROUPED_POWER_ON: Final[str] = "grouped_power_on"
CONF_CROSSFADE_DURATION: Final[str] = "crossfade_duration"
CONF_BIND_IP: Final[str] = "bind_ip"
CONF_BIND_PORT: Final[str] = "bind_port"
CONF_PUBLISH_IP: Final[str] = "publish_ip"
CONF_AUTO_PLAY: Final[str] = "auto_play"
+CONF_CROSSFADE: Final[str] = "crossfade"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
from music_assistant.common.models import config_entries
from music_assistant.common.models.config_entries import (
DEFAULT_CORE_CONFIG_ENTRIES,
- DEFAULT_PLAYER_CONFIG_ENTRIES,
DEFAULT_PROVIDER_CONFIG_ENTRIES,
ConfigEntry,
ConfigValueType,
"""Return configuration for a single player."""
if raw_conf := self.get(f"{CONF_PLAYERS}/{player_id}"):
if prov := self.mass.get_provider(raw_conf["provider"]):
- prov_entries = await prov.get_player_config_entries(player_id)
+ conf_entries = await prov.get_player_config_entries(player_id)
if player := self.mass.players.get(player_id, False):
raw_conf["default_name"] = player.display_name
else:
- prov_entries = tuple()
+ conf_entries = tuple()
raw_conf["available"] = False
raw_conf["name"] = raw_conf.get("name")
raw_conf["default_name"] = raw_conf.get("default_name") or raw_conf["player_id"]
- prov_entries_keys = {x.key for x in prov_entries}
- # combine provider defined entries with default player config entries
- entries = prov_entries + tuple(
- x for x in DEFAULT_PLAYER_CONFIG_ENTRIES if x.key not in prov_entries_keys
- )
- return PlayerConfig.parse(entries, raw_conf)
+ return PlayerConfig.parse(conf_entries, raw_conf)
raise KeyError(f"No config found for player id {player_id}")
@api_command("config/players/get_value")
- async def get_player_config_value(self, player_id: str, key: str) -> ConfigValueType:
+ async def get_player_config_value(
+ self,
+ player_id: str,
+ key: str,
+ ) -> ConfigValueType:
"""Return single configentry value for a player."""
- cache_key = f"player_conf_value_{player_id}.{key}"
- if (cached_value := self._value_cache.get(cache_key)) and cached_value is not None:
- return cached_value
conf = await self.get_player_config(player_id)
val = (
conf.values[key].value
if conf.values[key].value is not None
else conf.values[key].default_value
)
- # store value in cache because this method can potentially be called very often
- self._value_cache[cache_key] = val
return val
def get_raw_player_config_value(
import random
import time
from collections.abc import AsyncGenerator
+from contextlib import suppress
from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.util import get_changed_keys
from music_assistant.common.models.media_items import MediaItemType, media_from_dict
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, ROOT_LOGGER_NAME
+from music_assistant.constants import FALLBACK_DURATION, ROOT_LOGGER_NAME
from music_assistant.server.helpers.api import api_command
-from music_assistant.server.helpers.audio import get_stream_details
+from music_assistant.server.helpers.audio import set_stream_details
from music_assistant.server.models.core_controller import CoreController
if TYPE_CHECKING:
queue.repeat_mode = repeat_mode
self.signal_update(queue_id)
- @api_command("players/queue/crossfade")
- def set_crossfade(self, queue_id: str, crossfade_enabled: bool) -> None:
- """Configure crossfade setting on the the queue."""
- queue = self._queues[queue_id]
- if queue.crossfade_enabled == crossfade_enabled:
- return # no change
- queue.crossfade_enabled = crossfade_enabled
- self.signal_update(queue_id)
-
@api_command("players/queue/play_media")
async def play_media(
self,
if self._queues[queue_id].announcement_in_progress:
LOGGER.warning("Ignore queue command for %s because an announcement is in progress.")
return
+ player = self.mass.players.get(queue_id, True)
+ if PlayerFeature.PAUSE not in player.supported_features:
+ # if player does not support pause, we need to send stop
+ await self.stop(queue_id)
+ return
# simply forward the command to underlying player
await self.mass.players.cmd_pause(queue_id)
- queue_id: queue_id of the queue to handle the command.
"""
current_index = self._queues[queue_id].current_index
- next_index = self.get_next_index(queue_id, current_index, True)
+ next_index = self._get_next_index(queue_id, current_index, True)
if next_index is None:
return
await self.play_index(queue_id, next_index)
raise FileNotFoundError(f"Unknown index/id: {index}")
queue.current_index = index
queue.index_in_buffer = index
- # execute the play_media command on the player
- queue_player = self.mass.players.get(queue_id)
- need_multi_stream = (
- queue_player.provider in ("airplay", "ugp", "slimproto")
- and len(queue_player.group_childs) > 1
- )
+ queue.flow_mode_start_index = index
+ queue.flow_mode = False # reset
player_prov = self.mass.players.get_player_provider(queue_id)
- if need_multi_stream:
- # handle special multi client stream
- queue.flow_mode = True
- stream_job = await self.mass.streams.create_multi_client_stream_job(
- queue_id=queue_id,
- start_queue_item=queue_item,
- seek_position=int(seek_position),
- fade_in=fade_in,
- )
- await player_prov.cmd_handle_stream_job(player_id=queue_id, stream_job=stream_job)
- return
- # regular stream
- queue.flow_mode = await self.mass.config.get_player_config_value(
- queue.queue_id, CONF_FLOW_MODE
- )
- url = await self.mass.streams.resolve_stream_url(
- queue_id=queue_id,
+ await player_prov.play_media(
+ player_id=queue_id,
queue_item=queue_item,
seek_position=int(seek_position),
fade_in=fade_in,
- flow_mode=queue.flow_mode,
- )
- await player_prov.cmd_play_url(
- player_id=queue_id,
- url=url,
- # set queue_item to None if we're sending a flow mode url
- # as the metadata is rather useless then
- queue_item=None if queue.flow_mode else queue_item,
)
# Interaction with player
def on_player_update(
self, player: Player, changed_values: dict[str, tuple[Any, Any]] # noqa: ARG002
) -> None:
- """Call when a PlayerQueue needs to be updated (e.g. when player updates)."""
+ """
+ Call when a PlayerQueue needs to be updated (e.g. when player updates).
+
+ NOTE: This is called every second if the player is playing.
+ """
if player.player_id not in self._queues:
# race condition
return
queue.items = len(self._queue_items[queue_id])
# determine if this queue is currently active for this player
queue.active = player.active_source == queue.queue_id
- if queue.active:
- queue.state = player.state
- # update current item from player report
- player_item_index = self._get_player_item_index(queue_id, player.current_url)
- if player_item_index is not None:
- if queue.flow_mode:
- # flow mode active, calculate current item
- current_index, item_time = self.__get_queue_stream_index(
- queue, player, player_item_index
- )
- else:
- # queue is active and player has one of our tracks loaded, update state
- current_index = player_item_index
- item_time = int(player.corrected_elapsed_time)
- # only update these attributes if the queue is active
- # and has an item loaded so we are able to resume it
- queue.current_index = current_index
- queue.elapsed_time = item_time
- queue.elapsed_time_last_updated = time.time()
- queue.current_item = self.get_item(queue_id, queue.current_index)
- queue.next_item = self.get_next_item(queue_id)
- # correct elapsed time when seeking
- if (
- queue.current_item
- and queue.current_item.streamdetails
- and queue.current_item.streamdetails.seconds_skipped
- and not queue.flow_mode
- ):
- queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
- else:
+ if not queue.active:
queue.state = PlayerState.IDLE
+ return
+ # update current item from player report
+ if queue.flow_mode:
+ # flow mode active, calculate current item
+ queue.current_index, queue.elapsed_time = self.__get_queue_stream_index(queue, player)
+ else:
+ # queue is active and player has one of our tracks loaded, update state
+ if item_id := self._parse_player_current_item_id(queue_id, player.current_item_id):
+ queue.current_index = self.index_by_id(queue_id, item_id)
+ queue.elapsed_time = int(player.corrected_elapsed_time)
+
+ # only update these attributes if the queue is active
+ # and has an item loaded so we are able to resume it
+ queue.state = player.state
+ queue.elapsed_time_last_updated = time.time()
+ queue.current_item = self.get_item(queue_id, queue.current_index)
+ queue.next_item = self._get_next_item(queue_id)
+ # correct elapsed time when seeking
+ if (
+ queue.current_item
+ and queue.current_item.streamdetails
+ and queue.current_item.streamdetails.seconds_skipped
+ and not queue.flow_mode
+ ):
+ queue.elapsed_time += queue.current_item.streamdetails.seconds_skipped
+
# basic throttle: do not send state changed events if queue did not actually change
prev_state = self._prev_states.get(queue_id, {})
new_state = queue.to_dict()
# return early if nothing changed
if len(changed_keys) == 0:
return
+ # handle enqueuing of next item to play
+ if not queue.flow_mode:
+ self._check_enqueue_next(player, queue, prev_state, new_state)
# do not send full updates if only time was updated
if changed_keys == {"elapsed_time"}:
self.mass.signal_event(
self._queues.pop(player_id, None)
self._queue_items.pop(player_id, None)
- async def preload_next_url(
- self, queue_id: str, current_item_id: str | None = None
- ) -> tuple[str, QueueItem, bool]:
- """Call when a player wants to load the next track/url into the buffer.
+ async def preload_next_item(
+ self, queue_id: str, current_item_id_or_index: str | int | None = None
+ ) -> QueueItem:
+ """Call when a player wants to (pre)load the next item into the buffer.
- The result is a tuple of the next url + QueueItem to Play,
- and a bool if the player should crossfade (if supported).
Raises QueueEmpty if there are no more tracks left.
"""
queue = self.get(queue_id)
if not queue:
raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
- if current_item_id:
- cur_index = self.index_by_id(queue_id, current_item_id) or 0
- else:
+ if current_item_id_or_index is None:
cur_index = queue.index_in_buffer or queue.current_index or 0
- cur_item = self.get_item(queue_id, cur_index)
+ elif isinstance(current_item_id_or_index, str):
+ cur_index = self.index_by_id(queue_id, current_item_id_or_index)
+ else:
+ cur_index = current_item_id_or_index
idx = 0
while True:
- next_index = self.get_next_index(queue_id, cur_index + idx)
- next_item = self.get_item(queue_id, next_index)
- if not cur_item or not next_item:
+ next_index = self._get_next_index(queue_id, cur_index + idx)
+ if next_index is None:
raise QueueEmpty("No more tracks left in the queue.")
+ next_item = self.get_item(queue_id, next_index)
try:
# Check if the QueueItem is playable. For example, YT Music returns Radio Items
# that are not playable which will stop playback.
- next_item.streamdetails = await get_stream_details(
- mass=self.mass, queue_item=next_item
- )
+ await set_stream_details(mass=self.mass, queue_item=next_item)
# Lazy load the full MediaItem for the QueueItem, making sure to get the
# maximum quality of thumbs
next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri)
break
except MediaNotFoundError:
# No stream details found, skip this QueueItem
+ next_item = None
idx += 1
+ if next_item is None:
+ raise QueueEmpty("No more (playable) tracks left in the queue.")
queue.index_in_buffer = next_index
- # work out crossfade
- crossfade = queue.crossfade_enabled
- if (
- cur_item.media_type == MediaType.TRACK
- and next_item.media_type == MediaType.TRACK
- and cur_item.media_item.album == next_item.media_item.album
- ):
- # disable crossfade if playing tracks from same album
- # TODO: make this a bit more intelligent.
- crossfade = False
- url = await self.mass.streams.resolve_stream_url(
- queue_id=queue_id,
- queue_item=next_item,
- )
- return (url, next_item, crossfade)
+ queue.next_track_enqueued = True
+ return next_item
# Main queue manipulation methods
return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
return None
- def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
- """Get index by queue_item_id."""
- queue_items = self._queue_items[queue_id]
- for index, item in enumerate(queue_items):
- if item.queue_item_id == queue_item_id:
- return index
- return None
-
- def get_next_index(self, queue_id: str, cur_index: int | None, is_skip: bool = False) -> int:
- """Return the next index for the queue, accounting for repeat settings."""
- queue = self._queues[queue_id]
- queue_items = self._queue_items[queue_id]
- # handle repeat single track
- if queue.repeat_mode == RepeatMode.ONE and not is_skip:
- return cur_index
- # handle repeat all
- if (
- queue.repeat_mode == RepeatMode.ALL
- and queue_items
- and cur_index == (len(queue_items) - 1)
- ):
- return 0
- # simply return the next index. other logic is guarded to detect the index
- # being higher than the number of items to detect end of queue and/or handle repeat.
- if cur_index is None:
- return 0
- next_index = cur_index + 1
- return next_index
-
- def get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None:
- """Return next QueueItem for given queue."""
- queue = self._queues[queue_id]
- if cur_index is None:
- cur_index = queue.current_index
- next_index = self.get_next_index(queue_id, cur_index)
- return self.get_item(queue_id, next_index)
-
def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
"""Signal state changed of given queue."""
queue = self._queues[queue_id]
)
)
+ def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
+ """Get index by queue_item_id."""
+ queue_items = self._queue_items[queue_id]
+ for index, item in enumerate(queue_items):
+ if item.queue_item_id == queue_item_id:
+ return index
+ return None
+
+ def _get_next_index(
+ self, queue_id: str, cur_index: int | None, is_skip: bool = False
+ ) -> int | None:
+ """
+ Return the next index for the queue, accounting for repeat settings.
+
+ Will return None if there are no (more) items in the queue.
+ """
+ queue = self._queues[queue_id]
+ queue_items = self._queue_items[queue_id]
+ if not queue_items or cur_index is None:
+ # queue is empty
+ return None
+ if cur_index is None:
+ cur_index = queue.current_index
+ # handle repeat single track
+ if queue.repeat_mode == RepeatMode.ONE and not is_skip:
+ return cur_index
+ # handle cur_index is last index of the queue
+ if cur_index >= (len(queue_items) - 1):
+ # if repeat all is enabled, we simply start again from the beginning
+ return 0 if RepeatMode.ALL else None
+ return cur_index + 1
+
+ def _get_next_item(self, queue_id: str, cur_index: int | None = None) -> QueueItem | None:
+ """Return next QueueItem for given queue."""
+ if (next_index := self._get_next_index(queue_id, cur_index)) is not None:
+ return self.get_item(queue_id, next_index)
+ return None
+
async def _fill_radio_tracks(self, queue_id: str) -> None:
"""Fill a Queue with (additional) Radio tracks."""
tracks = await self._get_radio_tracks(queue_id)
insert_at_index=len(self._queue_items[queue_id]) - 1,
)
+ def _check_enqueue_next(
+ self,
+ player: Player,
+ queue: PlayerQueue,
+ prev_state: dict[str, Any],
+ new_state: dict[str, Any],
+ ) -> None:
+ """Check if we need to enqueue the next item to the player itself."""
+ if not queue.active:
+ return
+ if prev_state.get("state") != PlayerState.PLAYING:
+ return
+ current_item = self.get_item(queue.queue_id, queue.current_index)
+ if not current_item:
+ return # guard, just in case something bad happened
+ if not current_item.duration:
+ return
+ if current_item.streamdetails and current_item.streamdetails.seconds_streamed:
+ duration = current_item.streamdetails.seconds_streamed
+ else:
+ duration = current_item.duration
+ seconds_remaining = duration - player.corrected_elapsed_time
+
+ if PlayerFeature.ENQUEUE_NEXT in player.supported_features:
+ # player supports enqueue next feature.
+ # we enqueue the next track 15 seconds before the current track ends
+ end_of_track_reached = seconds_remaining <= 15
+ else:
+ # player does not support enqueue next feature.
+ # we wait for the player to stop after it reaches the end of the track
+ prev_seconds_remaining = prev_state.get("seconds_remaining", seconds_remaining)
+ end_of_track_reached = prev_seconds_remaining <= 6 and queue.state == PlayerState.IDLE
+ new_state["seconds_remaining"] = seconds_remaining
+
+ if not end_of_track_reached:
+ queue.next_track_enqueued = False # reset
+ return
+ if queue.next_track_enqueued:
+ return # already enqueued
+
+ async def _enqueue_next(index: int):
+ player_prov = self.mass.players.get_player_provider(player.player_id)
+ with suppress(QueueEmpty):
+ next_item = await self.preload_next_item(queue.queue_id, index)
+ await player_prov.enqueue_next_queue_item(
+ player_id=player.player_id, queue_item=next_item
+ )
+
+ self.mass.create_task(_enqueue_next(queue.current_index))
+
async def _get_radio_tracks(self, queue_id: str) -> list[MediaItemType]:
"""Call the registered music providers for dynamic tracks."""
queue = self._queues[queue_id]
break
return tracks
- def __get_queue_stream_index(
- self, queue: PlayerQueue, player: Player, start_index: int
- ) -> tuple[int, int]:
+ def __get_queue_stream_index(self, queue: PlayerQueue, player: Player) -> tuple[int, int]:
"""Calculate current queue index and current track elapsed time."""
# player is playing a constant stream so we need to do this the hard way
queue_index = 0
total_time = 0
track_time = 0
queue_items = self._queue_items[queue.queue_id]
- if queue_items and len(queue_items) > start_index:
+ if queue_items and len(queue_items) > queue.flow_mode_start_index:
# start_index: holds the position from which the flow stream started
- queue_index = start_index
+ queue_index = queue.flow_mode_start_index
queue_track = None
while len(queue_items) > queue_index:
# keep enumerating the queue tracks to find current track
if not queue_track.streamdetails:
track_time = elapsed_time_queue - total_time
break
- if queue_track.streamdetails.seconds_streamed is not None:
- track_duration = queue_track.streamdetails.seconds_streamed
- else:
- track_duration = queue_track.duration or FALLBACK_DURATION
+ track_duration = (
+ queue_track.streamdetails.seconds_streamed
+ or queue_track.streamdetails.duration
+ or queue_track.duration
+ or FALLBACK_DURATION
+ )
if elapsed_time_queue > (track_duration + total_time):
# total elapsed time is more than (streamed) track duration
# move index one up
break
return queue_index, track_time
- def _get_player_item_index(self, queue_id: str, url: str) -> str | None:
- """Parse (start) QueueItem ID from Player's current url."""
- if url and self.mass.streams.base_url in url and queue_id in url:
- # try to extract the item id from the uri
- current_item_id = url.rsplit("/")[-1].split(".")[0]
- return self.index_by_id(queue_id, current_item_id)
+ def _parse_player_current_item_id(self, queue_id: str, current_item_id: str) -> str | None:
+ """Parse QueueItem ID from Player's current url."""
+ if not current_item_id:
+ return None
+ if queue_id in current_item_id:
+ # try to extract the item id from either a url or queue_id/item_id combi
+ current_item_id = current_item_id.rsplit("/")[-1].split(".")[0]
+ if self.get_item(queue_id, current_item_id):
+ return current_item_id
return None
- player_id: player_id of the player to handle the command.
"""
player_id = self._check_redirect(player_id)
+ player = self.get(player_id, True)
+ if PlayerFeature.PAUSE not in player.supported_features:
+ # if player does not support pause, we need to send stop
+ await self.cmd_stop(player_id)
+ return
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_pause(player_id)
if group_players := self._get_player_groups(player.player_id):
# prefer the first playing (or paused) group parent
for group_player in group_players:
- # if the group player's playerid is within the curtrent url,
+ # if the group player's playerid is within the current_item_id
# this group is definitely active
- if player.current_url and group_player.player_id in player.current_url:
+ if player.current_item_id and group_player.player_id in player.current_item_id:
return group_player.player_id
# fallback to the first powered group player
for group_player in group_players:
if group_player.powered:
return group_player.player_id
- # guess source from player's current url
- if player.current_url and player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- if self.mass.streams.base_url in player.current_url:
- return player.player_id
- if ":" in player.current_url:
- # extract source from uri/url
- return player.current_url.split(":")[0]
- return player.current_url
- # defaults to the player's own player id
- return player.player_id
+ # defaults to the player's own player id if not active source set
+ return player.active_source or player.player_id
def _get_group_volume_level(self, player: Player) -> int:
"""Calculate a group volume from the grouped members."""
from music_assistant.constants import (
CONF_BIND_IP,
CONF_BIND_PORT,
+ CONF_CROSSFADE,
CONF_CROSSFADE_DURATION,
CONF_EQ_BASS,
CONF_EQ_MID,
CONF_EQ_TREBLE,
CONF_OUTPUT_CHANNELS,
- CONF_OUTPUT_CODEC,
CONF_PUBLISH_IP,
)
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
get_media_stream,
- get_stream_details,
+ set_stream_details,
)
from music_assistant.server.helpers.process import AsyncProcess
from music_assistant.server.helpers.util import get_ips
with suppress(asyncio.QueueFull):
sub_queue.put_nowait(b"")
- async def resolve_stream_url(
- self,
- child_player_id: str,
- ) -> str:
+ def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
"""Resolve the childplayer specific stream URL to this streamjob."""
- output_codec = ContentType(
- await self.stream_controller.mass.config.get_player_config_value(
- child_player_id, CONF_OUTPUT_CODEC
- )
- )
fmt = output_codec.value
# handle raw pcm
if output_codec.is_pcm():
player_max_bit_depth = 24 if player.supports_24bit else 16
output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate)
output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth)
- output_channels = await self.stream_controller.mass.config.get_player_config_value(
- child_player_id, CONF_OUTPUT_CHANNELS
+ output_channels = self.stream_controller.mass.config.get_raw_player_config_value(
+ child_player_id, CONF_OUTPUT_CHANNELS, "stereo"
)
channels = 1 if output_channels != "stereo" else 2
fmt += (
async def resolve_stream_url(
self,
- queue_id: str,
queue_item: QueueItem,
+ output_codec: ContentType,
seek_position: int = 0,
fade_in: bool = False,
flow_mode: bool = False,
) -> str:
- """Resolve the (regular, single player) stream URL for the given QueueItem.
-
- This is called just-in-time by the Queue controller to get the URL to the audio.
- """
- output_codec = ContentType(
- await self.mass.config.get_player_config_value(queue_id, CONF_OUTPUT_CODEC)
- )
+ """Resolve the stream URL for the given QueueItem."""
fmt = output_codec.value
# handle raw pcm
if output_codec.is_pcm():
- player = self.mass.players.get(queue_id)
+ player = self.mass.players.get(queue_item.queue_id)
player_max_bit_depth = 24 if player.supports_24bit else 16
if flow_mode:
output_sample_rate = min(FLOW_MAX_SAMPLE_RATE, player.max_sample_rate)
output_bit_depth = min(FLOW_MAX_BIT_DEPTH, player_max_bit_depth)
else:
- streamdetails = await get_stream_details(self.mass, queue_item)
+ await set_stream_details(self.mass, queue_item)
output_sample_rate = min(
- streamdetails.audio_format.sample_rate, player.max_sample_rate
+ queue_item.streamdetails.audio_format.sample_rate, player.max_sample_rate
)
- output_bit_depth = min(streamdetails.audio_format.bit_depth, player_max_bit_depth)
- output_channels = await self.mass.config.get_player_config_value(
- queue_id, CONF_OUTPUT_CHANNELS
+ output_bit_depth = min(
+ queue_item.streamdetails.audio_format.bit_depth, player_max_bit_depth
+ )
+ output_channels = self.mass.config.get_raw_player_config_value(
+ queue_item.queue_id, CONF_OUTPUT_CHANNELS, "stereo"
)
channels = 1 if output_channels != "stereo" else 2
fmt += (
)
query_params = {}
base_path = "flow" if flow_mode else "single"
- url = f"{self._server.base_url}/{queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}"
+ url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
if seek_position:
query_params["seek_position"] = str(seek_position)
if fade_in:
if not queue_item:
raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
try:
- streamdetails = await get_stream_details(self.mass, queue_item=queue_item)
+ await set_stream_details(self.mass, queue_item=queue_item)
except MediaNotFoundError:
raise web.HTTPNotFound(
reason=f"Unable to retrieve streamdetails for item: {queue_item}"
output_format = await self._get_output_format(
output_format_str=request.match_info["fmt"],
queue_player=queue_player,
- default_sample_rate=streamdetails.audio_format.sample_rate,
- default_bit_depth=streamdetails.audio_format.bit_depth,
+ default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
+ default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
# prepare request, add some DLNA/UPNP compatible headers
# collect player specific ffmpeg args to re-encode the source PCM stream
pcm_format = AudioFormat(
- content_type=ContentType.from_bit_depth(streamdetails.audio_format.bit_depth),
- sample_rate=streamdetails.audio_format.sample_rate,
- bit_depth=streamdetails.audio_format.bit_depth,
+ content_type=ContentType.from_bit_depth(
+ queue_item.streamdetails.audio_format.bit_depth
+ ),
+ sample_rate=queue_item.streamdetails.audio_format.sample_rate,
+ bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
ffmpeg_args = await self._get_player_ffmpeg_args(
queue_player,
try:
async for chunk in get_media_stream(
self.mass,
- streamdetails=streamdetails,
+ streamdetails=queue_item.streamdetails,
pcm_format=pcm_format,
seek_position=seek_position,
fade_in=fade_in,
queue_track = None
last_fadeout_part = b""
total_bytes_written = 0
- self.logger.info("Start Queue Flow stream for Queue %s", queue.display_name)
+ queue.flow_mode = True
+ use_crossfade = self.mass.config.get_raw_player_config_value(
+ queue.queue_id, CONF_CROSSFADE, False
+ )
+ pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
+ self.logger.info(
+ "Start Queue Flow stream for Queue %s - crossfade: %s",
+ queue.display_name,
+ use_crossfade,
+ )
while True:
# get (next) queue item to stream
if queue_track is None:
queue_track = start_queue_item
- use_crossfade = queue.crossfade_enabled
+ await set_stream_details(self.mass, queue_track)
else:
seek_position = 0
fade_in = False
try:
- (
- _,
- queue_track,
- use_crossfade,
- ) = await self.mass.player_queues.preload_next_url(queue.queue_id)
+ queue_track = await self.mass.player_queues.preload_next_item(queue.queue_id)
except QueueEmpty:
break
- # get streamdetails
- try:
- streamdetails = await get_stream_details(self.mass, queue_track)
- except MediaNotFoundError as err:
- # streamdetails retrieval failed, skip to next track instead of bailing out...
- self.logger.warning(
- "Skip track %s due to missing streamdetails",
- queue_track.name,
- exc_info=err,
- )
- continue
-
self.logger.debug(
- "Start Streaming queue track: %s (%s) for queue %s - crossfade: %s",
- streamdetails.uri,
+ "Start Streaming queue track: %s (%s) for queue %s",
+ queue_track.streamdetails.uri,
queue_track.name,
queue.display_name,
- use_crossfade,
)
# set some basic vars
crossfade_size = int(pcm_sample_size * crossfade_duration)
queue_track.streamdetails.seconds_skipped = seek_position
buffer_size = crossfade_size if use_crossfade else int(pcm_sample_size * 2)
-
- buffer = b""
bytes_written = 0
+ buffer = b""
chunk_num = 0
# handle incoming audio chunks
async for chunk in get_media_stream(
self.mass,
- streamdetails,
+ queue_track.streamdetails,
pcm_format=pcm_format,
seek_position=seek_position,
fade_in=fade_in,
- # only allow strip silence from begin if track is being crossfaded
- strip_silence_begin=last_fadeout_part != b"",
+ # strip silence from begin/end if track is being crossfaded
+ strip_silence_begin=use_crossfade,
+ strip_silence_end=use_crossfade,
):
chunk_num += 1
+ # throttle buffer, do not allow more than 30 seconds in buffer
+ seconds_buffered = total_bytes_written / pcm_sample_size
+ player = self.mass.players.get(queue.queue_id)
+ while (seconds_buffered - player.corrected_elapsed_time) > 30:
+ await asyncio.sleep(1)
+
#### HANDLE FIRST PART OF TRACK
# buffer full for crossfade
#### HANDLE END OF TRACK
- if bytes_written == 0:
- # stream error: got empty first chunk ?!
- self.logger.warning("Stream error on %s", streamdetails.uri)
- queue_track.streamdetails.seconds_streamed = 0
- continue
-
if buffer and use_crossfade:
# if crossfade is enabled, save fadeout part to pickup for next track
last_fadeout_part = buffer[-crossfade_size:]
yield buffer
bytes_written += len(buffer)
- # end of the track reached - store accurate duration
+ # update duration details based on the actual pcm data we sent
+ # this also accounts for crossfade and silence stripping
queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size
- total_bytes_written += bytes_written
+ queue_track.streamdetails.duration = (
+ seek_position + queue_track.streamdetails.seconds_streamed
+ )
self.logger.debug(
- "Finished Streaming queue track: %s (%s) on queue %s",
+ "Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s",
queue_track.streamdetails.uri,
queue_track.name,
queue.display_name,
+ queue_track.streamdetails.seconds_streamed,
)
self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
output_format: AudioFormat,
) -> list[str]:
"""Get player specific arguments for the given (pcm) input and output details."""
- player_conf = await self.mass.config.get_player_config(player.player_id)
# generic args
generic_args = [
"ffmpeg",
# the below is a very basic 3-band equalizer,
# this could be a lot more sophisticated at some point
- if eq_bass := player_conf.get_value(CONF_EQ_BASS):
+ if (
+ eq_bass := self.mass.config.get_raw_player_config_value(
+ player.player_id, CONF_EQ_BASS, 0
+ )
+ ) != 0:
filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}")
- if eq_mid := player_conf.get_value(CONF_EQ_MID):
+ if (
+ eq_mid := self.mass.config.get_raw_player_config_value(player.player_id, CONF_EQ_MID, 0)
+ ) != 0:
filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}")
- if eq_treble := player_conf.get_value(CONF_EQ_TREBLE):
+ if (
+ eq_treble := self.mass.config.get_raw_player_config_value(
+ player.player_id, CONF_EQ_TREBLE, 0
+ )
+ ) != 0:
filter_params.append(
f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}"
)
# handle output mixing only left or right
- conf_channels = player_conf.get_value(CONF_OUTPUT_CHANNELS)
+ conf_channels = self.mass.config.get_raw_player_config_value(
+ player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
+ )
if conf_channels == "left":
filter_params.append("pan=mono|c0=FL")
elif conf_channels == "right":
output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate)
player_max_bit_depth = 24 if queue_player.supports_24bit else 16
output_bit_depth = min(default_bit_depth, player_max_bit_depth)
- output_channels_str = await self.mass.config.get_player_config_value(
- queue_player.player_id, CONF_OUTPUT_CHANNELS
+ output_channels_str = self.mass.config.get_raw_player_config_value(
+ queue_player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
)
output_channels = 1 if output_channels_str != "stereo" else 2
return AudioFormat(
)
-async def get_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> StreamDetails:
- """Get streamdetails for the given QueueItem.
+async def set_stream_details(mass: MusicAssistant, queue_item: QueueItem) -> None:
+ """Set streamdetails for the given QueueItem.
This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
Do not try to request streamdetails in advance as this is expiring data.
and streamdetails.data.startswith("http")
):
streamdetails.direct = streamdetails.data
- # set streamdetails as attribute on the media_item
- # this way the app knows what content is playing
+ # set streamdetails as attribute on the queue_item
queue_item.streamdetails = streamdetails
- return streamdetails
async def get_gain_correct(
# update duration details based on the actual pcm data we sent
streamdetails.seconds_streamed = bytes_sent / pcm_sample_size
+ streamdetails.duration = seek_position + streamdetails.seconds_streamed
except (asyncio.CancelledError, GeneratorExit) as err:
LOGGER.debug("media stream aborted for: %s", streamdetails.uri)
import urllib.error
import urllib.parse
import urllib.request
+from collections.abc import Iterator
from functools import lru_cache
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
if platform.system() == "Linux":
return memory_tempfile.MemoryTempfile(fallback=True).NamedTemporaryFile(buffering=0)
return tempfile.NamedTemporaryFile(buffering=0)
+
+
+def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
+ """Chunk bytes data into smaller chunks."""
+ for i in range(0, len(data), chunk_size):
+ yield data[i : i + chunk_size]
from abc import abstractmethod
from typing import TYPE_CHECKING
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_AUTO_PLAY,
+ CONF_ENTRY_VOLUME_NORMALIZATION,
+ CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
+)
from music_assistant.common.models.player import Player
-from music_assistant.common.models.queue_item import QueueItem
from .provider import Provider
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import ConfigEntry, PlayerConfig
- from music_assistant.server.controllers.streams import MultiClientStreamJob
+ from music_assistant.common.models.queue_item import QueueItem
# ruff: noqa: ARG001, ARG002
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- return tuple()
+ return (
+ CONF_ENTRY_VOLUME_NORMALIZATION,
+ CONF_ENTRY_AUTO_PLAY,
+ CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
+ )
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- player_id: player_id of the player to handle the command.
"""
- @abstractmethod
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
- player_id: player_id of the player to handle the command.
"""
+ # will only be called for players with Pause feature set.
- @abstractmethod
- async def cmd_play_url(
+ async def play_media(
self,
player_id: str,
- url: str,
- queue_item: QueueItem | None,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
) -> None:
- """Send PLAY URL command to given player.
+ """Handle PLAY MEDIA on given player.
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
"""
- async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle StreamJob play command on given player.
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """
+ Handle enqueuing of the next queue item on the player.
- This is called when the Queue wants the player to start playing media
- to multiple subscribers at the same time using a MultiClientStreamJob.
- The default implementation is that the URL to the stream is resolved for the player
- and played like any regular play_url command, but implementation may override
- this behavior for any more sophisticated handling (e.g. when syncing etc.)
+ If the player supports PlayerFeature.ENQUE_NEXT:
+ This will be called about 10 seconds before the end of the track.
+ If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+ This will be called when the end of the track is reached.
- - player_id: player_id of the player to handle the command.
- - stream_job: the MultiClientStreamJob that the player should start playing.
+ A PlayerProvider implementation is in itself responsible for handling this
+ so that the queue items keep playing until its empty or the player stopped.
+
+ This will NOT be called if the end of the queue is reached (and repeat disabled).
+ This will NOT be called if the player is using flow mode to playback the queue.
"""
- url = await stream_job.resolve_stream_url(player_id)
- await self.cmd_play_url(player_id=player_id, url=url, queue_item=None)
+ # default implementation (for a player without enqueue_next feature):
+ # simply start playback of the next track.
+ # player providers need to override this behavior if/when needed
+ await self.play_media(
+ player_id=player_id, queue_item=queue_item, seek_position=0, fade_in=False
+ )
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.
import aiofiles
-from music_assistant.common.models.config_entries import (
- CONF_ENTRY_OUTPUT_CODEC,
- ConfigEntry,
- ConfigValueType,
-)
+from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
+ from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
from music_assistant.server.providers.slimproto import SlimprotoProvider
"this number of seconds.",
advanced=True,
),
- ConfigEntry.from_dict(
- {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": "flac", "hidden": True}
- ),
)
NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode", "enabled"}
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
slimproto_prov = self.mass.get_provider("slimproto")
base_entries = await slimproto_prov.get_player_config_entries(player_id)
- return tuple(base_entries + PLAYER_CONFIG_ENTRIES)
+ return base_entries + PLAYER_CONFIG_ENTRIES
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
slimproto_prov = self.mass.get_provider("slimproto")
await slimproto_prov.cmd_play(player_id)
- async def cmd_play_url(
+ async def play_media(
self,
player_id: str,
- url: str,
- queue_item: QueueItem | None,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
) -> None:
- """Send PLAY URL command to given player.
+ """Handle PLAY MEDIA on given player.
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
"""
# simply forward to underlying slimproto player
slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_play_url(
+ await slimproto_prov.play_media(
player_id,
- url=url,
queue_item=queue_item,
+ seek_position=seek_position,
+ fade_in=fade_in,
)
- async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle StreamJob play command on given player."""
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """Handle enqueuing of the next queue item on the player."""
# simply forward to underlying slimproto player
slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_handle_stream_job(player_id=player_id, stream_job=stream_job)
+ await slimproto_prov.enqueue_next_queue_item(player_id, queue_item)
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED
from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE_DURATION,
CONF_ENTRY_HIDE_GROUP_MEMBERS,
ConfigEntry,
ConfigValueType,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
+ ContentType,
MediaType,
PlayerFeature,
PlayerState,
PlayerType,
)
-from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
+from music_assistant.common.models.errors import PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS, MASS_LOGO_ONLINE
+from music_assistant.constants import CONF_CROSSFADE, CONF_LOG_LEVEL, CONF_PLAYERS, MASS_LOGO_ONLINE
from music_assistant.server.models.player_provider import PlayerProvider
from .helpers import CastStatusListener, ChromecastInfo
CONF_ALT_APP = "alt_app"
-BASE_PLAYER_CONFIG_ENTRIES = (
+PLAYER_CONFIG_ENTRIES = (
+ ConfigEntry(
+ key=CONF_CROSSFADE,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enable crossfade",
+ default_value=False,
+ description="Enable a crossfade transition between (queue) tracks. \n"
+ "Note that Chromecast does not natively support crossfading so Music Assistant "
+ "uses a 'flow mode' workaround for this at the cost of on-player metadata.",
+ advanced=False,
+ ),
ConfigEntry(
key=CONF_ALT_APP,
type=ConfigEntryType.BOOLEAN,
"the playback experience but may not work on non-Google hardware.",
advanced=True,
),
+ CONF_ENTRY_CROSSFADE_DURATION,
)
logger: Logger
status_listener: CastStatusListener | None = None
mz_controller: MultizoneController | None = None
- next_url: str | None = None
active_group: str | None = None
current_queue_item_id: str | None = None
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
cast_player = self.castplayers.get(player_id)
- entries = BASE_PLAYER_CONFIG_ENTRIES
+ base_entries = await super().get_player_config_entries(player_id)
+ entries = base_entries + PLAYER_CONFIG_ENTRIES
if (
cast_player
and cast_player.cast_info.is_audio_group
castplayer = self.castplayers[player_id]
await asyncio.to_thread(castplayer.cc.media_controller.play)
- async def cmd_play_url(
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player."""
+ castplayer = self.castplayers[player_id]
+ await asyncio.to_thread(castplayer.cc.media_controller.pause)
+
+ async def cmd_power(self, player_id: str, powered: bool) -> None:
+ """Send POWER command to given player."""
+ castplayer = self.castplayers[player_id]
+ # set mute_as_power feature for group members
+ if castplayer.player.type == PlayerType.GROUP:
+ for child_player_id in castplayer.player.group_childs:
+ if child_player := self.mass.players.get(child_player_id):
+ child_player.mute_as_power = powered
+ if powered:
+ await self._launch_app(castplayer)
+ else:
+ await asyncio.to_thread(castplayer.cc.quit_app)
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ castplayer = self.castplayers[player_id]
+ await asyncio.to_thread(castplayer.cc.set_volume, volume_level / 100)
+
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
+ castplayer = self.castplayers[player_id]
+ await asyncio.to_thread(castplayer.cc.set_volume_muted, muted)
+
+ async def play_media(
self,
player_id: str,
- url: str,
- queue_item: QueueItem | None,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
) -> None:
- """Send PLAY URL command to given player.
+ """Handle PLAY MEDIA on given player.
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
"""
castplayer = self.castplayers[player_id]
-
- # in flow/direct url mode, we just send the url and the metadata is of no use
- if not queue_item:
+ # Google cast does not support crossfading so we use flow mode to provide this feature
+ use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ flow_mode=use_flow_mode,
+ )
+ if use_flow_mode:
+ # In flow mode, all queue tracks are sent to the player as continuous stream.
+ # This comes at the cost of metadata (cast does not support ICY metadata).
await asyncio.to_thread(
castplayer.cc.play_media,
url,
thumb=MASS_LOGO_ONLINE,
)
return
-
- cc_queue_items = [self._create_queue_item(queue_item, url)]
+ # handle normal playback using the chromecast queue to play items one by one
+ cc_queue_items = [self._create_cc_queue_item(queue_item, url)]
queuedata = {
"type": "QUEUE_LOAD",
"repeatMode": "REPEAT_OFF", # handled by our queue controller
"startIndex": 0, # Item index to play after this request or keep same item if undefined
"items": cc_queue_items,
}
- # make sure that media controller app is launched
+ # make sure that the media controller app is launched
await self._launch_app(castplayer)
# send queue info to the CC
- castplayer.next_url = None
media_controller = castplayer.cc.media_controller
await asyncio.to_thread(media_controller.send_message, queuedata, True)
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """Handle enqueuing of the next queue item on the player."""
castplayer = self.castplayers[player_id]
- await asyncio.to_thread(castplayer.cc.media_controller.pause)
-
- async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player."""
- castplayer = self.castplayers[player_id]
- # set mute_as_power feature for group members
- if castplayer.player.type == PlayerType.GROUP:
- for child_player_id in castplayer.player.group_childs:
- if child_player := self.mass.players.get(child_player_id):
- child_player.mute_as_power = powered
- if powered:
- await self._launch_app(castplayer)
- else:
- await asyncio.to_thread(castplayer.cc.quit_app)
-
- async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send VOLUME_SET command to given player."""
- castplayer = self.castplayers[player_id]
- await asyncio.to_thread(castplayer.cc.set_volume, volume_level / 100)
-
- async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- """Send VOLUME MUTE command to given player."""
- castplayer = self.castplayers[player_id]
- await asyncio.to_thread(castplayer.cc.set_volume_muted, muted)
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
+ )
+ queuedata = {
+ "type": "QUEUE_INSERT",
+ "insertBefore": None,
+ "items": [self._create_cc_queue_item(queue_item, url)],
+ }
+ media_controller = castplayer.cc.media_controller
+ queuedata["mediaSessionId"] = media_controller.status.media_session_id
+ await asyncio.to_thread(media_controller.send_message, queuedata, True)
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
PlayerFeature.POWER,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
+ PlayerFeature.ENQUEUE_NEXT,
+ PlayerFeature.PAUSE,
),
max_sample_rate=96000,
supports_24bit=True,
"""Handle updated MediaStatus."""
castplayer.logger.debug("Received media status update: %s", status.player_state)
# player state
- prev_state = castplayer.player.state
if status.player_is_playing:
castplayer.player.state = PlayerState.PLAYING
elif status.player_is_paused:
else:
castplayer.player.elapsed_time = status.current_time
+ # active source
+ if status.content_id and castplayer.player_id in status.content_id:
+ castplayer.player.active_source = castplayer.player_id
+ else:
+ castplayer.player.active_source = castplayer.cc.app_display_name
+
# current media
- castplayer.player.current_url = status.content_id
+ castplayer.player.current_item_id = status.content_id
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
- # enqueue next item if player is almost at the end of the track
- if ( # noqa: SIM114
- castplayer.player.state == PlayerState.PLAYING
- and castplayer.player.active_source == castplayer.player.player_id
- and (queue := self.mass.player_queues.get(castplayer.player_id))
- and (current_item := queue.current_item)
- and current_item.duration
- and (current_item.duration - castplayer.player.elapsed_time) <= 10
- ):
- asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop)
- # failsafe enqueue next item if player stopped at the end of the track
- elif (
- castplayer.player.state == PlayerState.IDLE
- and prev_state == PlayerState.PLAYING
- and castplayer.player.active_source == castplayer.player.player_id
- and castplayer.player.current_url == castplayer.next_url
- ):
- asyncio.run_coroutine_threadsafe(self._enqueue_next_track(castplayer), self.mass.loop)
- # handle end of MA queue - set current item to None
- elif (
+ # handle end of MA queue - reset current_item_id
+ if (
castplayer.player.state == PlayerState.IDLE
- and castplayer.player.current_url
+ and castplayer.player.current_item_id
and (queue := self.mass.player_queues.get(castplayer.player_id))
and queue.next_item is None
):
- castplayer.player.current_url = None
+ castplayer.player.current_item_id = None
def on_new_connection_status(self, castplayer: CastPlayer, status: ConnectionStatus) -> None:
"""Handle updated ConnectionStatus."""
### Helpers / utils
- async def _enqueue_next_track(self, castplayer: CastPlayer) -> None:
- """Enqueue the next track of the MA queue on the CC queue."""
- try:
- next_url, next_item, _ = await self.mass.player_queues.preload_next_url(
- castplayer.player_id, castplayer.current_queue_item_id
- )
- except QueueEmpty:
- return
-
- if castplayer.next_url == next_url:
- return # already set ?!
- castplayer.next_url = next_url
- castplayer.current_queue_item_id = next_item.queue_item_id
-
- # in flow/direct url mode, we just send the url and the metadata is of no use
- if not next_item:
- await asyncio.to_thread(
- castplayer.cc.play_media,
- next_url,
- content_type=f'audio/{next_url.split(".")[-1].split("?")[0]}',
- title="Music Assistant",
- thumb=MASS_LOGO_ONLINE,
- enqueue=True,
- media_info={
- "customData": {
- "queue_item_id": "flow",
- }
- },
- )
- return
- cc_queue_items = [self._create_queue_item(next_item, next_url)]
-
- queuedata = {
- "type": "QUEUE_INSERT",
- "insertBefore": None,
- "items": cc_queue_items,
- }
- media_controller = castplayer.cc.media_controller
- queuedata["mediaSessionId"] = media_controller.status.media_session_id
-
- await asyncio.sleep(0.5) # throttle commands to CC a bit or it will crash
- await asyncio.to_thread(media_controller.send_message, queuedata, True)
-
async def _launch_app(self, castplayer: CastPlayer) -> None:
"""Launch the default Media Receiver App on a Chromecast."""
event = asyncio.Event()
castplayer.status_listener = None
self.castplayers.pop(castplayer.player_id, None)
- def _create_queue_item(self, queue_item: QueueItem, stream_url: str):
+ def _create_cc_queue_item(self, queue_item: QueueItem, stream_url: str):
"""Create CC queue item from MA QueueItem."""
duration = int(queue_item.duration) if queue_item.duration else None
image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
from async_upnp_client.search import async_search
from async_upnp_client.utils import CaseInsensitiveDict
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_FLOW_MODE,
+ ConfigEntry,
+ ConfigValueType,
+)
from music_assistant.common.models.enums import (
ConfigEntryType,
+ ContentType,
PlayerFeature,
PlayerState,
PlayerType,
)
-from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
+from music_assistant.common.models.errors import PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_PLAYERS
+from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, CONF_PLAYERS
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
-PLAYER_FEATURES = (
- PlayerFeature.SET_MEMBERS,
- PlayerFeature.SYNC,
+BASE_PLAYER_FEATURES = (
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
)
+CONF_ENQUEUE_NEXT = "enqueue_next"
+CONF_ENFORCE_MP3 = "enforce_mp3"
+
+PLAYER_CONFIG_ENTRIES = (
+ ConfigEntry(
+ key=CONF_ENQUEUE_NEXT,
+ type=ConfigEntryType.BOOLEAN,
+ label="Player supports enqueue next/gapless",
+ default_value=False,
+ description="If the player supports enqueuing the next item for fluid/gapless playback. "
+ "\n\nUnfortunately this feature is missing or broken on many DLNA players. \n"
+ "Enable it with care. If music stops after one song, disable this setting.",
+ ),
+ ConfigEntry(
+ key=CONF_CROSSFADE,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enable crossfade",
+ default_value=False,
+ description="Enable a crossfade transition between (queue) tracks. \n\n"
+ "Note that DLNA does not natively support crossfading so you need to enable "
+ "the 'flow mode' workaround to use crossfading with DLNA players.",
+ advanced=False,
+ depends_on=CONF_FLOW_MODE,
+ ),
+ CONF_ENTRY_FLOW_MODE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ ConfigEntry(
+ key=CONF_ENFORCE_MP3,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enforce (lossy) mp3 stream",
+ default_value=False,
+ description="By default, Music Assistant sends lossless, high quality audio "
+ "to all players. Some players can not deal with that and require the stream to be packed "
+ "into a lossy mp3 codec. \n\n "
+ "Only enable when needed. Saves some bandwidth at the cost of audio quality.",
+ advanced=True,
+ ),
+)
+
CONF_NETWORK_SCAN = "network_scan"
_DLNAPlayerProviderT = TypeVar("_DLNAPlayerProviderT", bound="DLNAPlayerProvider")
# Track BOOTID in SSDP advertisements for device changes
bootid: int | None = None
last_seen: float = field(default_factory=time.time)
- next_url: str | None = None
- next_item: QueueItem | None = None
- supports_next_uri: bool | None = None
- end_of_track_reached: float | None = None
last_command: float = field(default_factory=time.time)
- need_elapsed_time_workaround: bool = False
def update_attributes(self):
"""Update attributes of the MA Player from DLNA state."""
self.player.volume_muted = self.device.is_volume_muted or False
self.player.state = self.get_state(self.device)
self.player.supported_features = self.get_supported_features(self.device)
- self.player.current_url = self.device.current_track_uri or ""
+ self.player.current_item_id = self.device.current_track_uri or ""
+ if self.player.player_id in self.player.current_item_id:
+ self.player.active_source = self.player.player_id
+ elif "spotify" in self.player.current_item_id:
+ self.player.active_source = "spotify"
+ elif self.player.current_item_id.startswith("http"):
+ self.player.active_source = "http"
+ else:
+ # TODO: handle other possible sources here
+ self.player.active_source = None
if self.device.media_position:
# only update elapsed_time if the device actually reports it
self.player.elapsed_time = float(self.device.media_position)
self.player.elapsed_time_last_updated = (
self.device.media_position_updated_at.timestamp()
)
- # some dlna players get stuck at the end of the track and won't
- # automatically play the next track, try to workaround that
- if (
- self.device.media_duration
- and self.player.corrected_elapsed_time
- and self.player.state == PlayerState.PLAYING
- and (self.device.media_duration - self.player.corrected_elapsed_time) <= 10
- ):
- self.end_of_track_reached = time.time()
else:
# device is unavailable
self.player.available = False
if device.has_volume_mute:
supported_features.add(PlayerFeature.VOLUME_MUTE)
- if device.can_seek_rel_time or device.can_seek_abs_time:
- supported_features.add(PlayerFeature.SEEK)
-
return supported_features
for dlna_player in self.dlnaplayers.values():
tg.create_task(self._device_disconnect(dlna_player))
+ async def get_player_config_entries(
+ self, player_id: str # noqa: ARG002
+ ) -> tuple[ConfigEntry, ...]:
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ base_entries = await super().get_player_config_entries(player_id)
+ return base_entries + PLAYER_CONFIG_ENTRIES
+
def on_player_config_changed(
self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
) -> None:
"""Call (by config manager) when the configuration of a player changes."""
# run discovery to catch any re-enabled players
self.mass.create_task(self._run_discovery())
+ # reset player features based on config values
+ if not (dlna_player := self.dlnaplayers.get(config.player_id)):
+ return
+ self._set_player_features(dlna_player)
@catch_request_errors
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
dlna_player = self.dlnaplayers[player_id]
- dlna_player.end_of_track_reached = None
- dlna_player.next_url = None
assert dlna_player.device is not None
await dlna_player.device.async_stop()
await dlna_player.device.async_play()
@catch_request_errors
- async def cmd_play_url(
+ async def play_media(
self,
player_id: str,
- url: str,
- queue_item: QueueItem | None,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
) -> None:
- """Send PLAY URL command to given player.
+ """Handle PLAY MEDIA on given player.
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
"""
+ # DLNA players do not support crossfading so we enforce flow mode to provide this feature
+ use_flow_mode = await self.mass.config.get_player_config_value(
+ player_id, CONF_FLOW_MODE
+ ) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+ enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ flow_mode=use_flow_mode,
+ )
dlna_player = self.dlnaplayers[player_id]
-
# always clear queue (by sending stop) first
if dlna_player.device.can_stop:
await self.cmd_stop(player_id)
- dlna_player.next_url = None
- dlna_player.end_of_track_reached = None
-
- didl_metadata = create_didl_metadata(self.mass, url, queue_item)
+ didl_metadata = create_didl_metadata(
+ self.mass, url, queue_item if not use_flow_mode else None
+ )
title = queue_item.name if queue_item else "Music Assistant"
await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
# Play it
dlna_player.player.elapsed_time = 0
dlna_player.player.elapsed_time_last_updated = now
await dlna_player.device.async_play()
-
# force poll the device
for sleep in (1, 2):
await asyncio.sleep(sleep)
dlna_player.force_poll = True
await self.poll_player(dlna_player.udn)
+ @catch_request_errors
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """
+ Handle enqueuing of the next queue item on the player.
+
+ If the player supports PlayerFeature.ENQUE_NEXT:
+ This will be called about 10 seconds before the end of the track.
+ If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+ This will be called when the end of the track is reached.
+
+ A PlayerProvider implementation is in itself responsible for handling this
+ so that the queue items keep playing until its empty or the player stopped.
+
+ This will NOT be called if the end of the queue is reached (and repeat disabled).
+ This will NOT be called if flow mode is enabled on the queue.
+ """
+ dlna_player = self.dlnaplayers[player_id]
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
+ )
+ didl_metadata = create_didl_metadata(self.mass, url, queue_item)
+ title = queue_item.name
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False):
+ # use the 'next_transport_uri' feature
+ await dlna_player.device.async_set_next_transport_uri(url, title, didl_metadata)
+ self.logger.debug(
+ "Enqued next track (%s) to player %s",
+ title,
+ dlna_player.player.display_name,
+ )
+ else:
+ # simply use regular play command
+ await dlna_player.device.async_set_transport_uri(url, title, didl_metadata)
+ # Play it
+ await dlna_player.device.async_wait_for_can_play(10)
+ # optimistically set this timestamp to help in case of a player
+ # that does not report the progress
+ now = time.time()
+ dlna_player.player.elapsed_time = 0
+ dlna_player.player.elapsed_time_last_updated = now
+ await dlna_player.device.async_play()
+
@catch_request_errors
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
if ssdp_udn in discovered_devices:
# already processed this device
return
+ if "rincon" in ssdp_udn.lower():
+ # ignore Sonos devices
+ return
discovered_devices.add(ssdp_udn)
dlna_player.description_url = description_url
else:
# new player detected, setup our DLNAPlayer wrapper
-
- # ignore disabled players
conf_key = f"{CONF_PLAYERS}/{udn}/enabled"
enabled = self.mass.config.get(conf_key, True)
+ # ignore disabled players
if not enabled:
self.logger.debug("Ignoring disabled player: %s", udn)
return
name=udn,
available=False,
powered=False,
- supported_features=PLAYER_FEATURES,
# device info will be discovered later after connect
device_info=DeviceInfo(
model="unknown",
await self._device_connect(dlna_player)
+ self._set_player_features(dlna_player)
dlna_player.update_attributes()
self.mass.players.register_or_update(dlna_player.player)
dlna_player.last_seen = time.time()
self.mass.create_task(self._update_player(dlna_player))
- async def _enqueue_next_track(self, dlna_player: DLNAPlayer) -> None:
- """Enqueue the next track of the MA queue on the CC queue."""
- try:
- (
- next_url,
- next_item,
- _,
- ) = await self.mass.player_queues.preload_next_url(dlna_player.udn)
- except QueueEmpty:
- return
-
- if dlna_player.next_url == next_url:
- return # already set ?!
- dlna_player.next_url = next_url
- dlna_player.next_item = next_item
-
- # no need to try setting the next url if we already know the player does not support it
- if dlna_player.supports_next_uri is False:
- return
-
- # send queue item to dlna queue
- didl_metadata = create_didl_metadata(self.mass, next_url, next_item)
- title = next_item.name if next_item else "Music Assistant"
- try:
- await dlna_player.device.async_set_next_transport_uri(next_url, title, didl_metadata)
- except UpnpError:
- dlna_player.supports_next_uri = False
- self.logger.info(
- "Player does not support next transport uri feature, "
- "gapless playback is not possible."
- )
-
- self.logger.debug(
- "Enqued next track (%s) to player %s",
- title,
- dlna_player.player.display_name,
- )
-
async def _update_player(self, dlna_player: DLNAPlayer) -> None:
"""Update DLNA Player."""
- prev_url = dlna_player.player.current_url
+ prev_url = dlna_player.player.current_item_id
prev_state = dlna_player.player.state
dlna_player.update_attributes()
- current_url = dlna_player.player.current_url
+ current_url = dlna_player.player.current_item_id
current_state = dlna_player.player.state
if (prev_url != current_url) or (prev_state != current_state):
# let the MA player manager work out if something actually updated
self.mass.players.update(dlna_player.udn)
- # enqueue next item if needed
- if (
- dlna_player.player.state == PlayerState.PLAYING
- and dlna_player.player.active_source == dlna_player.player.player_id
- and dlna_player.next_url in (None, dlna_player.player.current_url)
- # prevent race conditions at start/stop by doing this check
- and (time.time() - dlna_player.last_command) > 4
- ):
- self.mass.create_task(self._enqueue_next_track(dlna_player))
- # if player does not support next uri, manual play it
- if (
- (
- dlna_player.supports_next_uri is False
- or (dlna_player.supports_next_uri is None and dlna_player.end_of_track_reached)
- )
- and prev_state == PlayerState.PLAYING
- and current_state == PlayerState.IDLE
- and dlna_player.next_url
- ):
- self.logger.warning(
- "Player does not support next_uri and end of track reached, "
- "sending next url manually."
+ def _set_player_features(self, dlna_player: DLNAPlayer) -> None:
+ """Set Player Features based on config values and capabilities."""
+ dlna_player.player.supported_features = BASE_PLAYER_FEATURES
+ player_id = dlna_player.player.player_id
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENQUEUE_NEXT, False):
+ dlna_player.player.supported_features = dlna_player.player.supported_features + (
+ PlayerFeature.ENQUEUE_NEXT,
)
- await self.cmd_play_url(dlna_player.udn, dlna_player.next_url, dlna_player.next_item)
- dlna_player.end_of_track_reached = False
- dlna_player.next_url = None
- dlna_player.supports_next_uri = False
import statistics
import time
from collections import deque
-from collections.abc import Callable, Coroutine, Generator
+from collections.abc import Callable, Coroutine
from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from aioslimproto.discovery import start_discovery
from music_assistant.common.models.config_entries import (
- CONF_ENTRY_OUTPUT_CODEC,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_EQ_BASS,
+ CONF_ENTRY_EQ_MID,
+ CONF_ENTRY_EQ_TREBLE,
+ CONF_ENTRY_OUTPUT_CHANNELS,
ConfigEntry,
ConfigValueOption,
ConfigValueType,
from music_assistant.common.models.errors import QueueEmpty, SetupFailedError
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_CROSSFADE_DURATION, CONF_PORT
+from music_assistant.constants import CONF_CROSSFADE, CONF_CROSSFADE_DURATION, CONF_PORT
from music_assistant.server.models.player_provider import PlayerProvider
from .cli import LmsCli
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
+
+# monkey patch the SlimClient
+SlimClient._process_stat_stmf = lambda x, y: None # noqa: ARG005
+
CACHE_KEY_PREV_STATE = "slimproto_prev_state"
# sync constants
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- # pick default codec based on capabilities
- default_codec = ContentType.PCM
- if client := self._socket_clients.get(player_id):
- for fmt, fmt_type in (
- ("flc", ContentType.FLAC),
- ("pcm", ContentType.PCM),
- ("mp3", ContentType.MP3),
- ):
- if fmt in client.supported_codecs:
- default_codec = fmt_type
- break
+ base_entries = await super().get_player_config_entries(player_id)
+ if not (client := self._socket_clients.get(player_id)):
+ return base_entries
# create preset entries (for players that support it)
preset_entries = tuple()
- if not (client and client.device_model in self._virtual_providers):
+ if client.device_model not in self._virtual_providers:
presets = []
async for playlist in self.mass.music.playlists.iter_library_items(True):
presets.append(ConfigValueOption(playlist.name, playlist.uri))
for index in range(1, preset_count + 1)
)
- return preset_entries + (
- ConfigEntry(
- key=CONF_SYNC_ADJUST,
- type=ConfigEntryType.INTEGER,
- range=(0, 1500),
- default_value=0,
- label="Audio synchronization delay correction",
- description="If this player is playing audio synced with other players "
- "and you always hear the audio too late on this player, "
- "you can shift the audio a bit.",
- advanced=True,
- ),
- CONF_ENTRY_CROSSFADE_DURATION,
- ConfigEntry.from_dict(
- {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "default_value": default_codec}
- ),
+ return (
+ base_entries
+ + preset_entries
+ + (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_EQ_BASS,
+ CONF_ENTRY_EQ_MID,
+ CONF_ENTRY_EQ_TREBLE,
+ CONF_ENTRY_OUTPUT_CHANNELS,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ ConfigEntry(
+ key=CONF_SYNC_ADJUST,
+ type=ConfigEntryType.INTEGER,
+ range=(0, 1500),
+ default_value=0,
+ label="Audio synchronization delay correction",
+ description="If this player is playing audio synced with other players "
+ "and you always hear the audio too late on this player, "
+ "you can shift the audio a bit.",
+ advanced=True,
+ ),
+ )
)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
- # forward command to player and any connected sync child's
+ # forward command to player and any connected sync members
for client in self._get_sync_clients(player_id):
if client.state == SlimPlayerState.STOPPED:
continue
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
- # forward command to player and any connected sync child's
+ # forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
for client in self._get_sync_clients(player_id):
if client.state not in (
continue
tg.create_task(client.play())
- async def cmd_play_url(
+ async def play_media(
self,
player_id: str,
- url: str,
- queue_item: QueueItem | None,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
) -> None:
- """Send PLAY URL command to given player.
+ """Handle PLAY MEDIA on given player.
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
"""
- # send stop first
- await self.cmd_stop(player_id)
-
player = self.mass.players.get(player_id)
if player.synced_to:
raise RuntimeError("A synced player cannot receive play commands directly")
-
- # forward command to player and any connected sync child's
- sync_clients = [x for x in self._get_sync_clients(player_id)]
- async with asyncio.TaskGroup() as tg:
- for client in sync_clients:
- tg.create_task(
- self._handle_play_url(
- client,
- url=url,
- queue_item=queue_item,
- send_flush=True,
- auto_play=len(sync_clients) == 1,
- )
- )
-
- async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle StreamJob play command on given player."""
- # send stop first
+ # stop any existing streams first
await self.cmd_stop(player_id)
-
- player = self.mass.players.get(player_id)
- if player.synced_to:
- raise RuntimeError("A synced player cannot receive play commands directly")
- sync_clients = [x for x in self._get_sync_clients(player_id)]
- async with asyncio.TaskGroup() as tg:
- for client in sync_clients:
- url = await stream_job.resolve_stream_url(client.player_id)
- tg.create_task(
- self._handle_play_url(
- client,
- url=url,
- queue_item=None,
- send_flush=True,
- auto_play=len(sync_clients) == 1,
+ if player.group_childs:
+ # player has sync members, we need to start a multi client stream job
+ stream_job = await self.mass.streams.create_multi_client_stream_job(
+ queue_id=queue_item.queue_id,
+ start_queue_item=queue_item,
+ seek_position=int(seek_position),
+ fade_in=fade_in,
+ )
+ # forward command to player and any connected sync members
+ sync_clients = self._get_sync_clients(player_id)
+ async with asyncio.TaskGroup() as tg:
+ for client in sync_clients:
+ tg.create_task(
+ self._handle_play_url(
+ client,
+ url=stream_job.resolve_stream_url(
+ client.player_id, output_codec=ContentType.FLAC
+ ),
+ queue_item=None,
+ send_flush=True,
+ auto_play=False,
+ )
)
- )
+ else:
+ # regular, single player playback
+ client = self._socket_clients[player_id]
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ # for now just hardcode flac as we assume that every (modern)
+ # slimproto based player can handle that just fine
+ output_codec=ContentType.FLAC,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ flow_mode=False,
+ )
+ await self._handle_play_url(
+ client,
+ url=url,
+ queue_item=queue_item,
+ send_flush=True,
+ auto_play=True,
+ )
+
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """Handle enqueuing of the next queue item on the player."""
+ # we don't have to do anything,
+ # enqueuing the next item is handled in the buffer ready callback
async def _handle_play_url(
self,
url: str,
queue_item: QueueItem | None,
send_flush: bool = True,
- crossfade: bool = False,
auto_play: bool = False,
) -> None:
- """Handle PlayMedia on slimproto player(s)."""
+ """Handle playback of an url on slimproto player(s)."""
player_id = client.player_id
- if crossfade:
+ if crossfade := await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE):
transition_duration = await self.mass.config.get_player_config_value(
player_id, CONF_CROSSFADE_DURATION
)
mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
metadata={"item_id": queue_item.queue_item_id, "title": queue_item.name}
if queue_item
- else None,
+ else {"item_id": client.player_id, "title": "Music Assistant"},
send_flush=send_flush,
transition=SlimTransition.CROSSFADE if crossfade else SlimTransition.NONE,
transition_duration=transition_duration,
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
- # forward command to player and any connected sync child's
+ # forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
for client in self._get_sync_clients(player_id):
if client.state not in (
# update player state on player events
player.available = True
- player.current_url = client.current_url
+ player.current_item_id = (
+ client.current_metadata.get("item_id")
+ if client.current_metadata
+ else client.current_url
+ )
+ player.active_source = player.player_id
player.name = client.name
player.powered = client.powered
player.state = STATE_MAP[client.state]
player = self.mass.players.get(client.player_id)
sync_master_id = player.synced_to
if not sync_master_id:
- # we only correct sync child's, not the sync master itself
+ # we only correct sync members, not the sync master itself
return
if sync_master_id not in self._socket_clients:
return # just here as a guard as bad things can happen
return
if player.active_source != player.player_id:
return
- try:
- next_url, next_item, crossfade = await self.mass.player_queues.preload_next_url(
- client.player_id
+ with suppress(QueueEmpty):
+ next_item = await self.mass.player_queues.preload_next_item(client.player_id)
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=next_item,
+ output_codec=ContentType.FLAC,
+ flow_mode=False,
+ )
+ await self._handle_play_url(
+ client,
+ url=url,
+ queue_item=next_item,
+ send_flush=False,
+ auto_play=True,
)
- async with asyncio.TaskGroup() as tg:
- for client in self._get_sync_clients(client.player_id):
- tg.create_task(
- self._handle_play_url(
- client,
- url=next_url,
- queue_item=next_item,
- send_flush=False,
- crossfade=crossfade,
- auto_play=True,
- )
- )
- except QueueEmpty:
- pass
async def _handle_buffer_ready(self, client: SlimClient) -> None:
"""Handle buffer ready event, player has buffered a (new) track."""
# https://wiki.slimdevices.com/index.php/SlimProto_TCP_protocol.html#u.2C_p.2C_a_.26_t_commands_and_replay_gain_field
await client.send_strm(b"a", replay_gain=int(millis))
- def _get_sync_clients(self, player_id: str) -> Generator[SlimClient]:
+ def _get_sync_clients(self, player_id: str) -> list[SlimClient]:
"""Get all sync clients for a player."""
player = self.mass.players.get(player_id)
+ sync_clients: list[SlimClient] = []
# we need to return the player itself too
group_child_ids = {player_id}
group_child_ids.update(player.group_childs)
for child_id in group_child_ids:
if client := self._socket_clients.get(child_id):
- yield client
+ sync_clients.append(client)
+ return sync_clients
def _get_corrected_elapsed_milliseconds(self, client: SlimClient) -> int:
"""Return corrected elapsed milliseconds."""
new_repeat_mode = repeat_map.get(int(arg))
self.mass.player_queues.set_repeat(queue.queue_id, new_repeat_mode)
return
- if subcommand == "crossfade":
- self.mass.player_queues.set_crossfade(queue.queue_id, bool(arg))
- return
self.logger.warning("Unhandled command: playlist/%s", subcommand)
from __future__ import annotations
import asyncio
+import random
import time
-import uuid
-from typing import TYPE_CHECKING
+from contextlib import suppress
+from typing import TYPE_CHECKING, cast
-from ffmpeg import FFmpegError
-from ffmpeg.asyncio import FFmpeg
from snapcast.control import create_server
-from snapcast.control.client import Snapclient as SnapClient
-from snapcast.control.group import Snapgroup as SnapGroup
-from snapcast.control.stream import Snapstream as SnapStream
-
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from snapcast.control.client import Snapclient
+from snapcast.control.group import Snapgroup
+from snapcast.control.stream import Snapstream
+
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ ConfigEntry,
+ ConfigValueType,
+)
from music_assistant.common.models.enums import (
ConfigEntryType,
+ ContentType,
PlayerFeature,
PlayerState,
PlayerType,
)
from music_assistant.common.models.errors import SetupFailedError
+from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
+ from snapcast.control.server import Snapserver
+
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
+
CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host"
CONF_SNAPCAST_SERVER_CONTROL_PORT = "snapcast_server_control_port"
class SnapCastProvider(PlayerProvider):
"""Player provider for Snapcast based players."""
- _snapserver: [asyncio.Server | asyncio.BaseTransport]
+ _snapserver: Snapserver
snapcast_server_host: str
snapcast_server_control_port: int
+ _stream_tasks: dict[str, asyncio.Task]
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self.snapcast_server_host = self.config.get_value(CONF_SNAPCAST_SERVER_HOST)
self.snapcast_server_control_port = self.config.get_value(CONF_SNAPCAST_SERVER_CONTROL_PORT)
+ self._stream_tasks = {}
try:
self._snapserver = await create_server(
self.mass.loop,
f"Started Snapserver connection on:"
f"{self.snapcast_server_host}:{self.snapcast_server_control_port}"
)
- except OSError:
- raise SetupFailedError("Unable to start the Snapserver connection ?")
+ except OSError as err:
+ raise SetupFailedError("Unable to start the Snapserver connection ?") from err
def _handle_update(self) -> None:
"""Process Snapcast init Player/Group and set callback ."""
for snap_group in self._snapserver.groups:
snap_group.set_callback(self._handle_group_update)
- def _handle_group_update(self, snap_group: SnapGroup) -> None: # noqa: ARG002
+ def _handle_group_update(self, snap_group: Snapgroup) -> None: # noqa: ARG002
"""Process Snapcast group callback."""
for snap_client in self._snapserver.clients:
self._handle_player_update(snap_client)
- def _handle_player_init(self, snap_client: SnapClient) -> None:
+ def _handle_player_init(self, snap_client: Snapclient) -> None:
"""Process Snapcast add to Player controller."""
player_id = snap_client.identifier
player = self.mass.players.get(player_id, raise_unavailable=False)
if not player:
- snap_client = self._snapserver.client(player_id)
- self.mass.create_task(self._set_snapclient_empty_stream(player_id))
+ snap_client = cast(Snapclient, self._snapserver.client(player_id))
player = Player(
player_id=player_id,
provider=self.domain,
)
self.mass.players.register_or_update(player)
- def _handle_player_update(self, snap_client: SnapClient) -> None:
+ def _handle_player_update(self, snap_client: Snapclient) -> None:
"""Process Snapcast update to Player controller."""
player_id = snap_client.identifier
player = self.mass.players.get(player_id)
)
player.synced_to = self._synced_to(player_id)
player.group_childs = self._group_childs(player_id)
+ if player.current_item_id and player_id in player.current_item_id:
+ player.active_source = player_id
+ elif stream := self._get_snapstream(player_id):
+ player.active_source = stream.name
self.mass.players.register_or_update(player)
async def unload(self) -> None:
await self.cmd_stop(client.identifier)
self._snapserver.stop()
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ base_entries = await super().get_player_config_entries(player_id)
+ return base_entries + (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ )
+
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
- mass_player = self.mass.players.get(player_id)
- if mass_player.volume_level != volume_level:
- await self._snapserver.client_volume(
- player_id, {"percent": volume_level, "muted": mass_player.volume_muted}
- )
- self.cmd_volume_mute(player_id, False)
-
- async def cmd_play_url(
- self,
- player_id: str,
- url: str,
- queue_item: QueueItem | None, # noqa: ARG002
- ) -> None:
- """Send PLAY URL command to given player.
-
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
-
- - player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
- """
- player = self.mass.players.get(player_id)
- stream = await self._set_snapclient_empty_stream(player_id)
-
- stream_host = stream._stream.get("uri").get("host")
- stream_host = stream_host.replace("0.0.0.0", self.snapcast_server_host)
- ffmpeg = (
- FFmpeg()
- .option("y")
- .option("re")
- .input(url=url)
- .output(
- f"tcp://{stream_host}",
- f="s16le",
- acodec="pcm_s16le",
- ac=2,
- ar=48000,
- )
+ await self._snapserver.client_volume(
+ player_id, {"percent": volume_level, "muted": volume_level != 0}
)
- await self.cmd_stop(player_id)
-
- ffmpeg_task = self.mass.create_task(ffmpeg.execute())
-
- @ffmpeg.on("start")
- async def on_start(arguments: list[str]):
- self.logger.debug("Ffmpeg stream is running")
- stream.ffmpeg = ffmpeg
- stream.ffmpeg_task = ffmpeg_task
- player.current_url = url
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
- self._set_childs_state(player_id, PlayerState.PLAYING)
- self.mass.players.register_or_update(player)
-
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
player = self.mass.players.get(player_id, raise_unavailable=False)
- if player.state != PlayerState.IDLE:
- stream = self._get_snapstream(player_id)
- if hasattr(stream, "ffmpeg_task") and stream.ffmpeg_task.done() is False:
- try:
- stream.ffmpeg.terminate()
- stream.ffmpeg_task.cancel()
- self.logger.debug("ffmpeg player stopped")
- except FFmpegError:
- self.logger.debug("Fail to stop ffmpeg player")
- player.state = PlayerState.IDLE
- self._set_childs_state(player_id, PlayerState.IDLE)
- self.mass.players.register_or_update(player)
-
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
- await self.cmd_stop(player_id)
+ if stream_task := self._stream_tasks.pop(player_id, None): # noqa: SIM102
+ if not stream_task.done():
+ stream_task.cancel()
+ player.state = PlayerState.IDLE
+ self._set_childs_state(player_id, PlayerState.IDLE)
+ self.mass.players.register_or_update(player)
+ # assign default/empty stream to the player
+ await self._get_snapgroup(player_id).set_stream("default")
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send MUTE command to given player."""
"""Unsync Snapcast player."""
group = self._get_snapgroup(player_id)
await group.remove_client(player_id)
- group = self._get_snapgroup(player_id)
- stream_id = await self._get_empty_stream()
- await group.set_stream(stream_id)
+ # assign default/empty stream to the player
+ await self._get_snapgroup(player_id).set_stream("default")
self._handle_update()
- def _get_snapgroup(self, player_id: str) -> SnapGroup:
+ async def play_media(
+ self,
+ player_id: str,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
+ ) -> None:
+ """Handle PLAY MEDIA on given player.
+
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
+
+ - player_id: player_id of the player to handle the command.
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
+ """
+ player = self.mass.players.get(player_id)
+ if player.synced_to:
+ raise RuntimeError("A synced player cannot receive play commands directly")
+ # stop any existing streams first
+ await self.cmd_stop(player_id)
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ stream, port = await self._create_stream()
+ snap_group = self._get_snapgroup(player_id)
+ await snap_group.set_stream(stream.identifier)
+
+ async def queue_streamer():
+ host = self.snapcast_server_host
+ _, writer = await asyncio.open_connection(host, port)
+ self.logger.debug("Opened connection to %s:%s", host, port)
+ player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+ player.elapsed_time = 0
+ player.elapsed_time_last_updated = time.time()
+ player.state = PlayerState.PLAYING
+ self._set_childs_state(player_id, PlayerState.PLAYING)
+ self.mass.players.register_or_update(player)
+ # TODO: can we handle 24 bits bit depth ?
+ pcm_format = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=48000,
+ bit_depth=16,
+ channels=2,
+ )
+ try:
+ async for pcm_chunk in self.mass.streams.get_flow_stream(
+ queue,
+ start_queue_item=queue_item,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ ):
+ writer.write(pcm_chunk)
+ await writer.drain()
+
+ finally:
+ await self._snapserver.stream_remove_stream(stream.identifier)
+ if writer.can_write_eof():
+ writer.close()
+ if not writer.is_closing():
+ writer.close()
+ self.logger.debug("Closed connection to %s:%s", host, port)
+
+ # start streaming the queue (pcm) audio in a background task
+ self._stream_tasks[player_id] = asyncio.create_task(queue_streamer())
+
+ def _get_snapgroup(self, player_id: str) -> Snapgroup:
"""Get snapcast group for given player_id."""
- client = self._snapserver.client(player_id)
+ client: Snapclient = self._snapserver.client(player_id)
return client.group
- def _get_snapstream(self, player_id: str) -> SnapStream:
+ def _get_snapstream(self, player_id: str) -> Snapstream | None:
"""Get snapcast stream for given player_id."""
- group = self._get_snapgroup(player_id)
- return self._snapserver.stream(group.stream)
+ if group := self._get_snapgroup(player_id):
+ with suppress(KeyError):
+ return self._snapserver.stream(group.stream)
+ return None
def _synced_to(self, player_id: str) -> str | None:
"""Return player_id of the player this player is synced to."""
snap_group = self._get_snapgroup(player_id)
return {snap_client for snap_client in snap_group.clients if snap_client != player_id}
- async def _get_empty_stream(self) -> str:
- """Find or create empty stream on snapcast server.
-
- This method ensures that there is a snapstream for each snapclient,
- even if the snapserver only have one stream configured. This is needed
- because the default config of snapserver is one stream on a named pipe.
- """
- used_streams = {group.stream for group in self._snapserver.groups}
- for stream in self._snapserver.streams:
- if stream.path == "" and stream.identifier not in used_streams:
- return stream.identifier
- port = 4953
- name = str(uuid.uuid4())
- while True:
- new_stream = await self._snapserver.stream_add_stream(
- f"tcp://0.0.0.0:{port}?name={f'MA_{name}'}&sampleformat=48000:16:2",
+ async def _create_stream(self) -> tuple[Snapstream, int]:
+ """Create new stream on snapcast server."""
+ attempts = 50
+ while attempts:
+ attempts -= 1
+ # pick a random port
+ port = random.randint(4953, 4953 + 200)
+ name = f"MusicAssistant--{port}"
+ result = await self._snapserver.stream_add_stream(
+ # TODO: can we handle 24 bits bit depth ?
+ f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2",
)
- if "id" in new_stream and new_stream["id"] not in used_streams:
- return new_stream["id"]
- port += 1
+ if "id" not in result:
+ # if the port is already taken, the result will be an error
+ self.logger.warning(result)
+ continue
+ stream = self._snapserver.stream(result["id"])
+ return (stream, port)
+ raise RuntimeError("Unable to create stream - No free port found?")
def _get_player_state(self, player_id: str) -> PlayerState:
"""Return the state of the player."""
for child_player_id in self._group_childs(player_id):
player = self.mass.players.get(child_player_id)
player.state = state
- self.mass.players.update(player)
-
- async def _set_snapclient_empty_stream(self, player_id: str) -> SnapStream:
- """Set the snapclient stream to empty and return new stream."""
- new_stream_id = await self._get_empty_stream()
- await self._get_snapgroup(player_id).set_stream(new_stream_id)
- stream = self._snapserver.stream(new_stream_id)
- return stream
+ self.mass.players.update(child_player_id)
from soco.events_base import SubscriptionBase
from soco.groups import ZoneGroup
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE,
+ ConfigEntry,
+ ConfigValueType,
+)
from music_assistant.common.models.enums import (
ConfigEntryType,
+ ContentType,
PlayerFeature,
PlayerState,
PlayerType,
)
-from music_assistant.common.models.errors import PlayerUnavailableError, QueueEmpty
+from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_PLAYERS
+from music_assistant.constants import CONF_CROSSFADE, CONF_PLAYERS
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
PLAYER_FEATURES = (
- PlayerFeature.SET_MEMBERS,
PlayerFeature.SYNC,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
+ PlayerFeature.ENQUEUE_NEXT,
)
CONF_NETWORK_SCAN = "network_scan"
# to allow coextistence with HA on the same host
config.EVENT_LISTENER_PORT = 1700
+HIRES_MODELS = (
+ "Sonos Roam",
+ "Sonos Arc",
+ "Sonos Beam",
+ "Sonos Five",
+ "Sonos Move",
+ "Sonos One SL",
+ "Sonos Port",
+ "Sonos Amp",
+ "SYMFONISK Bookshelf",
+ "SYMFONISK Table Lamp",
+)
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
soco: soco.SoCo
player: Player
is_stereo_pair: bool = False
- next_url: str | None = None
elapsed_time: int = 0
playback_started: float | None = None
need_elapsed_time_workaround: bool = False
self.playback_started = now
# media info (track info)
- self.player.current_url = self.track_info["uri"]
+ self.player.current_item_id = self.track_info["uri"]
+ if self.player.player_id in self.player.current_item_id:
+ self.player.active_source = self.player.player_id
+ elif "spotify" in self.player.current_item_id:
+ self.player.active_source = "spotify"
+ elif self.player.current_item_id.startswith("http"):
+ self.player.active_source = "http"
+ else:
+ # TODO: handle other possible sources here
+ self.player.active_source = None
if not self.need_elapsed_time_workaround:
self.player.elapsed_time = self.elapsed_time
self.player.elapsed_time_last_updated = self.track_info_updated
player.soco.end_direct_control_session()
self.sonosplayers = None
+ async def get_player_config_entries(
+ self, player_id: str # noqa: ARG002
+ ) -> tuple[ConfigEntry, ...]:
+ """Return Config Entries for the given player."""
+ base_entries = await super().get_player_config_entries(player_id)
+ if not (sonos_player := self.sonosplayers.get(player_id)):
+ return base_entries
+ return base_entries + (
+ CONF_ENTRY_CROSSFADE,
+ ConfigEntry(
+ key="sonos_bass",
+ type=ConfigEntryType.INTEGER,
+ label="Bass",
+ default_value=0,
+ range=(-10, 10),
+ description="Set the Bass level for the Sonos player",
+ value=sonos_player.soco.bass,
+ advanced=True,
+ ),
+ ConfigEntry(
+ key="sonos_treble",
+ type=ConfigEntryType.INTEGER,
+ label="Treble",
+ default_value=0,
+ range=(-10, 10),
+ description="Set the Treble level for the Sonos player",
+ value=sonos_player.soco.treble,
+ advanced=True,
+ ),
+ ConfigEntry(
+ key="sonos_loudness",
+ type=ConfigEntryType.BOOLEAN,
+ label="Loudness compensation",
+ default_value=True,
+ description="Enable loudness compensation on the Sonos player",
+ value=sonos_player.soco.loudness,
+ advanced=True,
+ ),
+ )
+
def on_player_config_changed(
self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- # run discovery to catch any re-enabled players
- self.mass.create_task(self._run_discovery())
+ if "enabled" in changed_keys:
+ # run discovery to catch any re-enabled players
+ self.mass.create_task(self._run_discovery())
+ if not (sonos_player := self.sonosplayers.get(config.player_id)):
+ return
+ if "values/sonos_bass" in changed_keys:
+ self.mass.create_task(
+ sonos_player.soco.renderingControl.SetBass,
+ [("InstanceID", 0), ("DesiredBass", config.get_value("sonos_bass"))],
+ )
+ if "values/sonos_treble" in changed_keys:
+ self.mass.create_task(
+ sonos_player.soco.renderingControl.SetTreble,
+ [("InstanceID", 0), ("DesiredTreble", config.get_value("sonos_treble"))],
+ )
+ if "values/sonos_loudness" in changed_keys:
+ loudness_value = "1" if config.get_value("sonos_loudness") else "0"
+ self.mass.create_task(
+ sonos_player.soco.renderingControl.SetLoudness,
+ [
+ ("InstanceID", 0),
+ ("Channel", "Master"),
+ ("DesiredLoudness", loudness_value),
+ ],
+ )
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
return
await asyncio.to_thread(sonos_player.soco.play)
- async def cmd_play_url(
- self,
- player_id: str,
- url: str,
- queue_item: QueueItem | None,
- ) -> None:
- """Send PLAY URL command to given player.
-
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
-
- - player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
- """
- sonos_player = self.sonosplayers[player_id]
- if not sonos_player.soco.is_coordinator:
- self.logger.debug(
- "Ignore PLAY_MEDIA command for %s: Player is synced to another player.",
- player_id,
- )
- return
- # always stop and clear queue first
- sonos_player.next_url = None
- await asyncio.to_thread(sonos_player.soco.stop)
- await asyncio.to_thread(sonos_player.soco.clear_queue)
-
- if queue_item is None:
- # enforce mp3 radio mode for flow stream
- url = url.replace(".flac", ".mp3").replace(".wav", ".mp3")
- await asyncio.to_thread(
- sonos_player.soco.play_uri, url, title="Music Assistant", force_radio=True
- )
- else:
- await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
- await asyncio.to_thread(sonos_player.soco.play_from_queue, 0)
- # optimistically set this timestamp to help figure out elapsed time later
- now = time.time()
- sonos_player.playback_started = now
- sonos_player.player.elapsed_time = 0
- sonos_player.player.elapsed_time_last_updated = now
-
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
sonos_player = self.sonosplayers[player_id]
update_group_info=True,
)
+ async def play_media(
+ self,
+ player_id: str,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
+ ) -> None:
+ """Handle PLAY MEDIA on given player.
+
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
+
+ - player_id: player_id of the player to handle the command.
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
+ """
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ flow_mode=False,
+ )
+ sonos_player = self.sonosplayers[player_id]
+ if not sonos_player.soco.is_coordinator:
+ # this should be already handled by the player manager, but just in case...
+ raise PlayerCommandFailed(
+ f"Player {sonos_player.player.display_name} can not "
+ "accept play_media command, it is synced to another player."
+ )
+ # always stop and clear queue first
+ await asyncio.to_thread(sonos_player.soco.stop)
+ await asyncio.to_thread(sonos_player.soco.clear_queue)
+ await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+ await asyncio.to_thread(sonos_player.soco.play_from_queue, 0)
+ # optimistically set this timestamp to help figure out elapsed time later
+ now = time.time()
+ sonos_player.playback_started = now
+ sonos_player.player.elapsed_time = 0
+ sonos_player.player.elapsed_time_last_updated = now
+
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """
+ Handle enqueuing of the next queue item on the player.
+
+ If the player supports PlayerFeature.ENQUE_NEXT:
+ This will be called about 10 seconds before the end of the track.
+ If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+ This will be called when the end of the track is reached.
+
+ A PlayerProvider implementation is in itself responsible for handling this
+ so that the queue items keep playing until its empty or the player stopped.
+
+ This will NOT be called if the end of the queue is reached (and repeat disabled).
+ This will NOT be called if flow mode is enabled on the queue.
+ """
+ sonos_player = self.sonosplayers[player_id]
+ url = await self.mass.streams.resolve_stream_url(
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
+ )
+ # set crossfade according to player setting
+ crossfade = await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
+ if sonos_player.soco.cross_fade != crossfade:
+
+ def set_crossfade():
+ with suppress(Exception):
+ sonos_player.soco.cross_fade = crossfade
+
+ await asyncio.to_thread(set_crossfade)
+
+ await self._enqueue_item(sonos_player, url=url, queue_item=queue_item)
+
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
address=soco_device.ip_address,
manufacturer=self.name,
),
- max_sample_rate=48000,
- supports_24bit=True,
+ max_sample_rate=441000,
+ supports_24bit=False,
),
speaker_info=speaker_info,
speaker_info_updated=time.time(),
)
+ if speaker_info["model_name"] in HIRES_MODELS:
+ sonos_player.player.max_sample_rate = 48000
+ sonos_player.player.supports_24bit = True
+
# poll all endpoints once and update attributes
await sonos_player.check_poll()
sonos_player.update_attributes()
sonos_player.group_info_updated = time.time()
asyncio.run_coroutine_threadsafe(self._update_player(sonos_player), self.mass.loop)
- async def _enqueue_next_track(self, sonos_player: SonosPlayer) -> None:
- """Enqueue the next track of the MA queue on the CC queue."""
- try:
- next_url, next_item, crossfade = await self.mass.player_queues.preload_next_url(
- sonos_player.player_id
- )
- except QueueEmpty:
- return
-
- if sonos_player.next_url == next_url:
- return # already set ?!
- sonos_player.next_url = next_url
-
- # set crossfade according to queue mode
- if sonos_player.soco.cross_fade != crossfade:
-
- def set_crossfade():
- with suppress(Exception):
- sonos_player.soco.cross_fade = crossfade
-
- await asyncio.to_thread(set_crossfade)
-
- # send queue item to sonos queue
- await self._enqueue_item(sonos_player, url=next_url, queue_item=next_item)
-
async def _enqueue_item(
self,
sonos_player: SonosPlayer,
url: str,
- queue_item: QueueItem | None = None,
+ queue_item: QueueItem,
) -> None:
"""Enqueue a queue item to the Sonos player Queue."""
metadata = create_didl_metadata(self.mass, url, queue_item)
async def _update_player(self, sonos_player: SonosPlayer, signal_update: bool = True) -> None:
"""Update Sonos Player."""
- prev_url = sonos_player.player.current_url
+ prev_url = sonos_player.player.current_item_id
prev_state = sonos_player.player.state
sonos_player.update_attributes()
sonos_player.player.can_sync_with = tuple(
x for x in self.sonosplayers if x != sonos_player.player_id
)
- current_url = sonos_player.player.current_url
+ current_url = sonos_player.player.current_item_id
current_state = sonos_player.player.state
if (prev_url != current_url) or (prev_state != current_state):
# will detect changes to the player object itself
self.mass.players.update(sonos_player.player_id)
- # enqueue next item if needed
- if (
- sonos_player.player.state == PlayerState.PLAYING
- and sonos_player.player.active_source == sonos_player.player.player_id
- and sonos_player.next_url in (None, sonos_player.player.current_url)
- ):
- self.mass.create_task(self._enqueue_next_track(sonos_player))
-
def _convert_state(sonos_state: str) -> PlayerState:
"""Convert Sonos state to PlayerState."""
import asyncio
from typing import TYPE_CHECKING
-import shortuuid
-
from music_assistant.common.models.config_entries import (
CONF_ENTRY_EQ_BASS,
CONF_ENTRY_EQ_MID,
CONF_ENTRY_FLOW_MODE,
CONF_ENTRY_HIDE_GROUP_MEMBERS,
CONF_ENTRY_OUTPUT_CHANNELS,
- CONF_ENTRY_OUTPUT_CODEC,
ConfigEntry,
ConfigValueOption,
ConfigValueType,
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
- from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
CONF_ENTRY_EQ_TREBLE_HIDDEN = ConfigEntry.from_dict(
{**CONF_ENTRY_EQ_TREBLE.to_dict(), "hidden": True}
)
-CONF_ENTRY_OUTPUT_CODEC_HIDDEN = ConfigEntry.from_dict(
- {**CONF_ENTRY_OUTPUT_CODEC.to_dict(), "hidden": True}
-)
CONF_ENTRY_GROUPED_POWER_ON = ConfigEntry(
key=CONF_GROUPED_POWER_ON,
type=ConfigEntryType.BOOLEAN,
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
- PlayerFeature.SET_MEMBERS,
),
- max_sample_rate=96000,
+ max_sample_rate=48000,
supports_24bit=True,
active_source=conf_key,
group_childs=player_conf,
"child players will be treated as (un)mute commands to prevent the small "
"interruption of music when the stream is restarted.",
),
- CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO,
- CONF_ENTRY_FORCED_FLOW_MODE,
- # group player outputs to individual members so
- # these settings make no sense, hide them
- CONF_ENTRY_EQ_BASS_HIDDEN,
- CONF_ENTRY_EQ_MID_HIDDEN,
- CONF_ENTRY_EQ_TREBLE_HIDDEN,
- CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
)
async def cmd_stop(self, player_id: str) -> None:
):
tg.create_task(self.mass.players.cmd_play(member.player_id))
- async def cmd_play_url(
+ async def play_media(
self,
player_id: str,
- url: str,
- queue_item: QueueItem | None,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
) -> None:
- """Send PLAY URL command to given player.
+ """Handle PLAY MEDIA on given player.
- This is called when the Queue wants the player to start playing a specific url.
- If an item from the Queue is being played, the QueueItem will be provided with
- all metadata present.
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
- player_id: player_id of the player to handle the command.
- - url: the url that the player should start playing.
- - queue_item: the QueueItem that is related to the URL (None when playing direct url).
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
"""
- # send stop first
- await self.cmd_stop(player_id)
- # debounce
- # this can potentially be called multiple times at the (near) exact time
- # due to many child players being powered on (or resynced) at the same time
- # debounce the command a bit by only letting through the last one.
- self.debounce_id = debounce_id = shortuuid.uuid()
- await asyncio.sleep(100)
- if self.debounce_id != debounce_id:
- return
# power ON
await self.cmd_power(player_id, True)
group_player = self.mass.players.get(player_id)
-
active_members = self._get_active_members(
player_id, only_powered=True, skip_sync_childs=True
)
group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
- # forward command to all (powered) group child's
+ # forward the command to all (sync master) group child's
async with asyncio.TaskGroup() as tg:
for member in active_members:
player_prov = self.mass.players.get_player_provider(member.player_id)
tg.create_task(
- player_prov.cmd_play_url(member.player_id, url=url, queue_item=queue_item)
+ player_prov.play_media(
+ member.player_id,
+ queue_item=queue_item,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ )
)
- async def cmd_handle_stream_job(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
- """Handle StreamJob play command on given player."""
- # send stop first
- await self.cmd_stop(player_id)
- # power ON
- await self.cmd_power(player_id, True)
- group_player = self.mass.players.get(player_id)
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """
+ Handle enqueuing of the next queue item on the player.
- active_members = self._get_active_members(
- player_id, only_powered=True, skip_sync_childs=True
- )
- if len(active_members) == 0:
- self.logger.warning(
- "Play media requested for player %s but no member players are powered, "
- "the request will be ignored",
- group_player.display_name,
- )
- return
+ If the player supports PlayerFeature.ENQUE_NEXT:
+ This will be called about 10 seconds before the end of the track.
+ If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
+ This will be called when the end of the track is reached.
- group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
+ A PlayerProvider implementation is in itself responsible for handling this
+ so that the queue items keep playing until its empty or the player stopped.
- # forward command to all (powered) group child's
+ This will NOT be called if the end of the queue is reached (and repeat disabled).
+ This will NOT be called if the player is using flow mode to playback the queue.
+ """
+ # forward the command to all (sync master) group child's
async with asyncio.TaskGroup() as tg:
- for member in active_members:
+ for member in self._get_active_members(
+ player_id, only_powered=False, skip_sync_childs=True
+ ):
player_prov = self.mass.players.get_player_provider(member.player_id)
- # we forward the stream_job to child to allow for nested groups etc
- tg.create_task(
- player_prov.cmd_handle_stream_job(member.player_id, stream_job=stream_job)
- )
+ tg.create_task(player_prov.enqueue_next_queue_item(member.player_id, queue_item))
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player."""
group_player = self.mass.players.get(player_id)
if not group_player.powered:
group_player.state = PlayerState.IDLE
+ group_player.active_source = None
return
all_members = self._get_active_members(
player_id, only_powered=False, skip_sync_childs=False
)
- if all_members:
- group_player.max_sample_rate = max(x.max_sample_rate for x in all_members)
group_player.group_childs = list(x.player_id for x in all_members)
+ group_player.active_source = player_id
# read the state from the first active group member
for member in all_members:
if member.synced_to:
player_powered = member.powered
if not player_powered:
continue
- group_player.current_url = member.current_url
+ group_player.current_item_id = member.current_item_id
group_player.elapsed_time = member.elapsed_time
group_player.elapsed_time_last_updated = member.elapsed_time_last_updated
group_player.state = member.state