import subprocess
import sys
import threading
+import traceback
from contextlib import suppress
from logging.handlers import RotatingFileHandler
-from typing import Final
+from typing import Any, Final
from aiorun import run
from colorlog import ColoredFormatter
from music_assistant.common.helpers.json import json_loads
-from music_assistant.constants import ROOT_LOGGER_NAME
+from music_assistant.constants import ROOT_LOGGER_NAME, VERBOSE_LOG_LEVEL
from music_assistant.server import MusicAssistant
from music_assistant.server.helpers.logging import activate_log_queue_handler
ALPINE_RELEASE_FILE = "/etc/alpine-release"
+class VerboseLogger(logging.Logger):
+    """Custom python logger with included verbose log level."""
+
+    def verbose(self, msg, *args, **kwargs):
+        """Log a verbose message."""
+        # VERBOSE_LOG_LEVEL sits below DEBUG; the level name is registered
+        # via logging.addLevelName() during logging setup.
+        # NOTE(review): relies on 'import logging' already being present in
+        # this module (not visible in this hunk) — confirm.
+        self.log(VERBOSE_LOG_LEVEL, msg, *args, **kwargs)
+
+
def get_arguments():
"""Arguments handling."""
parser = argparse.ArgumentParser(description="MusicAssistant")
datefmt=FORMAT_DATETIME,
reset=True,
log_colors={
+ "VERBOSE": "light_black",
"DEBUG": "cyan",
"INFO": "green",
"WARNING": "yellow",
with suppress(OSError):
file_handler.doRollover()
file_handler.setFormatter(logging.Formatter(log_fmt, datefmt=FORMAT_DATETIME))
- # file_handler.setLevel(logging.INFO)
logger = logging.getLogger()
logger.addHandler(file_handler)
+ logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE")
+ logging.setLoggerClass(VerboseLogger)
# apply the configured global log level to the (root) music assistant logger
logging.getLogger(ROOT_LOGGER_NAME).setLevel(level)
subprocess._USE_POSIX_SPAWN = os.path.exists(ALPINE_RELEASE_FILE)
+def _global_loop_exception_handler(_: Any, context: dict[str, Any]) -> None:
+    """Handle all exceptions inside the core loop."""
+    # Re-pack the original exception (if any) so logger.error renders the
+    # full traceback through exc_info.
+    kwargs = {}
+    if exception := context.get("exception"):
+        kwargs["exc_info"] = (type(exception), exception, exception.__traceback__)
+
+    logger = logging.getLogger(__package__)
+    # Prefer the creation traceback of the offending task/handle when
+    # available (asyncio only fills source_traceback in debug mode).
+    if source_traceback := context.get("source_traceback"):
+        stack_summary = "".join(traceback.format_list(source_traceback))
+        logger.error(
+            "Error doing job: %s: %s",
+            context["message"],
+            stack_summary,
+            **kwargs,  # type: ignore[arg-type]
+        )
+        return
+
+    logger.error(
+        "Error doing task: %s",
+        context["message"],
+        **kwargs,  # type: ignore[arg-type]
+    )
+
+
+
+
def main() -> None:
"""Start MusicAssistant."""
# parse arguments
activate_log_queue_handler()
if dev_mode or log_level == "DEBUG":
loop.set_debug(True)
+ loop.set_exception_handler(_global_loop_exception_handler)
await mass.start()
run(
start_mass(),
use_uvloop=enable_uvloop,
shutdown_callback=on_shutdown,
- executor_workers=64,
+ executor_workers=32,
)
--- /dev/null
+"""Provides a simple global memory cache."""
+
+
+from __future__ import annotations
+
+import asyncio
+from typing import Any
+
+# global cache - we use this on a few places (as limited as possible)
+# where we have no other options
+_global_cache_lock = asyncio.Lock()
+_global_cache = {}
+
+
+def get_global_cache_value(key: str, default: Any = None) -> Any:
+ """Get a value from the global cache."""
+ return _global_cache.get(key, default)
+
+
+async def set_global_cache_values(values: dict[str, Any]) -> Any:
+ """Set a value in the global cache (without locking)."""
+ async with _global_cache_lock:
+ for key, value in values.items():
+ _set_global_cache_value(key, value)
+
+
+def _set_global_cache_value(key: str, value: Any) -> Any:
+ """Set a value in the global cache (without locking)."""
+ _global_cache[key] = value
"""Enum with possible Player Types.
player: A regular player.
+ stereo_pair: Same as player but a dedicated stereo pair of 2 speakers.
group: A (dedicated) group player or (universal) playergroup.
sync_group: A group/preset of players that can be synced together.
"""
PLAYER = "player"
+ STEREO_PAIR = "stereo_pair"
GROUP = "group"
SYNC_GROUP = "sync_group"
from __future__ import annotations
from dataclasses import dataclass, field, fields
-from typing import Any, Self
+from typing import TYPE_CHECKING, Any, Self, cast
from mashumaro import DataClassDictMixin
+from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.uri import create_uri
from music_assistant.common.helpers.util import create_sort_name, is_valid_uuid, merge_lists
from music_assistant.common.models.enums import (
return self.output_format_str == other.output_format_str
-@dataclass(frozen=True, kw_only=True)
+@dataclass(kw_only=True)
class ProviderMapping(DataClassDictMixin):
"""Model for a MediaItem's provider mapping details."""
"""Return quality score."""
return self.audio_format.quality
+    def __post_init__(self):
+        """Call after init."""
+        # having items for unavailable providers can have all sorts
+        # of unpredictable results so ensure we have accurate availability status
+        if available_providers := get_global_cache_value("unique_providers"):
+            if TYPE_CHECKING:
+                available_providers = cast(set[str], available_providers)
+            # Mark unavailable when neither the provider domain nor the
+            # concrete provider instance is currently loaded.
+            if not available_providers.intersection({self.provider_domain, self.provider_instance}):
+                self.available = False
+
def __hash__(self) -> int:
"""Return custom hash."""
return hash((self.provider_instance, self.item_id))
"player_queues",
)
SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
+VERBOSE_LOG_LEVEL: Final[int] = 5
from aiofiles.os import wrap
from cryptography.fernet import Fernet, InvalidToken
+from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_dumps, json_loads
from music_assistant.common.models import config_entries
from music_assistant.common.models.config_entries import (
self, provider: str | None = None, include_values: bool = False
) -> list[PlayerConfig]:
"""Return all known player configurations, optionally filtered by provider domain."""
- available_providers = {x.instance_id for x in self.mass.providers}
return [
await self.get_player_config(raw_conf["player_id"])
if include_values
else PlayerConfig.parse([], raw_conf)
for raw_conf in list(self.get(CONF_PLAYERS, {}).values())
# filter out unavailable providers
- if raw_conf["provider"] in available_providers
+ if raw_conf["provider"] in get_global_cache_value("available_providers", [])
# optional provider filter
and (provider in (None, raw_conf["provider"]))
]
from aiohttp import web
from music_assistant.common.models.enums import ImageType, MediaType, ProviderFeature, ProviderType
-from music_assistant.common.models.errors import MediaNotFoundError
+from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
from music_assistant.common.models.media_items import (
Album,
Artist,
image_format: str = "png",
) -> bytes | str:
"""Get/create thumbnail image for path (image url or local path)."""
+ if provider != "url" and not self.mass.get_provider(provider):
+ raise ProviderUnavailableError
thumbnail = await get_image_thumb(
self.mass, path, size=size, provider=provider, image_format=image_format
)
provider = request.query.get("provider", "url")
size = int(request.query.get("size", "0"))
image_format = request.query.get("fmt", "png")
+ if provider != "url" and not self.mass.get_provider(provider):
+ return web.Response(status=404)
if "%" in path:
# assume (double) encoded url, decode it
path = urllib.parse.unquote(path)
-
with suppress(FileNotFoundError):
image_data = await self.get_thumbnail(
path, size=size, provider=provider, image_format=image_format
# Poll player;
    # - every 120 seconds if the player is not powered
# - every 30 seconds if the player is powered
- # - every 10 seconds if the player is playing
+ # - every 5 seconds if the player is playing
if (
(player.powered and count % 30 == 0)
- or (player_playing and count % 10 == 0)
+ or (player_playing and count % 5 == 0)
or count % 120 == 0
) and (player_prov := self.get_player_provider(player_id)):
try:
CONF_PUBLISH_IP,
SILENCE_FILE,
)
+from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
pcm_audio_source: AsyncGenerator[bytes, None],
pcm_format: AudioFormat,
expected_players: set[str],
- auto_start: bool = True,
) -> None:
"""Initialize MultiClientQueueStreamJob instance."""
self.mass = mass
self.pcm_format = pcm_format
self.expected_players = expected_players
self.job_id = shortuuid.uuid()
- self.auto_start = auto_start
self.bytes_streamed: int = 0
self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
self._subscribed_players: dict[str, asyncio.Queue] = {}
- self._finished = asyncio.Event()
- self._audio_task: asyncio.Task | None = None
+ self._finished = False
+ self._running = False
+ self._allow_start = asyncio.Event()
+ self._audio_task = asyncio.create_task(self._stream_job_runner())
    @property
    def finished(self) -> bool:
        """Return if this StreamJob is finished."""
