types: [python]
entry: scripts/run-in-env.sh ruff check --fix
require_serial: true
- stages: [commit, push, manual]
+ stages: [pre-commit, pre-push, manual]
- id: ruff-format
name: 🐶 Ruff Formatter
language: system
types: [python]
entry: scripts/run-in-env.sh ruff format
require_serial: true
- stages: [commit, push, manual]
+ stages: [pre-commit, pre-push, manual]
- id: check-ast
name: 🐍 Check Python AST
language: system
language: system
types: [text, executable]
entry: scripts/run-in-env.sh check-executables-have-shebangs
- stages: [commit, push, manual]
+ stages: [pre-commit, pre-push, manual]
- id: check-json
name: { Check JSON files
language: system
language: system
types: [text]
entry: scripts/run-in-env.sh end-of-file-fixer
- stages: [commit, push, manual]
+ stages: [pre-commit, pre-push, manual]
- id: no-commit-to-branch
name: 🛑 Don't commit to main branch
language: system
language: system
types: [text]
entry: scripts/run-in-env.sh trailing-whitespace-fixer
- stages: [commit, push, manual]
+ stages: [pre-commit, pre-push, manual]
- id: mypy
name: mypy
entry: scripts/run-in-env.sh mypy
bit_depth: int = 16
channels: int = 2
output_format_str: str = ""
- bit_rate: int = 320 # optional
+ bit_rate: int | None = None # optional bitrate in kbps
def __post_init__(self) -> None:
"""Execute actions after init."""
)
elif not self.output_format_str:
self.output_format_str = self.content_type.value
+ if self.bit_rate and self.bit_rate > 100000:
+ # correct bit rate in bits per second to kbps
+ self.bit_rate = int(self.bit_rate / 1000)
@property
def quality(self) -> int:
# lossy content, bit_rate is most important score
# but prefer some codecs over others
# calculate a rough score based on bit rate per channel
- bit_rate_score = (self.bit_rate / self.channels) / 100
+ bit_rate = self.bit_rate or 320
+ bit_rate_score = (bit_rate / self.channels) / 100
if self.content_type in (ContentType.AAC, ContentType.OGG):
bit_rate_score += 1
return int(bit_rate_score)
return False
return self.output_format_str == other.output_format_str
+ def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
+ """Execute action(s) on serialization."""
+ # bit_rate is now optional. Set default value to keep compatibility
+ # TODO: remove this after release of MA 2.5
+ d["bit_rate"] = d["bit_rate"] or 0
+ return d
+
@dataclass(kw_only=True)
class ProviderMapping(DataClassDictMixin):
player_id,
CONF_TTS_PRE_ANNOUNCE,
)
- if not native_announce_support and player.active_group:
- for group_member in self.iter_group_members(player, True, True):
- if PlayerFeature.PLAY_ANNOUNCEMENT in group_member.supported_features:
- native_announce_support = True
- break
- # redirect to group player if playergroup is active
- self.logger.warning(
- "Detected announcement request to a player which has a group active, "
- "this will be redirected to the group."
- )
- await self.play_announcement(
- player.active_group, url, use_pre_announce, volume_level
- )
- return
-
- # if player type is group with all members supporting announcements
- # or if the groupplayer is not powered, we forward the request to each individual player
+ # if player type is group with all members supporting announcements,
+ # we forward the request to each individual player
if player.type == PlayerType.GROUP and (
all(
- x
+ PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
for x in self.iter_group_members(player)
- if PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
)
- or not player.powered
):
# forward the request to each individual player
async with TaskManager(self.mass) as tg:
)
)
return
-
self.logger.info(
"Playback announcement to player %s (with pre-announce: %s): %s",
player.display_name,
- restore the previous power and volume
- restore playback (if needed and if possible)
- This default implementation will only be used if the player's
- provider has no native support for the PLAY_ANNOUNCEMENT feature.
+ This default implementation will only be used if the player
+ (provider) has no native support for the PLAY_ANNOUNCEMENT feature.
"""
prev_power = player.powered
prev_state = player.state
prev_synced_to = player.synced_to
- queue = self.mass.player_queues.get_active_queue(player.player_id)
- prev_queue_active = queue.active
+ queue = self.mass.player_queues.get(player.active_source)
+ prev_queue_active = queue and queue.active
prev_item_id = player.current_item_id
# unsync player if its currently synced
if prev_synced_to:
for volume_player_id in player.group_childs or (player.player_id,):
if not (volume_player := self.get(volume_player_id)):
continue
- # filter out players that have a different source active
- if volume_player.active_source not in (
- player.active_source,
- volume_player.player_id,
- None,
+ # catch any players that have a different source active
+ if (
+ volume_player.active_source
+ not in (
+ player.active_source,
+ volume_player.player_id,
+ None,
+ )
+ and volume_player.state == PlayerState.PLAYING
):
- continue
+ self.logger.warning(
+ "Detected announcement to playergroup %s while group member %s is playing "
+ "other content, this may lead to unexpected behavior.",
+ player.display_name,
+ volume_player.display_name,
+ )
+ tg.create_task(self.cmd_stop(volume_player.player_id))
prev_volume = volume_player.volume_level
announcement_volume = self.get_announcement_volume(volume_player_id, volume_level)
temp_volume = announcement_volume or player.volume_level
"Server": "Music Assistant",
"transferMode.dlna.org": "Streaming",
"contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", # noqa: E501
- "Cache-Control": "no-cache,must-revalidate",
+ "Cache-Control": "no-cache",
"Pragma": "no-cache",
- "Accept-Ranges": "none",
- "Connection": "close",
}
ICY_HEADERS = {
"icy-name": "Music Assistant",
default_sample_rate=queue_item.streamdetails.audio_format.sample_rate,
default_bit_depth=queue_item.streamdetails.audio_format.bit_depth,
)
- http_profile: str = await self.mass.config.get_player_config_value(
- queue_id, CONF_HTTP_PROFILE
- )
+
# prepare request, add some DLNA/UPNP compatible headers
headers = {
**DEFAULT_STREAM_HEADERS,
- "Content-Type": f"audio/{output_format.output_format_str}",
"icy-name": queue_item.name,
}
resp = web.StreamResponse(
reason="OK",
headers=headers,
)
- if http_profile == "forced_content_length":
- resp.content_length = get_chunksize(
- output_format, queue_item.streamdetails.duration or 120
- )
+ resp.content_type = f"audio/{output_format.output_format_str}"
+ http_profile: str = await self.mass.config.get_player_config_value(
+ queue_id, CONF_HTTP_PROFILE
+ )
+ if http_profile == "forced_content_length" and queue_item.duration:
+ # guess content length based on duration
+ resp.content_length = get_chunksize(output_format, queue_item.duration)
elif http_profile == "chunked":
resp.enable_chunked_encoding()
enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
icy_meta_interval = 256000 if icy_preference == "full" else 16384
- # prepare request, add some DLNA/UPNP compatible headers
- http_profile: str = await self.mass.config.get_player_config_value(
- queue_id, CONF_HTTP_PROFILE
- )
# prepare request, add some DLNA/UPNP compatible headers
headers = {
**DEFAULT_STREAM_HEADERS,
**ICY_HEADERS,
- "Content-Type": f"audio/{output_format.output_format_str}",
"Accept-Ranges": "none",
- "Cache-Control": "no-cache",
- "Connection": "close",
+ "Content-Type": f"audio/{output_format.output_format_str}",
}
if enable_icy:
headers["icy-metaint"] = str(icy_meta_interval)
reason="OK",
headers=headers,
)
+ http_profile: str = await self.mass.config.get_player_config_value(
+ queue_id, CONF_HTTP_PROFILE
+ )
if http_profile == "forced_content_length":
- resp.content_length = get_chunksize(output_format, 24 * 2600)
+ # just set an insane high content length to make sure the player keeps playing
+ resp.content_length = get_chunksize(output_format, 12 * 3600)
elif http_profile == "chunked":
resp.enable_chunked_encoding()
+
await resp.prepare(request)
# return early if this is not a GET request
# work out output format/details
fmt = request.match_info.get("fmt", announcement_url.rsplit(".")[-1])
audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
- # prepare request, add some DLNA/UPNP compatible headers
- headers = {
- **DEFAULT_STREAM_HEADERS,
- "Content-Type": f"audio/{audio_format.output_format_str}",
- }
+
+ http_profile: str = await self.mass.config.get_player_config_value(
+ player_id, CONF_HTTP_PROFILE
+ )
+ if http_profile == "forced_content_length":
+ # given the fact that an announcement is just a short audio clip,
+ # just send it over completely at once so we have a fixed content length
+ data = b""
+ async for chunk in self.get_announcement_stream(
+ announcement_url=announcement_url,
+ output_format=audio_format,
+ use_pre_announce=use_pre_announce,
+ ):
+ data += chunk
+ return web.Response(
+ body=data,
+ content_type=f"audio/{audio_format.output_format_str}",
+ headers=DEFAULT_STREAM_HEADERS,
+ )
+
resp = web.StreamResponse(
status=200,
reason="OK",
- headers=headers,
+ headers=DEFAULT_STREAM_HEADERS,
)
+ resp.content_type = f"audio/{audio_format.output_format_str}"
+ if http_profile == "chunked":
+ resp.enable_chunked_encoding()
+
await resp.prepare(request)
# return early if this is not a GET request
fmt: AudioFormat,
seconds: int = 1,
) -> int:
- """Get a default chunksize for given contenttype."""
- pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * 2 * seconds)
+ """Get a default chunk/file size for given contenttype in bytes."""
+ pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
if fmt.content_type.is_pcm() or fmt.content_type == ContentType.WAV:
return pcm_size
if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
return pcm_size
+ if fmt.bit_rate:
+ return int(((fmt.bit_rate * 1000) / 8) * seconds)
if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
- return int(pcm_size * 0.8)
+ # assume 74.7% compression ratio (level 0)
+ # source: https://z-issue.com/wp/flac-compression-level-comparison/
+ return int(pcm_size * 0.747)
if fmt.content_type in (ContentType.MP3, ContentType.OGG):
return int((320000 / 8) * seconds)
if fmt.content_type in (ContentType.AAC, ContentType.M4A):
str(output_format.channels),
]
if output_format.output_format_str == "flac":
- output_args += ["-compression_level", "6"]
+ # use level 0 compression for fastest encoding
+ output_args += ["-compression_level", "0"]
output_args += [output_path]
# edge case: source file is not stereo - downmix to stereo
channels: int
bits_per_sample: int
format: str
- bit_rate: int
+ bit_rate: int | None
duration: float | None
tags: dict[str, str]
has_cover_image: bool
audio_stream.get("bits_per_raw_sample", audio_stream.get("bits_per_sample")) or 16
),
format=raw["format"]["format_name"],
- bit_rate=int(raw["format"].get("bit_rate", 320)),
+ bit_rate=int(raw["format"].get("bit_rate", 0)) or None,
duration=float(raw["format"].get("duration", 0)) or None,
tags=tags,
has_cover_image=has_cover_image,
This will NOT be called if the end of the queue is reached (and repeat disabled).
This will NOT be called if the player is using flow mode to playback the queue.
"""
+ # will only be called for players with ENQUEUE NEXT feature set.
+ raise NotImplementedError
async def play_announcement(
self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
member.active_source = group_player.active_source
else:
# handle TURN_OFF of the group player by turning off all members
+ # optimistically set the group state to prevent race conditions
+ # with the unsync command
+ group_player.powered = False
for member in self.mass.players.iter_group_members(
group_player, only_powered=True, active_only=True
):
# start the stream task
self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
- base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac"
+ base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.mp3"
# set the state optimistically
group_player.current_media = media
CONFIG_ENTRY_DYNAMIC_MEMBERS.key,
CONFIG_ENTRY_DYNAMIC_MEMBERS.default_value,
)
- if not dynamic_members_enabled:
+ if group_player.powered and not dynamic_members_enabled:
raise UnsupportedFeaturedException(
f"Adjusting group members is not allowed for group {group_player.display_name}"
)
model_name = "Universal Group"
manufacturer = self.name
# register dynamic route for the ugp stream
- route_path = f"/ugp/{group_player_id}.aac"
+ route_path = f"/ugp/{group_player_id}.mp3"
self._on_unload.append(
self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
)
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_MUTE,
):
- if all(x for x in player_provider.players if feature in x.supported_features):
+ if all(feature in x.supported_features for x in player_provider.players):
player_features.add(feature)
else:
raise PlayerUnavailableError(f"Provider for syncgroup {group_type} is not available!")
"""Update attributes of a player."""
for child_player in self.mass.players.iter_group_members(player, active_only=True):
# just grab the first active player
- if child_player.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
- continue
if child_player.synced_to:
continue
player.state = child_player.state
)
headers = {
**DEFAULT_STREAM_HEADERS,
- "Content-Type": "audio/aac",
+ "Content-Type": "audio/mp3",
"Accept-Ranges": "none",
"Cache-Control": "no-cache",
"Connection": "close",
"""
Implementation of a Stream for the Universal Group Player.
-Basically this is like a fake radio radio stream (AAC) format with multiple subscribers.
-The AAC format is chosen because it is widely supported and has a good balance between
-quality and bandwidth and also allows for mid-stream joining of (extra) players.
+Basically this is like a fake radio radio stream (MP3) format with multiple subscribers.
+The MP3 format is chosen because it is widely supported.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable
+from music_assistant.common.helpers.util import empty_queue
from music_assistant.common.models.enums import ContentType
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.server.helpers.audio import get_ffmpeg_stream
UGP_FORMAT = AudioFormat(
content_type=ContentType.PCM_F32LE,
- sample_rate=44100,
+ sample_rate=48000,
bit_depth=32,
)
"""
Implementation of a Stream for the Universal Group Player.
- Basically this is like a fake radio radio stream (AAC) format with multiple subscribers.
- The AAC format is chosen because it is widely supported and has a good balance between
- quality and bandwidth and also allows for mid-stream joining of (extra) players.
+ Basically this is like a fake radio radio stream (MP3) format with multiple subscribers.
+ The MP3 format is chosen because it is widely supported.
"""
def __init__(
"""Initialize UGP Stream."""
self.audio_source = audio_source
self.input_format = audio_format
- self.output_format = AudioFormat(content_type=ContentType.AAC)
+ self.output_format = AudioFormat(content_type=ContentType.MP3)
self.subscribers: list[Callable[[bytes], Awaitable]] = []
self._task: asyncio.Task | None = None
self._done: asyncio.Event = asyncio.Event()
# start the runner as soon as the (first) client connects
if not self._task:
self._task = asyncio.create_task(self._runner())
- queue = asyncio.Queue(1)
+ queue = asyncio.Queue(10)
try:
self.subscribers.append(queue.put)
while True:
yield chunk
finally:
self.subscribers.remove(queue.put)
+ empty_queue(queue)
+ del queue
async def _runner(self) -> None:
"""Run the stream for the given audio source."""
audio_input=self.audio_source,
input_format=self.input_format,
output_format=self.output_format,
- # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version
- # extra_input_args=["-readrate", "1.15"],
+ # enable realtime to prevent too much buffering ahead
+ # TODO: enable initial burst once we have a newer ffmpeg version
+ extra_input_args=["-re"],
):
- await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True)
+ await asyncio.gather(
+ *[sub(chunk) for sub in self.subscribers],
+ return_exceptions=True,
+ )
# empty chunk when done
await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
self._done.set()
from aiosonos.const import SonosEvent
from aiosonos.exceptions import ConnectionFailed, FailedCommand
-from music_assistant.common.models.enums import EventType, PlayerFeature, PlayerState, PlayerType
+from music_assistant.common.models.enums import (
+ EventType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+ RepeatMode,
+)
from music_assistant.common.models.event import MassEvent
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import CONF_CROSSFADE
from .const import (
CONF_AIRPLAY_MODE,
# register callback for playerqueue state changes
self._on_cleanup_callbacks.append(
self.mass.subscribe(
- self._on_mass_queue_event,
+ self._on_mass_queue_items_event,
EventType.QUEUE_ITEMS_UPDATED,
self.player_id,
)
self.update_attributes()
self.mass.players.update(self.player_id)
- async def _on_mass_queue_event(self, event: MassEvent) -> None:
+ async def _on_mass_queue_items_event(self, event: MassEvent) -> None:
"""Handle incoming event from linked MA playerqueue."""
# If the queue items changed and we have an active sonos queue,
# we need to inform the sonos queue to refresh the items.
if self.mass_player.active_source != event.object_id:
return
+ if not self.connected:
+ return
+ queue = self.mass.player_queues.get(event.object_id)
+ if not queue or queue.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
+ return
if session_id := self.client.player.group.active_session_id:
await self.client.api.playback_session.refresh_cloud_queue(session_id)
+
+ async def _on_mass_queue_event(self, event: MassEvent) -> None:
+ """Handle incoming event from linked MA playerqueue."""
+ if self.mass_player.active_source != event.object_id:
+ return
+ if not self.connected:
+ return
+ # sync crossfade and repeat modes
+ queue = self.mass.player_queues.get(event.object_id)
+ if not queue or queue.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
+ return
+ crossfade = await self.mass.config.get_player_config_value(queue.queue_id, CONF_CROSSFADE)
+ repeat_single_enabled = queue.repeat_mode == RepeatMode.ONE
+ repeat_all_enabled = queue.repeat_mode == RepeatMode.ALL
+ play_modes = self.client.player.group.play_modes
+ if (
+ play_modes.crossfade != crossfade
+ or play_modes.repeat != repeat_all_enabled
+ or play_modes.repeat_one != repeat_single_enabled
+ ):
+ await self.client.player.group.set_play_modes(
+ crossfade=crossfade, repeat=repeat_all_enabled, repeat_one=repeat_single_enabled
+ )
ConfigEntry,
create_sample_rates_config_entry,
)
-from music_assistant.common.models.enums import (
- ConfigEntryType,
- ContentType,
- ProviderFeature,
- RepeatMode,
-)
+from music_assistant.common.models.enums import ConfigEntryType, ContentType, ProviderFeature
from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.player import DeviceInfo, PlayerMedia
-from music_assistant.constants import CONF_CROSSFADE, MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
+from music_assistant.constants import MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
from music_assistant.server.models.player_provider import PlayerProvider
from .const import CONF_AIRPLAY_MODE
return
# play a single uri/url
+ # note that this most probably will only work for (long running) radio streams
if self.mass.config.get_raw_player_config_value(
player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value
):
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
- sonos_player = self.sonos_players[player_id]
- if sonos_player.get_linked_airplay_player(True):
- # linked airplay player is active, ignore this command
- return
- if session_id := sonos_player.client.player.group.active_session_id:
- await sonos_player.client.api.playback_session.refresh_cloud_queue(session_id)
- # sync play modes from player queue --> sonos
- mass_queue = self.mass.player_queues.get(media.queue_id)
- crossfade = await self.mass.config.get_player_config_value(
- mass_queue.queue_id, CONF_CROSSFADE
- )
- repeat_single_enabled = mass_queue.repeat_mode == RepeatMode.ONE
- repeat_all_enabled = mass_queue.repeat_mode == RepeatMode.ALL
- play_modes = sonos_player.client.player.group.play_modes
- if (
- play_modes.crossfade != crossfade
- or play_modes.repeat != repeat_all_enabled
- or play_modes.repeat_one != repeat_single_enabled
- ):
- await sonos_player.client.player.group.set_play_modes(
- crossfade=crossfade, repeat=repeat_all_enabled, repeat_one=repeat_single_enabled
- )
+ # We do nothing here as we handle the queue in the cloud queue endpoint.
+ # For sonos s2, instead of enqueuing tracks one by one, the sonos player itself
+ # can interact with our queue directly through the cloud queue endpoint.
async def play_announcement(
self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': '?',
'output_format_str': '?',
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': '?',
'output_format_str': '?',
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': '?',
'output_format_str': '?',
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': '?',
'output_format_str': '?',
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': 'mp3',
'output_format_str': 'mp3',
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': 'aac',
'output_format_str': 'aac',
dict({
'audio_format': dict({
'bit_depth': 16,
- 'bit_rate': 320,
+ 'bit_rate': 0,
'channels': 2,
'content_type': 'aac',
'output_format_str': 'aac',