-        return self._finished.is_set() or self._audio_task and self._audio_task.done()
+        # _finished is now a plain bool (not asyncio.Event) so it can be
+        # checked cheaply from both sync and async code paths.
+        return self._finished or self._audio_task and self._audio_task.done()
@property
def pending(self) -> bool:
    @property
    def running(self) -> bool:
        """Return if this Job is running."""
-        return self._audio_task and not self._audio_task.done()
+        # Only report running after start() released the job (_running set by
+        # the runner) and the pre-created audio task is still alive.
+        return self._running and self._audio_task and not self._audio_task.done()
    def start(self) -> None:
        """Start running (send audio chunks to connected players)."""
-        if self.running:
-            return
        if self.finished:
            raise RuntimeError("Task is already finished")
-        self.logger.debug(
-            "Starting multi client stream job %s with %s out of %s connected clients",
-            self.job_id,
-            len(self._subscribed_players),
-            len(self.expected_players),
-        )
-        self._audio_task = asyncio.create_task(self._stream_job_runner())
+        # The audio task is created up-front in __init__ and blocks on this
+        # event, so start() merely releases it (idempotent: setting an
+        # already-set event is a no-op).
+        self._allow_start.set()
def stop(self) -> None:
"""Stop running this job."""
return
if self._audio_task:
self._audio_task.cancel()
- self._finished.set()
+ self._finished = True
def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
"""Resolve the childplayer specific stream URL to this streamjob."""
async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]:
"""Subscribe consumer and iterate incoming (raw pcm) chunks on the queue."""
try:
- self._subscribed_players[player_id] = queue = asyncio.Queue(1)
+ self._subscribed_players[player_id] = queue = asyncio.Queue(2)
if self.running:
# client subscribes while we're already started
else:
self.logger.debug("Subscribed player %s", player_id)
- await asyncio.sleep(0.2) # debounce
- if (
- self.auto_start
- and not self.running
- and len(self._subscribed_players) == len(self.expected_players)
- ):
- # we reached the number of expected subscribers, set event
- # so that chunks can be pushed
- self.start()
# yield from queue until finished
- while not self._finished.is_set():
+ while not self._finished:
yield await queue.get()
finally:
if sub_queue := self._subscribed_players.pop(player_id, None):
    async def _stream_job_runner(self) -> None:
        """Feed audio chunks to StreamJob subscribers."""
+        # Block until start() releases the job.
+        await self._allow_start.wait()
+        # Wait (up to ~20 seconds total) for all expected players to
+        # subscribe; the subscriber count must match over two consecutive
+        # 0.2s checks to debounce late/flapping subscribers before audio
+        # starts flowing. After the retries are exhausted we start anyway
+        # with whoever is connected.
+        retries = 50
+        while retries:
+            retries -= 1
+            await asyncio.sleep(0.2)
+            if len(self._subscribed_players) != len(self.expected_players):
+                continue
+            await asyncio.sleep(0.2)
+            if len(self._subscribed_players) != len(self.expected_players):
+                continue
+            break
+
+        self.logger.debug(
+            "Starting multi client stream job %s with %s out of %s connected clients",
+            self.job_id,
+            len(self._subscribed_players),
+            len(self.expected_players),
+        )
        async for chunk in self.pcm_audio_source:
            async with asyncio.TaskGroup() as tg:
                for listener_queue in list(self._subscribed_players.values()):
                    tg.create_task(listener_queue.put(chunk))
-        self._finished.set()
+        # Plain bool flag; _subscribe_pcm polls it to stop yielding.
+        self._finished = True
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
version,
"with libsoxr support" if libsoxr_support else "",
)
+ # copy log level to audio module
+ AUDIO_LOGGER.setLevel(self.logger.level)
# start the webserver
self.publish_port = config.get_value(CONF_BIND_PORT)
self.publish_ip = config.get_value(CONF_PUBLISH_IP)
pcm_bit_depth: int = 24,
pcm_sample_rate: int = 48000,
expected_players: set[str] | None = None,
- auto_start: bool = True,
) -> MultiClientQueueStreamJob:
"""
    Create a MultiClientQueueStreamJob for the given queue.
),
pcm_format=pcm_format,
expected_players=expected_players or set(),
- auto_start=auto_start,
)
return stream_job
queue.display_name,
queue_track.streamdetails.seconds_streamed,
)
+
# end of queue flow: make sure we yield the last_fadeout_part
if last_fadeout_part:
yield last_fadeout_part
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueOption
from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.errors import InvalidCommand
-from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT
+from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.api import APICommandHandler, parse_arguments
from music_assistant.server.helpers.audio import get_preview_stream
from music_assistant.server.helpers.util import get_ips
DEFAULT_SERVER_PORT = 8095
CONF_BASE_URL = "base_url"
CONF_EXPOSE_SERVER = "expose_server"
-DEBUG = False # Set to True to enable very verbose logging of all incoming/outgoing messages
MAX_PENDING_MSG = 512
CANCELLATION_ERRORS: Final = (asyncio.CancelledError, futures.CancelledError)
return web.Response(text=log_data, content_type="text/text")
-class WebSocketLogAdapter(logging.LoggerAdapter):
- """Add connection id to websocket log messages."""
-
- def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
- """Add connid to websocket log messages."""
- return f'[{self.extra["connid"]}] {msg}', kwargs
-
-
class WebsocketClientHandler:
"""Handle an active websocket client connection."""
self._to_write: asyncio.Queue = asyncio.Queue(maxsize=MAX_PENDING_MSG)
self._handle_task: asyncio.Task | None = None
self._writer_task: asyncio.Task | None = None
- self.log_level = webserver.log_level
- self._logger = WebSocketLogAdapter(webserver.logger, {"connid": id(self)})
+ self._logger = webserver.logger
async def disconnect(self) -> None:
"""Disconnect client."""
self._logger.warning("Timeout preparing request from %s", request.remote)
return wsock
- self._logger.debug("Connection from %s", request.remote)
+ self._logger.log(VERBOSE_LOG_LEVEL, "Connection from %s", request.remote)
self._handle_task = asyncio.current_task()
self._writer_task = asyncio.create_task(self._writer())
disconnect_warn = "Received non-Text message."
break
- if DEBUG:
- self._logger.debug("Received: %s", msg.data)
+ self._logger.log(VERBOSE_LOG_LEVEL, "Received: %s", msg.data)
try:
command_msg = CommandMessage.from_json(msg.data)
finally:
# Handle connection shutting down.
unsub_callback()
- self._logger.debug("Unsubscribed from events")
+ self._logger.log(VERBOSE_LOG_LEVEL, "Unsubscribed from events")
try:
self._to_write.put_nowait(None)
finally:
if disconnect_warn is None:
- self._logger.debug("Disconnected")
+ self._logger.log(VERBOSE_LOG_LEVEL, "Disconnected")
else:
self._logger.warning("Disconnected: %s", disconnect_warn)
message: str = process()
else:
message = process
- if DEBUG:
- self._logger.debug("Writing: %s", message)
+ self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message)
await self.wsock.send_str(message)
def _send_message(self, message: MessageType) -> None:
import aiofiles
from aiohttp import ClientResponseError, ClientTimeout
+from music_assistant.common.helpers.global_cache import (
+ get_global_cache_value,
+ set_global_cache_values,
+)
from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
from music_assistant.common.models.errors import (
AudioError,
CONF_VOLUME_NORMALIZATION,
CONF_VOLUME_NORMALIZATION_TARGET,
ROOT_LOGGER_NAME,
+ VERBOSE_LOG_LEVEL,
)
from music_assistant.server.helpers.playlists import fetch_playlist
async with AsyncProcess(args, True) as proc:
crossfade_data, _ = await proc.communicate(fade_in_part)
if crossfade_data:
- LOGGER.debug(
+ LOGGER.log(
+ 5,
"crossfaded 2 pcm chunks. fade_in_part: %s - "
"fade_out_part: %s - fade_length: %s seconds",
len(fade_in_part),
# return stripped audio
bytes_stripped = len(audio_data) - len(stripped_data)
- if LOGGER.isEnabledFor(logging.DEBUG):
+ if LOGGER.isEnabledFor(5):
pcm_sample_size = int(sample_rate * (bit_depth / 8) * 2)
seconds_stripped = round(bytes_stripped / pcm_sample_size, 2)
location = "end" if reverse else "begin"
- LOGGER.debug(
+ LOGGER.log(
+ 5,
"stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
seconds_stripped,
location,
item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
# calculate EBU R128 integrated loudness with ffmpeg
- input_file = streamdetails.direct or "-"
- proc_args = [
- "ffmpeg",
- "-protocol_whitelist",
- "file,http,https,tcp,tls,crypto,pipe,fd",
- "-t",
- "600", # limit to 10 minutes to prevent OOM
- "-i",
- input_file,
- "-f",
- streamdetails.audio_format.content_type,
- "-af",
- "loudnorm=print_format=json",
- "-f",
- "null",
- "-",
- ]
+ ffmpeg_args = _get_ffmpeg_args(
+ input_format=streamdetails.audio_format,
+ output_format=streamdetails.audio_format,
+ filter_params=["loudnorm=print_format=json"],
+ extra_args=["-t", "600"], # limit to 10 minutes to prevent OOM
+ input_path=streamdetails.direct or "-",
+ output_path="NULL",
+ )
async with AsyncProcess(
- proc_args,
+ ffmpeg_args,
enable_stdin=streamdetails.direct is None,
enable_stdout=False,
enable_stderr=True,
filter_params.append(filter_rule)
if fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
- ffmpeg_args = await _get_ffmpeg_args(
+ ffmpeg_args = _get_ffmpeg_args(
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
async def writer() -> None:
"""Task that grabs the source audio and feeds it to ffmpeg."""
- logger.debug("writer started for %s", streamdetails.uri)
+ logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri)
music_prov = mass.get_provider(streamdetails.provider)
seek_pos = seek_position if streamdetails.can_seek else 0
async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
await ffmpeg_proc.write(audio_chunk)
# write eof when last packet is received
ffmpeg_proc.write_eof()
- logger.debug("writer finished for %s", streamdetails.uri)
+ logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri)
if streamdetails.direct is None:
writer_task = asyncio.create_task(writer())
_, stderr = await ffmpeg_proc.communicate()
if ffmpeg_proc.returncode != 0:
# ffmpeg has a non zero returncode meaning trouble
- logger.warning("STREAM ERROR on %s", streamdetails.uri)
+ logger.warning("stream error on %s", streamdetails.uri)
logger.warning(stderr.decode())
+ finished = False
elif loudness_details := _parse_loudnorm(stderr):
+ logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60
if finished or seconds_streamed >= required_seconds:
LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
streamdetails.item_id, streamdetails.provider, loudness_details
)
else:
- logger.debug(stderr.decode())
+ logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
# report playback
if finished or seconds_streamed > 30:
yield chunk
return
- ffmpeg_args = await _get_ffmpeg_args(
+ ffmpeg_args = _get_ffmpeg_args(
input_format=input_format,
output_format=output_format,
filter_params=filter_params or [],
# ffmpeg has a non zero returncode meaning trouble
logger.warning("FFMPEG ERROR\n%s", stderr.decode())
else:
- logger.debug(stderr.decode())
+ logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
async def check_audio_support() -> tuple[bool, bool, str]:
"""Check if ffmpeg is present (with/without libsoxr support)."""
- cache_key = "audio_support_cache"
- if cache := globals().get(cache_key):
- return cache
-
# check for FFmpeg presence
returncode, output = await check_output("ffmpeg -version")
ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
libsoxr_support = "enable-libsoxr" in output.decode()
result = (ffmpeg_present, libsoxr_support, version)
- globals()[cache_key] = result
+ # store in global cache for easy access by '_get_ffmpeg_args'
+ await set_global_cache_values({"ffmpeg_support": result})
return result
return filter_params
-async def _get_ffmpeg_args(
+def _get_ffmpeg_args(
input_format: AudioFormat,
output_format: AudioFormat,
filter_params: list[str],
output_path: str = "-",
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
- ffmpeg_present, libsoxr_support, version = await check_audio_support()
+ ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
if not ffmpeg_present:
msg = (
input_args += ["-i", input_path]
# collect output args
- output_args = [
- "-acodec",
- output_format.content_type.name.lower(),
- "-f",
- output_format.content_type.value,
- "-ac",
- str(output_format.channels),
- "-ar",
- str(output_format.sample_rate),
- output_path,
- ]
+ if output_path.upper() == "NULL":
+ output_args = ["-f", "null", "-"]
+ elif output_format.content_type == ContentType.UNKNOWN:
+ output_args = [output_path]
+ else:
+ output_args = [
+ "-acodec",
+ output_format.content_type.name.lower(),
+ "-f",
+ output_format.content_type.value,
+ "-ac",
+ str(output_format.channels),
+ "-ar",
+ str(output_format.sample_rate),
+ output_path,
+ ]
# prefer libsoxr high quality resampler (if present) for sample rate conversions
if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
log_level = self.mass.config.get_raw_core_config_value(
self.domain, CONF_LOG_LEVEL, "GLOBAL"
)
- self.log_level = log_level
if log_level == "GLOBAL":
self.logger.setLevel(mass_logger.level)
- else:
- self.logger.setLevel("DEBUG" if log_level == "VERBOSE" else log_level)
+ elif logging.getLogger().level > self.logger.level:
# if the root logger's level is higher, we need to adjust that too
- if logging.getLogger().level > self.logger.level:
- logging.getLogger().setLevel(self.logger.level)
+ logging.getLogger().setLevel(self.logger.level)
self.config = config
mass_logger = logging.getLogger(ROOT_LOGGER_NAME)
self.logger = mass_logger.getChild(f"providers.{self.domain}")
- self.log_level = log_level = config.get_value(CONF_LOG_LEVEL)
+ log_level = config.get_value(CONF_LOG_LEVEL)
if log_level == "GLOBAL":
self.logger.setLevel(mass_logger.level)
- else:
- self.logger.setLevel("DEBUG" if log_level == "VERBOSE" else log_level)
+ elif logging.getLogger().level > self.logger.level:
# if the root logger's level is higher, we need to adjust that too
- if logging.getLogger().level > self.logger.level:
- logging.getLogger().setLevel(self.logger.level)
+ logging.getLogger().setLevel(self.logger.level)
self.logger.debug("Log level configured to %s", log_level)
self.cache = mass.cache
self.available = False
"""Return the features supported by this Provider."""
return ()
+    @property
+    def lookup_key(self) -> str:
+        """Return instance_id if multi_instance capable or domain otherwise."""
+        # For single-instance providers the domain is a stable key; only
+        # multi-instance providers need the unique instance_id to disambiguate.
+        return self.instance_id if self.manifest.multi_instance else self.domain
+
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.constants import CONF_SYNC_ADJUST
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import check_output
from music_assistant.server.models.player_provider import PlayerProvider
player_id, CONF_PASSWORD, None
):
extra_args += ["-password", device_password]
- if self.prov.log_level == "DEBUG":
+ if self.prov.logger.isEnabledFor(logging.DEBUG):
extra_args += ["-debug", "5"]
- elif self.prov.log_level == "VERBOSE":
+ elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
extra_args += ["-debug", "10"]
args = [
with open(named_pipe, "w") as f:
f.write(command)
- if self.prov.log_level == "VERBOSE":
- self.airplay_player.logger.debug("sending command %s", command)
+ self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
await self.mass.create_task(send_data)
- async def _log_watcher(self) -> None: # noqa: PLR0915
+ async def _log_watcher(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
airplay_player = self.airplay_player
mass_player = self.mass.players.get(airplay_player.player_id)
continue
if "lost packet out of backlog" in line:
lost_packets += 1
- if lost_packets == 10:
- logger.warning("Packet loss detected, resuming playback...")
+ if lost_packets == 30:
+ logger.warning("Packet loss detected, restart playback...")
queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
await self.mass.player_queues.resume(queue.queue_id)
else:
logger.debug(line)
continue
- # debug log everything else
- if self.prov.log_level == "VERBOSE":
- logger.debug(line)
+ # verbose log everything else
+ logger.log(VERBOSE_LOG_LEVEL, line)
# if we reach this point, the process exited
logger.debug(
# always stop existing stream first
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
- self.mass.create_task(airplay_player.active_stream.stop(force=True))
+ await airplay_player.active_stream.stop(force=True)
pcm_format = AudioFormat(
content_type=ContentType.PCM_S16LE,
sample_rate=44100,
)
airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
+ if queue_item.queue_item_id != "flow":
+ stream_job.start()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
address = get_primary_ip_address(info)
if address is None:
return
- # some guards if our info is valid/complete
- if "md" not in info.decoded_properties:
- return
- if "et" not in info.decoded_properties:
- return
self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
self._players[player_id] = AirPlayPlayer(
player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id)
break
request = raw_request.decode("UTF-8")
- headers_raw, body = request.split("\r\n\r\n", 1)
+ if "\r\n\r\n" in request:
+ headers_raw, body = request.split("\r\n\r\n", 1)
+ else:
+ headers_raw = request
+ body = ""
headers_raw = headers_raw.split("\r\n")
headers = {}
for line in headers_raw[1:]:
+ if ":" not in line:
+ continue
x, y = line.split(":", 1)
headers[x.strip()] = y.strip()
active_remote = headers.get("Active-Remote")
import time
from dataclasses import dataclass
from logging import Logger
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from uuid import UUID
import pychromecast
)
from music_assistant.common.models.errors import PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, CONF_PLAYERS, MASS_LOGO_ONLINE
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_FLOW_MODE,
+ CONF_PLAYERS,
+ VERBOSE_LOG_LEVEL,
+)
from music_assistant.server.models.player_provider import PlayerProvider
from .helpers import CastStatusListener, ChromecastInfo
CONF_ENTRY_CROSSFADE_DURATION,
)
+DEFAULT_APP_ID = "CC1AD845"
+ALT_APP_ID = "46C1A819"
+
# Monkey patch the Media controller here to store the queue items
_patched_process_media_status_org = MediaController._process_media_status
status_listener: CastStatusListener | None = None
mz_controller: MultizoneController | None = None
active_group: str | None = None
- current_queue_item_id: str | None = None
class ChromecastProvider(PlayerProvider):
self.mass.aiozc.zeroconf,
)
# set-up pychromecast logging
- if self.log_level == "VERBOSE":
+ if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
logging.getLogger("pychromecast").setLevel(logging.DEBUG)
else:
logging.getLogger("pychromecast").setLevel(self.logger.level + 10)
fade_in=fade_in,
flow_mode=use_flow_mode,
)
- if use_flow_mode:
- # In flow mode, all queue tracks are sent to the player as continuous stream.
- # This comes at the cost of metadata (cast does not support ICY metadata).
- cc_queue_items = [
- self._create_cc_queue_item(None, url),
- # add a special 'command' item to the queue
- # this allows for on-player next buttons/commands to still work
- self._create_cc_queue_item(
- None, self.mass.streams.get_command_url(queue_item.queue_id, "next")
- ),
- ]
- else:
- # handle normal playback using the chromecast queue to play items one by one
- cc_queue_items = [
- self._create_cc_queue_item(queue_item, url),
- ]
queuedata = {
- "type": "QUEUE_LOAD",
- "repeatMode": "REPEAT_OFF", # handled by our queue controller
- "shuffle": False, # handled by our queue controller
- "queueType": "PLAYLIST",
- "startIndex": 0, # Item index to play after this request or keep same item if undefined
- "items": cc_queue_items,
+ "type": "LOAD",
+ "media": self._create_cc_media_item(queue_item, url),
}
# make sure that the media controller app is launched
- await self._launch_app(castplayer)
+ app_id = ALT_APP_ID if use_flow_mode else DEFAULT_APP_ID
+ await self._launch_app(castplayer, app_id)
# send queue info to the CC
media_controller = castplayer.cc.media_controller
await asyncio.to_thread(media_controller.send_message, queuedata, True)
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
castplayer = self.castplayers[player_id]
- url = await self.mass.streams.resolve_stream_url(
- player_id,
- queue_item=queue_item,
- output_codec=ContentType.FLAC,
- )
+ if isinstance(queue_item, str):
+ url = self.mass.streams.get_command_url(queue_item, "next")
+ queue_item = None
+ else:
+ url = await self.mass.streams.resolve_stream_url(
+ player_id,
+ queue_item=queue_item,
+ output_codec=ContentType.FLAC,
+ )
next_item_id = None
status = castplayer.cc.media_controller.status
# lookup position of current track in cast queue
continue
next_item_id = item["itemId"]
# check if the next queue item isn't already queued
- if (
- item.get("media", {}).get("customData", {}).get("queue_item_id")
- == queue_item.queue_item_id
- ):
+ if item.get("media", {}).get("customData", {}).get("uri") == url:
return
queuedata = {
"type": "QUEUE_INSERT",
"insertBefore": next_item_id,
- "items": [self._create_cc_queue_item(queue_item, url)],
+ "items": [
+ {
+ "autoplay": True,
+ "startTime": 0,
+ "preloadTime": 0,
+ "media": self._create_cc_media_item(queue_item, url),
+ }
+ ],
}
media_controller = castplayer.cc.media_controller
queuedata["mediaSessionId"] = media_controller.status.media_session_id
return
try:
await asyncio.to_thread(castplayer.cc.media_controller.update_status)
+ await self.update_flow_metadata(castplayer)
except ConnectionResetError as err:
raise PlayerUnavailableError from err
if exclude.lower() in cast_info.friendly_name.lower():
enabled_by_default = False
+ if cast_info.is_audio_group and cast_info.is_multichannel_group:
+ player_type = PlayerType.STEREO_PAIR
+ elif cast_info.is_audio_group:
+ player_type = PlayerType.GROUP
+ else:
+ player_type = PlayerType.PLAYER
# Instantiate chromecast object
castplayer = CastPlayer(
player_id,
player=Player(
player_id=player_id,
provider=self.instance_id,
- type=(PlayerType.GROUP if cast_info.is_audio_group else PlayerType.PLAYER),
+ type=player_type,
name=cast_info.friendly_name,
available=False,
powered=False,
PlayerFeature.ENQUEUE_NEXT,
PlayerFeature.PAUSE,
),
- max_sample_rate=96000,
+ # originally/officially cast supports a 96k sample rate,
+ # but it seems a (recent?) update broke this;
+ # for now, use 48k as the max sample rate to play it safe
+ max_sample_rate=48000,
supports_24bit=True,
enabled_by_default=enabled_by_default,
),
)
# handle stereo pairs
if castplayer.cast_info.is_multichannel_group:
- castplayer.player.type = PlayerType.PLAYER
+ castplayer.player.type = PlayerType.STEREO_PAIR
castplayer.player.group_childs = set()
# handle cast groups
if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group:
### Helpers / utils
- async def _launch_app(self, castplayer: CastPlayer) -> None:
+ async def _launch_app(self, castplayer: CastPlayer, app_id: str = DEFAULT_APP_ID) -> None:
"""Launch the default Media Receiver App on a Chromecast."""
event = asyncio.Event()
- app_id = pychromecast.config.APP_MEDIA_RECEIVER
if castplayer.cc.app_id == app_id:
return # already active
# Quit the previous app before starting splash screen or media player
if castplayer.cc.app_id is not None:
castplayer.cc.quit_app()
- castplayer.logger.debug("Launching Default Media Receiver (%s) as active app.", app_id)
- castplayer.cc.media_controller.launch(launched_callback)
+ castplayer.logger.debug("Launching App %s.", app_id)
+ castplayer.cc.socket_client.receiver_controller.launch_app(
+ app_id,
+ force_launch=True,
+ callback_function=launched_callback,
+ )
await self.mass.loop.run_in_executor(None, launch)
await event.wait()
castplayer.status_listener = None
self.castplayers.pop(castplayer.player_id, None)
- def _create_cc_queue_item(self, queue_item: QueueItem | None, stream_url: str):
- """Create CC queue item from MA QueueItem."""
- if queue_item is None:
- # flow mode or other special type
- return {
- "autoplay": True,
- "preloadTime": 10,
- "startTime": 0,
- "activeTrackIds": [],
- "media": {
- "contentId": stream_url,
- "customData": {
- "uri": stream_url,
- "queue_item_id": stream_url,
- },
- "contentType": "audio/flac",
- "streamType": STREAM_TYPE_LIVE,
- "metadata": {
- "metadataType": 0,
- "title": "Music Assistant",
- "images": [{"url": MASS_LOGO_ONLINE}],
- },
- "duration": None,
- },
- }
+ def _create_cc_media_item(self, queue_item: QueueItem, stream_url: str) -> dict[str, Any]:
+ """Create CC media item from MA QueueItem."""
duration = int(queue_item.duration) if queue_item.duration else None
image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
if queue_item.media_type == MediaType.TRACK and queue_item.media_item:
"title": queue_item.media_item.name,
"images": [{"url": image_url}] if image_url else None,
}
+ elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
+ stream_type = STREAM_TYPE_LIVE
+ metadata = {
+ "metadataType": 3,
+ "songName": queue_item.streamdetails.stream_title.split(" - ")[-1],
+ "artist": queue_item.streamdetails.stream_title.split(" - ")[0],
+ "albumName": queue_item.name,
+ "images": [{"url": image_url}] if image_url else None,
+ "title": queue_item.streamdetails.stream_title.split(" - ")[-1],
+ }
else:
stream_type = STREAM_TYPE_LIVE
metadata = {
"images": [{"url": image_url}] if image_url else None,
}
return {
- "autoplay": True,
- "preloadTime": 10,
- "playbackDuration": duration,
- "startTime": 0,
- "activeTrackIds": [],
- "media": {
- "contentId": stream_url,
- "customData": {
- "uri": queue_item.uri,
- "queue_item_id": queue_item.queue_item_id,
- },
- "contentType": "audio/flac",
- "streamType": stream_type,
- "metadata": metadata,
- "duration": duration,
+ "contentId": stream_url,
+ "customData": {
+ "uri": queue_item.uri,
+ "queue_item_id": queue_item.queue_item_id,
+ "deviceName": "Music Assistant",
},
+ "contentType": "audio/flac",
+ "streamType": stream_type,
+ "metadata": metadata,
+ "duration": duration,
}
+
+ async def update_flow_metadata(self, castplayer: CastPlayer) -> None:
+ """Update the metadata of a cast player running the flow stream."""
+ if not castplayer.player.powered:
+ return
+ if not castplayer.cc.media_controller.status.player_is_playing:
+ return
+ if castplayer.player.state != PlayerState.PLAYING:
+ return
+ if castplayer.cc.app_id != ALT_APP_ID:
+ return
+ queue = self.mass.player_queues.get_active_queue(castplayer.player_id)
+ if not (current_item := queue.current_item):
+ return
+ media_controller = castplayer.cc.media_controller
+ # update metadata of current item chromecast
+ if media_controller.status.media_custom_data["queue_item_id"] != current_item.queue_item_id:
+ cc_item = self._create_cc_media_item(current_item, "")
+ queuedata = {
+ "type": "PLAY",
+ "mediaSessionId": media_controller.status.media_session_id,
+ "customData": {
+ "metadata": cc_item["metadata"],
+ },
+ }
+ self.mass.create_task(media_controller.send_message, queuedata, True)
+
+ if len(getattr(media_controller.status, "items", [])) < 2:
+ # In flow mode, all queue tracks are sent to the player as a continuous stream.
+ # add a special 'command' item to the queue
+ # this allows for on-player next buttons/commands to still work
+ cmd_next_url = self.mass.streams.get_command_url(queue.queue_id, "next")
+ msg = {
+ "type": "QUEUE_INSERT",
+ "mediaSessionId": media_controller.status.media_session_id,
+ "items": [
+ {
+ "media": {
+ "contentId": cmd_next_url,
+ "customData": {
+ "uri": cmd_next_url,
+ "queue_item_id": cmd_next_url,
+ "deviceName": "Music Assistant",
+ },
+ "contentType": "audio/flac",
+ "streamType": STREAM_TYPE_LIVE,
+ "metadata": {},
+ },
+ "autoplay": True,
+ "startTime": 0,
+ "preloadTime": 0,
+ }
+ ],
+ }
+ self.mass.create_task(media_controller.send_message, msg, inc_session_id=True)
)
from music_assistant.common.models.errors import PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE, CONF_ENFORCE_MP3, CONF_FLOW_MODE, CONF_PLAYERS
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_ENFORCE_MP3,
+ CONF_FLOW_MODE,
+ CONF_PLAYERS,
+ VERBOSE_LOG_LEVEL,
+)
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
dlna_player = self.dlnaplayers[player_id]
dlna_player.last_command = time.time()
- if self.log_level == "VERBOSE":
+ if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
self.logger.debug(
"Handling command %s for player %s",
func.__name__,
return await func(self, *args, **kwargs)
except UpnpError as err:
dlna_player.force_poll = True
- if self.log_level == "VERBOSE":
+ if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
self.logger.exception("Error during call %s: %r", func.__name__, err)
else:
self.logger.error("Error during call %s: %r", func.__name__, str(err))
self.dlnaplayers = {}
self.lock = asyncio.Lock()
# silence the async_upnp_client logger
- if self.log_level == "VERBOSE":
+ if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
logging.getLogger("async_upnp_client").setLevel(logging.DEBUG)
else:
logging.getLogger("async_upnp_client").setLevel(self.logger.level + 10)
CONF_PORT,
CONF_SYNC_ADJUST,
MASS_LOGO_ONLINE,
+ VERBOSE_LOG_LEVEL,
)
from music_assistant.server.models.player_provider import PlayerProvider
REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
# sync constants
-MIN_DEVIATION_ADJUST = 8 # 8 milliseconds
-MIN_REQ_PLAYPOINTS = 3 # we need at least 3 measurements
+MIN_DEVIATION_ADJUST = 6 # 6 milliseconds
+MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements
DEVIATION_JUMP_IGNORE = 2000 # ignore a sudden unrealistic jump
MAX_SKIP_AHEAD_MS = 500 # 0.5 seconds
control_port = self.config.get_value(CONF_PORT)
telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
json_port = self.config.get_value(CONF_CLI_JSON_PORT)
- logging.getLogger("aioslimproto").setLevel(self.logger.level)
+ # silence aioslimproto logger a bit
+ if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+ logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
+ else:
+ logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
self.slimproto = SlimServer(
cli_port=telnet_port or None,
cli_port_json=json_port or None,
self._handle_play_url(
slimplayer,
url=stream_job.resolve_stream_url(
- player_id,
slimplayer.player_id,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
),
auto_play=False,
)
)
+ if queue_item.queue_item_id != "flow":
+ stream_job.start()
else:
# regular, single player playback
slimplayer = self.slimproto.get_player(player_id)
)
from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE
+from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.didl_lite import create_didl_metadata
from music_assistant.server.models.player_provider import PlayerProvider
zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT
prov = SonosPlayerProvider(mass, manifest, config)
# set-up soco logging
- if prov.log_level == "VERBOSE":
+ if prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
logging.getLogger("soco").setLevel(logging.DEBUG)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
else:
from music_assistant.common.models.enums import PlayerFeature, PlayerState
from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.constants import VERBOSE_LOG_LEVEL
from .helpers import SonosUpdateError, soco_error
]
if not subscriptions:
return
- self.logger.debug("Creating subscriptions for %s", self.zone_name)
+ self.logger.log(VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.zone_name)
results = await asyncio.gather(*subscriptions, return_exceptions=True)
for result in results:
self.log_subscription_result(result, "Creating subscription", logging.WARNING)
"""Cancel all subscriptions."""
if not self._subscriptions:
return
- self.logger.debug("Unsubscribing from events for %s", self.zone_name)
+ self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.zone_name)
results = await asyncio.gather(
*(subscription.unsubscribe() for subscription in self._subscriptions),
return_exceptions=True,
"""Validate availability of the speaker based on recent activity."""
if not self.should_poll:
return
- self.logger.debug("Polling player for availability...")
+ self.logger.log(VERBOSE_LOG_LEVEL, "Polling player for availability...")
try:
await asyncio.to_thread(self.ping)
self._speaker_activity("ping")
"""Handle updated IP of a Sonos player (NOT async friendly)."""
if self.available:
return
- self.logger.info(
+ self.logger.debug(
"Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
)
try:
if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"):
new_coordinator_uid = av_transport_uri.split(":")[-1]
if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid):
- self.logger.debug(
+ self.logger.log(
+ 5,
"Media update coordinator (%s) received for %s",
new_coordinator_speaker.zone_name,
self.zone_name,
return
self._resub_cooldown_expires_at = None
- self.logger.debug("Activity on %s from %s", self.zone_name, source)
+ self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.zone_name, source)
self._last_activity = time.monotonic()
was_available = self.available
self.available = True
# create a multi-client stream job - all (direct) child's of this UGP group
# will subscribe to this multi client queue stream
- await self.mass.streams.create_multi_client_stream_job(
+ stream_job = await self.mass.streams.create_multi_client_stream_job(
player_id,
start_queue_item=queue_item,
seek_position=seek_position,
if member is None:
continue
tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item, 0, False))
+ stream_job.start()
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates."""
from zeroconf import IPVersion, NonUniqueNameException, ServiceStateChange, Zeroconf
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
+from music_assistant.common.helpers.global_cache import set_global_cache_values
from music_assistant.common.helpers.util import get_ip_pton
from music_assistant.common.models.api import ServerInfoMessage
from music_assistant.common.models.enums import EventType, ProviderType
self.create_task(provider.loaded_in_mass())
self.config.set(f"{CONF_PROVIDERS}/{conf.instance_id}/last_error", None)
self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers())
+ await self._update_available_providers_cache()
# if this is a music provider, start sync
if provider.type == ProviderType.MUSIC:
self.music.start_sync(providers=[provider.instance_id])
LOGGER.warning("Error while unload provider %s: %s", provider.name, str(err))
finally:
self._providers.pop(instance_id, None)
+ await self._update_available_providers_cache()
self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers())
def _register_api_commands(self) -> None:
if exc_val:
raise exc_val
return exc_type
+
+ async def _update_available_providers_cache(self) -> None:
+ """Update the global cache variable of loaded/available providers."""
+ await set_global_cache_values(
+ {
+ "provider_domains": {x.domain for x in self.providers},
+ "provider_instance_ids": {x.instance_id for x in self.providers},
+ "available_providers": {
+ *{x.domain for x in self.providers},
+ *{x.instance_id for x in self.providers},
+ },
+ "unique_providers": {x.lookup_key for x in self.providers},
+ }
+ )