From: Marcel van der Veldt Date: Sat, 16 Mar 2024 01:11:46 +0000 (+0100) Subject: Fix playback on Google cast and grouped players (#1146) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9e2614b33ef52c60e8725aa2df40236ad7f60835;p=music-assistant-server.git Fix playback on Google cast and grouped players (#1146) --- diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index 3a4b0f49..7d1cf1f1 100644 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -9,15 +9,16 @@ import os import subprocess import sys import threading +import traceback from contextlib import suppress from logging.handlers import RotatingFileHandler -from typing import Final +from typing import Any, Final from aiorun import run from colorlog import ColoredFormatter from music_assistant.common.helpers.json import json_loads -from music_assistant.constants import ROOT_LOGGER_NAME +from music_assistant.constants import ROOT_LOGGER_NAME, VERBOSE_LOG_LEVEL from music_assistant.server import MusicAssistant from music_assistant.server.helpers.logging import activate_log_queue_handler @@ -28,6 +29,14 @@ MAX_LOG_FILESIZE = 1000000 * 10 # 10 MB ALPINE_RELEASE_FILE = "/etc/alpine-release" +class VerboseLogger(logging.Logger): + """Custom python logger with included verbose log level.""" + + def verbose(self, msg, *args, **kwargs): + """Log a verbose message.""" + self.log(VERBOSE_LOG_LEVEL, msg, *args, **kwargs) + + def get_arguments(): """Arguments handling.""" parser = argparse.ArgumentParser(description="MusicAssistant") @@ -68,6 +77,7 @@ def setup_logger(data_path: str, level: str = "DEBUG"): datefmt=FORMAT_DATETIME, reset=True, log_colors={ + "VERBOSE": "light_black", "DEBUG": "cyan", "INFO": "green", "WARNING": "yellow", @@ -89,10 +99,11 @@ def setup_logger(data_path: str, level: str = "DEBUG"): with suppress(OSError): file_handler.doRollover() file_handler.setFormatter(logging.Formatter(log_fmt, datefmt=FORMAT_DATETIME)) - # file_handler.setLevel(logging.INFO) logger = logging.getLogger() logger.addHandler(file_handler) + logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE") + logging.setLoggerClass(VerboseLogger) # apply the configured global log level to the (root) music assistant logger logging.getLogger(ROOT_LOGGER_NAME).setLevel(level) @@ -136,6 +147,30 @@ def _enable_posix_spawn() -> None: subprocess._USE_POSIX_SPAWN = os.path.exists(ALPINE_RELEASE_FILE) +def _global_loop_exception_handler(_: Any, context: dict[str, Any]) -> None: + """Handle all exception inside the core loop.""" + kwargs = {} + if exception := context.get("exception"): + kwargs["exc_info"] = (type(exception), exception, exception.__traceback__) + + logger = logging.getLogger(__package__) + if source_traceback := context.get("source_traceback"): + stack_summary = "".join(traceback.format_list(source_traceback)) + logger.error( + "Error doing job: %s: %s", + context["message"], + stack_summary, + **kwargs, # type: ignore[arg-type] + ) + return + + logger.error( + "Error doing task: %s", + context["message"], + **kwargs, # type: ignore[arg-type] + ) + + def main() -> None: """Start MusicAssistant.""" # parse arguments @@ -172,13 +207,14 @@ def main() -> None: activate_log_queue_handler() if dev_mode or log_level == "DEBUG": loop.set_debug(True) + loop.set_exception_handler(_global_loop_exception_handler) await mass.start() run( start_mass(), use_uvloop=enable_uvloop, shutdown_callback=on_shutdown, - executor_workers=64, + executor_workers=32, ) diff --git a/music_assistant/common/helpers/global_cache.py b/music_assistant/common/helpers/global_cache.py new file mode 100644 index 00000000..719b2a09 --- /dev/null +++ b/music_assistant/common/helpers/global_cache.py @@ -0,0 +1,29 @@ +"""Provides a simple global memory cache.""" + + +from __future__ import annotations + +import asyncio +from typing import Any + +# global cache - we use this on a few places (as limited as possible) +# where we have no other options +_global_cache_lock = asyncio.Lock() +_global_cache = {} + + +def get_global_cache_value(key: str, default: Any = None) -> Any: + """Get a value from the global cache.""" + return _global_cache.get(key, default) + + +async def set_global_cache_values(values: dict[str, Any]) -> Any: + """Set a value in the global cache (without locking).""" + async with _global_cache_lock: + for key, value in values.items(): + _set_global_cache_value(key, value) + + +def _set_global_cache_value(key: str, value: Any) -> Any: + """Set a value in the global cache (without locking).""" + _global_cache[key] = value diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index be9fd483..f9193f58 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -202,11 +202,13 @@ class PlayerType(StrEnum): """Enum with possible Player Types. player: A regular player. + stereo_pair: Same as player but a dedicated stereo pair of 2 speakers. group: A (dedicated) group player or (universal) playergroup. sync_group: A group/preset of players that can be synced together. """ PLAYER = "player" + STEREO_PAIR = "stereo_pair" GROUP = "group" SYNC_GROUP = "sync_group" diff --git a/music_assistant/common/models/media_items.py b/music_assistant/common/models/media_items.py index 2727d7b2..577b879b 100644 --- a/music_assistant/common/models/media_items.py +++ b/music_assistant/common/models/media_items.py @@ -3,10 +3,11 @@ from __future__ import annotations from dataclasses import dataclass, field, fields -from typing import Any, Self +from typing import TYPE_CHECKING, Any, Self, cast from mashumaro import DataClassDictMixin +from music_assistant.common.helpers.global_cache import get_global_cache_value from music_assistant.common.helpers.uri import create_uri from music_assistant.common.helpers.util import create_sort_name, is_valid_uuid, merge_lists from music_assistant.common.models.enums import ( @@ -67,7 +68,7 @@ class AudioFormat(DataClassDictMixin): return self.output_format_str == other.output_format_str -@dataclass(frozen=True, kw_only=True) +@dataclass(kw_only=True) class ProviderMapping(DataClassDictMixin): """Model for a MediaItem's provider mapping details.""" @@ -87,6 +88,16 @@ class ProviderMapping(DataClassDictMixin): """Return quality score.""" return self.audio_format.quality + def __post_init__(self): + """Call after init.""" + # having items for unavailable providers can have all sorts + # of unpredictable results so ensure we have accurate availability status + if available_providers := get_global_cache_value("unique_providers"): + if TYPE_CHECKING: + available_providers = cast(set[str], available_providers) + if not available_providers.intersection({self.provider_domain, self.provider_instance}): + self.available = False + def __hash__(self) -> int: """Return custom hash.""" return hash((self.provider_instance, self.item_id)) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 7bb9b9a4..6b463568 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -90,3 +90,4 @@ CONFIGURABLE_CORE_CONTROLLERS = ( "player_queues", ) SYNCGROUP_PREFIX: Final[str] = "syncgroup_" +VERBOSE_LOG_LEVEL: Final[int] = 5 diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index c6c99bd7..b06a9493 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -14,6 +14,7 @@ import shortuuid from aiofiles.os import wrap from cryptography.fernet import Fernet, InvalidToken +from music_assistant.common.helpers.global_cache import get_global_cache_value from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_dumps, json_loads from music_assistant.common.models import config_entries from music_assistant.common.models.config_entries import ( @@ -324,14 +325,13 @@ class ConfigController: self, provider: str | None = None, include_values: bool = False ) -> list[PlayerConfig]: """Return all known player configurations, optionally filtered by provider domain.""" - available_providers = {x.instance_id for x in self.mass.providers} return [ await self.get_player_config(raw_conf["player_id"]) if include_values else PlayerConfig.parse([], raw_conf) for raw_conf in list(self.get(CONF_PLAYERS, {}).values()) # filter out unavailable providers - if raw_conf["provider"] in available_providers + if raw_conf["provider"] in get_global_cache_value("available_providers", []) # optional provider filter and (provider in (None, raw_conf["provider"])) ] diff --git a/music_assistant/server/controllers/metadata.py b/music_assistant/server/controllers/metadata.py index 42e52046..64ae6702 100644 --- a/music_assistant/server/controllers/metadata.py +++ b/music_assistant/server/controllers/metadata.py @@ -16,7 +16,7 @@ import aiofiles from aiohttp import web from music_assistant.common.models.enums import ImageType, MediaType, ProviderFeature, ProviderType -from music_assistant.common.models.errors import MediaNotFoundError +from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError from music_assistant.common.models.media_items import ( Album, Artist, @@ -357,6 +357,8 @@ class MetaDataController(CoreController): image_format: str = "png", ) -> bytes | str: """Get/create thumbnail image for path (image url or local path).""" + if provider != "url" and not self.mass.get_provider(provider): + raise ProviderUnavailableError thumbnail = await get_image_thumb( self.mass, path, size=size, provider=provider, image_format=image_format ) @@ -371,10 +373,11 @@ class MetaDataController(CoreController): provider = request.query.get("provider", "url") size = int(request.query.get("size", "0")) image_format = request.query.get("fmt", "png") + if provider != "url" and not self.mass.get_provider(provider): + return web.Response(status=404) if "%" in path: # assume (double) encoded url, decode it path = urllib.parse.unquote(path) - with suppress(FileNotFoundError): image_data = await self.get_thumbnail( path, size=size, provider=provider, image_format=image_format diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 812c27d9..74a46398 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -860,10 +860,10 @@ class PlayerController(CoreController): # Poll player; # - every 120 seconds if the player if not powered # - every 30 seconds if the player is powered - # - every 10 seconds if the player is playing + # - every 5 seconds if the player is playing if ( (player.powered and count % 30 == 0) - or (player_playing and count % 10 == 0) + or (player_playing and count % 5 == 0) or count % 120 == 0 ) and (player_prov := self.get_player_provider(player_id)): try: diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index d5dfaf97..e9db6aae 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -36,6 +36,7 @@ from music_assistant.constants import ( CONF_PUBLISH_IP, SILENCE_FILE, ) +from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, @@ -91,7 +92,6 @@ class MultiClientQueueStreamJob: pcm_audio_source: AsyncGenerator[bytes, None], pcm_format: AudioFormat, expected_players: set[str], - auto_start: bool = True, ) -> None: """Initialize MultiClientQueueStreamJob instance.""" self.mass = mass @@ -99,17 +99,18 @@ class MultiClientQueueStreamJob: self.pcm_format = pcm_format self.expected_players = expected_players self.job_id = shortuuid.uuid() - self.auto_start = auto_start self.bytes_streamed: int = 0 self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}") self._subscribed_players: dict[str, asyncio.Queue] = {} - self._finished = asyncio.Event() - self._audio_task: asyncio.Task | None = None + self._finished = False + self._running = False + self._allow_start = asyncio.Event() + self._audio_task = asyncio.create_task(self._stream_job_runner()) @property def finished(self) -> bool: """Return if this StreamJob is finished.""" - return self._finished.is_set() or self._audio_task and self._audio_task.done() + return self._finished or self._audio_task and self._audio_task.done() @property def pending(self) -> bool: @@ -119,21 +120,13 @@ class MultiClientQueueStreamJob: @property def running(self) -> bool: """Return if this Job is running.""" - return self._audio_task and not self._audio_task.done() + return self._running and self._audio_task and not self._audio_task.done() def start(self) -> None: """Start running (send audio chunks to connected players).""" - if self.running: - return if self.finished: raise RuntimeError("Task is already finished") - self.logger.debug( - "Starting multi client stream job %s with %s out of %s connected clients", - self.job_id, - len(self._subscribed_players), - len(self.expected_players), - ) - self._audio_task = asyncio.create_task(self._stream_job_runner()) + self._allow_start.set() def stop(self) -> None: """Stop running this job.""" @@ -141,7 +134,7 @@ class MultiClientQueueStreamJob: return if self._audio_task: self._audio_task.cancel() - self._finished.set() + self._finished = True def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" @@ -180,7 +173,7 @@ class MultiClientQueueStreamJob: async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]: """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue.""" try: - self._subscribed_players[player_id] = queue = asyncio.Queue(1) + self._subscribed_players[player_id] = queue = asyncio.Queue(2) if self.running: # client subscribes while we're already started @@ -191,17 +184,8 @@ class MultiClientQueueStreamJob: else: self.logger.debug("Subscribed player %s", player_id) - await asyncio.sleep(0.2) # debounce - if ( - self.auto_start - and not self.running - and len(self._subscribed_players) == len(self.expected_players) - ): - # we reached the number of expected subscribers, set event - # so that chunks can be pushed - self.start() # yield from queue until finished - while not self._finished.is_set(): + while not self._finished: yield await queue.get() finally: if sub_queue := self._subscribed_players.pop(player_id, None): @@ -215,11 +199,29 @@ class MultiClientQueueStreamJob: async def _stream_job_runner(self) -> None: """Feed audio chunks to StreamJob subscribers.""" + await self._allow_start.wait() + retries = 50 + while retries: + retries -= 1 + await asyncio.sleep(0.2) + if len(self._subscribed_players) != len(self.expected_players): + continue + await asyncio.sleep(0.2) + if len(self._subscribed_players) != len(self.expected_players): + continue + break + + self.logger.debug( + "Starting multi client stream job %s with %s out of %s connected clients", + self.job_id, + len(self._subscribed_players), + len(self.expected_players), + ) async for chunk in self.pcm_audio_source: async with asyncio.TaskGroup() as tg: for listener_queue in list(self._subscribed_players.values()): tg.create_task(listener_queue.put(chunk)) - self._finished.set() + self._finished = True def parse_pcm_info(content_type: str) -> tuple[int, int, int]: @@ -319,6 +321,8 @@ class StreamsController(CoreController): version, "with libsoxr support" if libsoxr_support else "", ) + # copy log level to audio module + AUDIO_LOGGER.setLevel(self.logger.level) # start the webserver self.publish_port = config.get_value(CONF_BIND_PORT) self.publish_ip = config.get_value(CONF_PUBLISH_IP) @@ -395,7 +399,6 @@ class StreamsController(CoreController): pcm_bit_depth: int = 24, pcm_sample_rate: int = 48000, expected_players: set[str] | None = None, - auto_start: bool = True, ) -> MultiClientQueueStreamJob: """ Create a MultiClientQueueStreamJob for the given queue.. @@ -426,7 +429,6 @@ class StreamsController(CoreController): ), pcm_format=pcm_format, expected_players=expected_players or set(), - auto_start=auto_start, ) return stream_job @@ -845,6 +847,7 @@ class StreamsController(CoreController): queue.display_name, queue_track.streamdetails.seconds_streamed, ) + # end of queue flow: make sure we yield the last_fadeout_part if last_fadeout_part: yield last_fadeout_part diff --git a/music_assistant/server/controllers/webserver.py b/music_assistant/server/controllers/webserver.py index dd726900..095ccca6 100644 --- a/music_assistant/server/controllers/webserver.py +++ b/music_assistant/server/controllers/webserver.py @@ -31,7 +31,7 @@ from music_assistant.common.models.api import ( from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueOption from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.errors import InvalidCommand -from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT +from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT, VERBOSE_LOG_LEVEL from music_assistant.server.helpers.api import APICommandHandler, parse_arguments from music_assistant.server.helpers.audio import get_preview_stream from music_assistant.server.helpers.util import get_ips @@ -47,7 +47,6 @@ if TYPE_CHECKING: DEFAULT_SERVER_PORT = 8095 CONF_BASE_URL = "base_url" CONF_EXPOSE_SERVER = "expose_server" -DEBUG = False # Set to True to enable very verbose logging of all incoming/outgoing messages MAX_PENDING_MSG = 512 CANCELLATION_ERRORS: Final = (asyncio.CancelledError, futures.CancelledError) @@ -229,14 +228,6 @@ class WebserverController(CoreController): return web.Response(text=log_data, content_type="text/text") -class WebSocketLogAdapter(logging.LoggerAdapter): - """Add connection id to websocket log messages.""" - - def process(self, msg: str, kwargs: Any) -> tuple[str, Any]: - """Add connid to websocket log messages.""" - return f'[{self.extra["connid"]}] {msg}', kwargs - - class WebsocketClientHandler: """Handle an active websocket client connection.""" @@ -248,8 +239,7 @@ class WebsocketClientHandler: self._to_write: asyncio.Queue = asyncio.Queue(maxsize=MAX_PENDING_MSG) self._handle_task: asyncio.Task | None = None self._writer_task: asyncio.Task | None = None - self.log_level = webserver.log_level - self._logger = WebSocketLogAdapter(webserver.logger, {"connid": id(self)}) + self._logger = webserver.logger async def disconnect(self) -> None: """Disconnect client.""" @@ -269,7 +259,7 @@ class WebsocketClientHandler: self._logger.warning("Timeout preparing request from %s", request.remote) return wsock - self._logger.debug("Connection from %s", request.remote) + self._logger.log(VERBOSE_LOG_LEVEL, "Connection from %s", request.remote) self._handle_task = asyncio.current_task() self._writer_task = asyncio.create_task(self._writer()) @@ -295,8 +285,7 @@ class WebsocketClientHandler: disconnect_warn = "Received non-Text message." break - if DEBUG: - self._logger.debug("Received: %s", msg.data) + self._logger.log(VERBOSE_LOG_LEVEL, "Received: %s", msg.data) try: command_msg = CommandMessage.from_json(msg.data) @@ -315,7 +304,7 @@ class WebsocketClientHandler: finally: # Handle connection shutting down. unsub_callback() - self._logger.debug("Unsubscribed from events") + self._logger.log(VERBOSE_LOG_LEVEL, "Unsubscribed from events") try: self._to_write.put_nowait(None) @@ -327,7 +316,7 @@ class WebsocketClientHandler: finally: if disconnect_warn is None: - self._logger.debug("Disconnected") + self._logger.log(VERBOSE_LOG_LEVEL, "Disconnected") else: self._logger.warning("Disconnected: %s", disconnect_warn) @@ -395,8 +384,7 @@ class WebsocketClientHandler: message: str = process() else: message = process - if DEBUG: - self._logger.debug("Writing: %s", message) + self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message) await self.wsock.send_str(message) def _send_message(self, message: MessageType) -> None: diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 529bd4f6..9550cace 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -15,6 +15,10 @@ from typing import TYPE_CHECKING import aiofiles from aiohttp import ClientResponseError, ClientTimeout +from music_assistant.common.helpers.global_cache import ( + get_global_cache_value, + set_global_cache_values, +) from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads from music_assistant.common.models.errors import ( AudioError, @@ -32,6 +36,7 @@ from music_assistant.constants import ( CONF_VOLUME_NORMALIZATION, CONF_VOLUME_NORMALIZATION_TARGET, ROOT_LOGGER_NAME, + VERBOSE_LOG_LEVEL, ) from music_assistant.server.helpers.playlists import fetch_playlist @@ -102,7 +107,8 @@ async def crossfade_pcm_parts( async with AsyncProcess(args, True) as proc: crossfade_data, _ = await proc.communicate(fade_in_part) if crossfade_data: - LOGGER.debug( + LOGGER.log( + 5, "crossfaded 2 pcm chunks. fade_in_part: %s - " "fade_out_part: %s - fade_length: %s seconds", len(fade_in_part), @@ -160,11 +166,12 @@ async def strip_silence( # return stripped audio bytes_stripped = len(audio_data) - len(stripped_data) - if LOGGER.isEnabledFor(logging.DEBUG): + if LOGGER.isEnabledFor(5): pcm_sample_size = int(sample_rate * (bit_depth / 8) * 2) seconds_stripped = round(bytes_stripped / pcm_sample_size, 2) location = "end" if reverse else "begin" - LOGGER.debug( + LOGGER.log( + 5, "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s", seconds_stripped, location, @@ -185,25 +192,16 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) - item_name = f"{streamdetails.provider}/{streamdetails.item_id}" LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name) # calculate EBU R128 integrated loudness with ffmpeg - input_file = streamdetails.direct or "-" - proc_args = [ - "ffmpeg", - "-protocol_whitelist", - "file,http,https,tcp,tls,crypto,pipe,fd", - "-t", - "600", # limit to 10 minutes to prevent OOM - "-i", - input_file, - "-f", - streamdetails.audio_format.content_type, - "-af", - "loudnorm=print_format=json", - "-f", - "null", - "-", - ] + ffmpeg_args = _get_ffmpeg_args( + input_format=streamdetails.audio_format, + output_format=streamdetails.audio_format, + filter_params=["loudnorm=print_format=json"], + extra_args=["-t", "600"], # limit to 10 minutes to prevent OOM + input_path=streamdetails.direct or "-", + output_path="NULL", + ) async with AsyncProcess( - proc_args, + ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stdout=False, enable_stderr=True, @@ -413,7 +411,7 @@ async def get_media_stream( # noqa: PLR0915 filter_params.append(filter_rule) if fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") - ffmpeg_args = await _get_ffmpeg_args( + ffmpeg_args = _get_ffmpeg_args( input_format=streamdetails.audio_format, output_format=pcm_format, filter_params=filter_params, @@ -432,14 +430,14 @@ async def get_media_stream( # noqa: PLR0915 async def writer() -> None: """Task that grabs the source audio and feeds it to ffmpeg.""" - logger.debug("writer started for %s", streamdetails.uri) + logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri) music_prov = mass.get_provider(streamdetails.provider) seek_pos = seek_position if streamdetails.can_seek else 0 async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): await ffmpeg_proc.write(audio_chunk) # write eof when last packet is received ffmpeg_proc.write_eof() - logger.debug("writer finished for %s", streamdetails.uri) + logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri) if streamdetails.direct is None: writer_task = asyncio.create_task(writer()) @@ -521,9 +519,11 @@ async def get_media_stream( # noqa: PLR0915 _, stderr = await ffmpeg_proc.communicate() if ffmpeg_proc.returncode != 0: # ffmpeg has a non zero returncode meaning trouble - logger.warning("STREAM ERROR on %s", streamdetails.uri) + logger.warning("stream error on %s", streamdetails.uri) logger.warning(stderr.decode()) + finished = False elif loudness_details := _parse_loudnorm(stderr): + logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60 if finished or seconds_streamed >= required_seconds: LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details) @@ -532,7 +532,7 @@ async def get_media_stream( # noqa: PLR0915 streamdetails.item_id, streamdetails.provider, loudness_details ) else: - logger.debug(stderr.decode()) + logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) # report playback if finished or seconds_streamed > 30: @@ -723,7 +723,7 @@ async def get_ffmpeg_stream( yield chunk return - ffmpeg_args = await _get_ffmpeg_args( + ffmpeg_args = _get_ffmpeg_args( input_format=input_format, output_format=output_format, filter_params=filter_params or [], @@ -768,15 +768,11 @@ async def get_ffmpeg_stream( # ffmpeg has a non zero returncode meaning trouble logger.warning("FFMPEG ERROR\n%s", stderr.decode()) else: - logger.debug(stderr.decode()) + logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) async def check_audio_support() -> tuple[bool, bool, str]: """Check if ffmpeg is present (with/without libsoxr support).""" - cache_key = "audio_support_cache" - if cache := globals().get(cache_key): - return cache - # check for FFmpeg presence returncode, output = await check_output("ffmpeg -version") ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode() @@ -785,7 +781,8 @@ async def check_audio_support() -> tuple[bool, bool, str]: version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0] libsoxr_support = "enable-libsoxr" in output.decode() result = (ffmpeg_present, libsoxr_support, version) - globals()[cache_key] = result + # store in global cache for easy access by '_get_ffmpeg_args' + await set_global_cache_values({"ffmpeg_support": result}) return result @@ -932,7 +929,7 @@ def get_player_filter_params( return filter_params -async def _get_ffmpeg_args( +def _get_ffmpeg_args( input_format: AudioFormat, output_format: AudioFormat, filter_params: list[str], @@ -941,7 +938,7 @@ async def _get_ffmpeg_args( output_path: str = "-", ) -> list[str]: """Collect all args to send to the ffmpeg process.""" - ffmpeg_present, libsoxr_support, version = await check_audio_support() + ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support") if not ffmpeg_present: msg = ( @@ -996,17 +993,22 @@ async def _get_ffmpeg_args( input_args += ["-i", input_path] # collect output args - output_args = [ - "-acodec", - output_format.content_type.name.lower(), - "-f", - output_format.content_type.value, - "-ac", - str(output_format.channels), - "-ar", - str(output_format.sample_rate), - output_path, - ] + if output_path.upper() == "NULL": + output_args = ["-f", "null", "-"] + elif output_format.content_type == ContentType.UNKNOWN: + output_args = [output_path] + else: + output_args = [ + "-acodec", + output_format.content_type.name.lower(), + "-f", + output_format.content_type.value, + "-ac", + str(output_format.channels), + "-ar", + str(output_format.sample_rate), + output_path, + ] # prefer libsoxr high quality resampler (if present) for sample rate conversions if input_format.sample_rate != output_format.sample_rate and libsoxr_support: diff --git a/music_assistant/server/models/core_controller.py b/music_assistant/server/models/core_controller.py index 5e1adbae..00bf37c3 100644 --- a/music_assistant/server/models/core_controller.py +++ b/music_assistant/server/models/core_controller.py @@ -68,11 +68,8 @@ class CoreController: log_level = self.mass.config.get_raw_core_config_value( self.domain, CONF_LOG_LEVEL, "GLOBAL" ) - self.log_level = log_level if log_level == "GLOBAL": self.logger.setLevel(mass_logger.level) - else: - self.logger.setLevel("DEBUG" if log_level == "VERBOSE" else log_level) + elif logging.getLogger().level > self.logger.level: # if the root logger's level is higher, we need to adjust that too - if logging.getLogger().level > self.logger.level: - logging.getLogger().setLevel(self.logger.level) + logging.getLogger().setLevel(self.logger.level) diff --git a/music_assistant/server/models/provider.py b/music_assistant/server/models/provider.py index f2a3510f..33562065 100644 --- a/music_assistant/server/models/provider.py +++ b/music_assistant/server/models/provider.py @@ -29,14 +29,12 @@ class Provider: self.config = config mass_logger = logging.getLogger(ROOT_LOGGER_NAME) self.logger = mass_logger.getChild(f"providers.{self.domain}") - self.log_level = log_level = config.get_value(CONF_LOG_LEVEL) + log_level = config.get_value(CONF_LOG_LEVEL) if log_level == "GLOBAL": self.logger.setLevel(mass_logger.level) - else: - self.logger.setLevel("DEBUG" if log_level == "VERBOSE" else log_level) + elif logging.getLogger().level > self.logger.level: # if the root logger's level is higher, we need to adjust that too - if logging.getLogger().level > self.logger.level: - logging.getLogger().setLevel(self.logger.level) + logging.getLogger().setLevel(self.logger.level) self.logger.debug("Log level configured to %s", log_level) self.cache = mass.cache self.available = False @@ -46,6 +44,11 @@ class Provider: """Return the features supported by this Provider.""" return () + @property + def lookup_key(self) -> str: + """Return instance_id if multi_instance capable or domain otherwise.""" + return self.instance_id if self.manifest.multi_instance else self.domain + async def loaded_in_mass(self) -> None: """Call after the provider has been loaded.""" diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 263b839d..67b04858 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -40,7 +40,7 @@ from music_assistant.common.models.enums import ( from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.player_queue import PlayerQueue -from music_assistant.constants import CONF_SYNC_ADJUST +from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.process import check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -220,9 +220,9 @@ class AirplayStreamJob: player_id, CONF_PASSWORD, None ): extra_args += ["-password", device_password] - if self.prov.log_level == "DEBUG": + if self.prov.logger.isEnabledFor(logging.DEBUG): extra_args += ["-debug", "5"] - elif self.prov.log_level == "VERBOSE": + elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): extra_args += ["-debug", "10"] args = [ @@ -291,11 +291,10 @@ class AirplayStreamJob: with open(named_pipe, "w") as f: f.write(command) - if self.prov.log_level == "VERBOSE": - self.airplay_player.logger.debug("sending command %s", command) + self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) await self.mass.create_task(send_data) - async def _log_watcher(self) -> None: # noqa: PLR0915 + async def _log_watcher(self) -> None: """Monitor stderr for the running CLIRaop process.""" airplay_player = self.airplay_player mass_player = self.mass.players.get(airplay_player.player_id) @@ -337,16 +336,15 @@ class AirplayStreamJob: continue if "lost packet out of backlog" in line: lost_packets += 1 - if lost_packets == 10: - logger.warning("Packet loss detected, resuming playback...") + if lost_packets == 30: + logger.warning("Packet loss detected, restart playback...") queue = self.mass.player_queues.get_active_queue(mass_player.player_id) await self.mass.player_queues.resume(queue.queue_id) else: logger.debug(line) continue - # debug log everything else - if self.prov.log_level == "VERBOSE": - logger.debug(line) + # verbose log everything else + logger.log(VERBOSE_LOG_LEVEL, line) # if we reach this point, the process exited logger.debug( @@ -633,7 +631,7 @@ class AirplayProvider(PlayerProvider): # always stop existing stream first for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: - self.mass.create_task(airplay_player.active_stream.stop(force=True)) + await airplay_player.active_stream.stop(force=True) pcm_format = AudioFormat( content_type=ContentType.PCM_S16LE, sample_rate=44100, @@ -693,6 +691,8 @@ class AirplayProvider(PlayerProvider): ) airplay_player.active_stream = AirplayStreamJob(self, airplay_player) tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator)) + if queue_item.queue_item_id != "flow": + stream_job.start() async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -823,11 +823,6 @@ class AirplayProvider(PlayerProvider): address = get_primary_ip_address(info) if address is None: return - # some guards if our info is valid/complete - if "md" not in info.decoded_properties: - return - if "et" not in info.decoded_properties: - return self.logger.debug("Discovered Airplay device %s on %s", display_name, address) self._players[player_id] = AirPlayPlayer( player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id) @@ -888,10 +883,16 @@ class AirplayProvider(PlayerProvider): break request = raw_request.decode("UTF-8") - headers_raw, body = request.split("\r\n\r\n", 1) + if "\r\n\r\n" in request: + headers_raw, body = request.split("\r\n\r\n", 1) + else: + headers_raw = request + body = "" headers_raw = headers_raw.split("\r\n") headers = {} for line in headers_raw[1:]: + if ":" not in line: + continue x, y = line.split(":", 1) headers[x.strip()] = y.strip() active_remote = headers.get("Active-Remote") diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 8019a1e7..719e830a 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -9,7 +9,7 @@ import threading import time from dataclasses import dataclass from logging import Logger -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from uuid import UUID import pychromecast @@ -34,7 +34,12 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, CONF_PLAYERS, MASS_LOGO_ONLINE +from music_assistant.constants import ( + CONF_CROSSFADE, + CONF_FLOW_MODE, + CONF_PLAYERS, + VERBOSE_LOG_LEVEL, +) from music_assistant.server.models.player_provider import PlayerProvider from .helpers import CastStatusListener, ChromecastInfo @@ -68,6 +73,9 @@ PLAYER_CONFIG_ENTRIES = ( CONF_ENTRY_CROSSFADE_DURATION, ) +DEFAULT_APP_ID = "CC1AD845" +ALT_APP_ID = "46C1A819" + # Monkey patch the Media controller here to store the queue items _patched_process_media_status_org = MediaController._process_media_status @@ -121,7 +129,6 @@ class CastPlayer: status_listener: CastStatusListener | None = None mz_controller: MultizoneController | None = None active_group: str | None = None - current_queue_item_id: str | None = None class ChromecastProvider(PlayerProvider): @@ -149,7 +156,7 @@ class ChromecastProvider(PlayerProvider): self.mass.aiozc.zeroconf, ) # set-up pychromecast logging - if self.log_level == "VERBOSE": + if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): logging.getLogger("pychromecast").setLevel(logging.DEBUG) else: logging.getLogger("pychromecast").setLevel(self.logger.level + 10) @@ -256,32 +263,13 @@ class ChromecastProvider(PlayerProvider): fade_in=fade_in, flow_mode=use_flow_mode, ) - if use_flow_mode: - # In flow mode, all queue tracks are sent to the player as continuous stream. - # This comes at the cost of metadata (cast does not support ICY metadata). - cc_queue_items = [ - self._create_cc_queue_item(None, url), - # add a special 'command' item to the queue - # this allows for on-player next buttons/commands to still work - self._create_cc_queue_item( - None, self.mass.streams.get_command_url(queue_item.queue_id, "next") - ), - ] - else: - # handle normal playback using the chromecast queue to play items one by one - cc_queue_items = [ - self._create_cc_queue_item(queue_item, url), - ] queuedata = { - "type": "QUEUE_LOAD", - "repeatMode": "REPEAT_OFF", # handled by our queue controller - "shuffle": False, # handled by our queue controller - "queueType": "PLAYLIST", - "startIndex": 0, # Item index to play after this request or keep same item if undefined - "items": cc_queue_items, + "type": "LOAD", + "media": self._create_cc_media_item(queue_item, url), } # make sure that the media controller app is launched - await self._launch_app(castplayer) + app_id = ALT_APP_ID if use_flow_mode else DEFAULT_APP_ID + await self._launch_app(castplayer, app_id) # send queue info to the CC media_controller = castplayer.cc.media_controller await asyncio.to_thread(media_controller.send_message, queuedata, True) @@ -289,11 +277,15 @@ class ChromecastProvider(PlayerProvider): async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """Handle enqueuing of the next queue item on the player.""" castplayer = self.castplayers[player_id] - url = await self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) + if isinstance(queue_item, str): + url = self.mass.streams.get_command_url(queue_item, "next") + queue_item = None + else: + url = await self.mass.streams.resolve_stream_url( + player_id, + queue_item=queue_item, + output_codec=ContentType.FLAC, + ) next_item_id = None status = castplayer.cc.media_controller.status # lookup position of current track in cast queue @@ -308,15 +300,19 @@ class ChromecastProvider(PlayerProvider): continue next_item_id = item["itemId"] # check if the next queue item isn't already queued - if ( - item.get("media", {}).get("customData", {}).get("queue_item_id") - == queue_item.queue_item_id - ): + if item.get("media", {}).get("customData", {}).get("uri") == url: return queuedata = { "type": "QUEUE_INSERT", "insertBefore": next_item_id, - "items": [self._create_cc_queue_item(queue_item, url)], + "items": [ + { + "autoplay": True, + "startTime": 0, + "preloadTime": 0, + "media": self._create_cc_media_item(queue_item, url), + } + ], } media_controller = castplayer.cc.media_controller queuedata["mediaSessionId"] = media_controller.status.media_session_id @@ -350,6 +346,7 @@ class ChromecastProvider(PlayerProvider): return try: await asyncio.to_thread(castplayer.cc.media_controller.update_status) + await self.update_flow_metadata(castplayer) except ConnectionResetError as err: raise PlayerUnavailableError from err @@ -401,6 +398,12 @@ class ChromecastProvider(PlayerProvider): if exclude.lower() in cast_info.friendly_name.lower(): enabled_by_default = False + if cast_info.is_audio_group and cast_info.is_multichannel_group: + player_type = PlayerType.STEREO_PAIR + elif cast_info.is_audio_group: + player_type = PlayerType.GROUP + else: + player_type = PlayerType.PLAYER # Instantiate chromecast object castplayer = CastPlayer( player_id, @@ -412,7 +415,7 @@ class ChromecastProvider(PlayerProvider): player=Player( player_id=player_id, provider=self.instance_id, - type=(PlayerType.GROUP if cast_info.is_audio_group else PlayerType.PLAYER), + type=player_type, name=cast_info.friendly_name, available=False, powered=False, @@ -428,7 +431,10 @@ class ChromecastProvider(PlayerProvider): PlayerFeature.ENQUEUE_NEXT, PlayerFeature.PAUSE, ), - max_sample_rate=96000, + # originally/officially cast supports 96k sample rate + # but it seems a (recent?) update broke this + # for now use 48k as max sample rate to play safe + max_sample_rate=48000, supports_24bit=True, enabled_by_default=enabled_by_default, ), @@ -473,7 +479,7 @@ class ChromecastProvider(PlayerProvider): ) # handle stereo pairs if castplayer.cast_info.is_multichannel_group: - castplayer.player.type = PlayerType.PLAYER + castplayer.player.type = PlayerType.STEREO_PAIR castplayer.player.group_childs = set() # handle cast groups if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group: @@ -559,10 +565,9 @@ class ChromecastProvider(PlayerProvider): ### Helpers / utils - async def _launch_app(self, castplayer: CastPlayer) -> None: + async def _launch_app(self, castplayer: CastPlayer, app_id: str = DEFAULT_APP_ID) -> None: """Launch the default Media Receiver App on a Chromecast.""" event = asyncio.Event() - app_id = pychromecast.config.APP_MEDIA_RECEIVER if castplayer.cc.app_id == app_id: return # already active @@ -574,8 +579,12 @@ class ChromecastProvider(PlayerProvider): # Quit the previous app before starting splash screen or media player if castplayer.cc.app_id is not None: castplayer.cc.quit_app() - castplayer.logger.debug("Launching Default Media Receiver (%s) as active app.", app_id) - castplayer.cc.media_controller.launch(launched_callback) + castplayer.logger.debug("Launching App %s.", app_id) + castplayer.cc.socket_client.receiver_controller.launch_app( + app_id, + force_launch=True, + callback_function=launched_callback, + ) await self.mass.loop.run_in_executor(None, launch) await event.wait() @@ -589,31 +598,8 @@ class ChromecastProvider(PlayerProvider): castplayer.status_listener = None self.castplayers.pop(castplayer.player_id, None) - def _create_cc_queue_item(self, queue_item: QueueItem | None, stream_url: str): - """Create CC queue item from MA QueueItem.""" - if queue_item is None: - # flow mode or other special type - return { - "autoplay": True, - "preloadTime": 10, - "startTime": 0, - "activeTrackIds": [], - "media": { - "contentId": stream_url, - "customData": { - "uri": stream_url, - "queue_item_id": stream_url, - }, - "contentType": "audio/flac", - "streamType": STREAM_TYPE_LIVE, - "metadata": { - "metadataType": 0, - "title": "Music Assistant", - "images": [{"url": MASS_LOGO_ONLINE}], - }, - "duration": None, - }, - } + def _create_cc_media_item(self, queue_item: QueueItem, stream_url: str) -> dict[str, Any]: + """Create CC media item from MA QueueItem.""" duration = int(queue_item.duration) if queue_item.duration else None image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else "" if queue_item.media_type == MediaType.TRACK and queue_item.media_item: @@ -630,6 +616,16 @@ class ChromecastProvider(PlayerProvider): "title": queue_item.media_item.name, "images": [{"url": image_url}] if image_url else None, } + elif queue_item.streamdetails and queue_item.streamdetails.stream_title: + stream_type = STREAM_TYPE_LIVE + metadata = { + "metadataType": 3, + "songName": queue_item.streamdetails.stream_title.split(" - ")[-1], + "artist": queue_item.streamdetails.stream_title.split(" - ")[0], + "albumName": queue_item.name, + "images": [{"url": image_url}] if image_url else None, + "title": queue_item.streamdetails.stream_title.split(" - ")[-1], + } else: stream_type = STREAM_TYPE_LIVE metadata = { @@ -638,20 +634,69 @@ class ChromecastProvider(PlayerProvider): "images": [{"url": image_url}] if image_url else None, } return { - "autoplay": True, - "preloadTime": 10, - "playbackDuration": duration, - "startTime": 0, - "activeTrackIds": [], - "media": { - "contentId": stream_url, - "customData": { - "uri": queue_item.uri, - "queue_item_id": queue_item.queue_item_id, - }, - "contentType": "audio/flac", - "streamType": stream_type, - "metadata": metadata, - "duration": duration, + "contentId": stream_url, + "customData": { + "uri": queue_item.uri, + "queue_item_id": queue_item.queue_item_id, + "deviceName": "Music Assistant", }, + "contentType": "audio/flac", + "streamType": stream_type, + "metadata": metadata, + "duration": duration, } + + async def update_flow_metadata(self, castplayer: CastPlayer) -> None: + """Update the metadata of a cast player running the flow stream.""" + if not castplayer.player.powered: + return + if not castplayer.cc.media_controller.status.player_is_playing: + return + if castplayer.player.state != PlayerState.PLAYING: + return + if castplayer.cc.app_id != ALT_APP_ID: + return + queue = self.mass.player_queues.get_active_queue(castplayer.player_id) + if not (current_item := queue.current_item): + return + media_controller = castplayer.cc.media_controller + # update metadata of current item chromecast + if media_controller.status.media_custom_data["queue_item_id"] != current_item.queue_item_id: + cc_item = self._create_cc_media_item(current_item, "") + queuedata = { + "type": "PLAY", + "mediaSessionId": media_controller.status.media_session_id, + "customData": { + "metadata": cc_item["metadata"], + }, + } + self.mass.create_task(media_controller.send_message, queuedata, True) + + if len(getattr(media_controller.status, "items", [])) < 2: + # In flow mode, all queue tracks are sent to the player as continuous stream. + # add a special 'command' item to the queue + # this allows for on-player next buttons/commands to still work + cmd_next_url = self.mass.streams.get_command_url(queue.queue_id, "next") + msg = { + "type": "QUEUE_INSERT", + "mediaSessionId": media_controller.status.media_session_id, + "items": [ + { + "media": { + "contentId": cmd_next_url, + "customData": { + "uri": cmd_next_url, + "queue_item_id": cmd_next_url, + "deviceName": "Music Assistant", + }, + "contentType": "audio/flac", + "streamType": STREAM_TYPE_LIVE, + "metadata": {}, + }, + "autoplay": True, + "startTime": 0, + "preloadTime": 0, + } + ], + } + self.mass.create_task(media_controller.send_message, msg, inc_session_id=True) diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 25606235..bfa3e378 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -39,7 +39,13 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.constants import CONF_CROSSFADE, CONF_ENFORCE_MP3, CONF_FLOW_MODE, CONF_PLAYERS +from music_assistant.constants import ( + CONF_CROSSFADE, + CONF_ENFORCE_MP3, + CONF_FLOW_MODE, + CONF_PLAYERS, + VERBOSE_LOG_LEVEL, +) from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider @@ -144,7 +150,7 @@ def catch_request_errors( player_id = kwargs["player_id"] if "player_id" in kwargs else args[0] dlna_player = self.dlnaplayers[player_id] dlna_player.last_command = time.time() - if self.log_level == "VERBOSE": + if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): self.logger.debug( "Handling command %s for player %s", func.__name__, @@ -157,7 +163,7 @@ def catch_request_errors( return await func(self, *args, **kwargs) except UpnpError as err: dlna_player.force_poll = True - if self.log_level == "VERBOSE": + if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): self.logger.exception("Error during call %s: %r", func.__name__, err) else: self.logger.error("Error during call %s: %r", func.__name__, str(err)) @@ -276,7 +282,7 @@ class DLNAPlayerProvider(PlayerProvider): self.dlnaplayers = {} self.lock = asyncio.Lock() # silence the async_upnp_client logger - if self.log_level == "VERBOSE": + if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): logging.getLogger("async_upnp_client").setLevel(logging.DEBUG) else: logging.getLogger("async_upnp_client").setLevel(self.logger.level + 10) diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 93ae197c..d2cce790 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -52,6 +52,7 @@ from music_assistant.constants import ( CONF_PORT, CONF_SYNC_ADJUST, MASS_LOGO_ONLINE, + VERBOSE_LOG_LEVEL, ) from music_assistant.server.models.player_provider import PlayerProvider @@ -78,8 +79,8 @@ STATE_MAP = { REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2} # sync constants -MIN_DEVIATION_ADJUST = 8 # 8 milliseconds -MIN_REQ_PLAYPOINTS = 3 # we need at least 3 measurements +MIN_DEVIATION_ADJUST = 6 # 6 milliseconds +MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements DEVIATION_JUMP_IGNORE = 2000 # ignore a sudden unrealistic jump MAX_SKIP_AHEAD_MS = 500 # 0.5 seconds @@ -228,7 +229,11 @@ class SlimprotoProvider(PlayerProvider): control_port = self.config.get_value(CONF_PORT) telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT) json_port = self.config.get_value(CONF_CLI_JSON_PORT) - logging.getLogger("aioslimproto").setLevel(self.logger.level) + # silence aioslimproto logger a bit + if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL): + logging.getLogger("aioslimproto").setLevel(logging.DEBUG) + else: + logging.getLogger("aioslimproto").setLevel(self.logger.level + 10) self.slimproto = SlimServer( cli_port=telnet_port or None, cli_port_json=json_port or None, @@ -358,7 +363,6 @@ class SlimprotoProvider(PlayerProvider): self._handle_play_url( slimplayer, url=stream_job.resolve_stream_url( - player_id, slimplayer.player_id, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, ), @@ -367,6 +371,8 @@ class SlimprotoProvider(PlayerProvider): auto_play=False, ) ) + if queue_item.queue_item_id != "flow": + stream_job.start() else: # regular, single player playback slimplayer = self.slimproto.get_player(player_id) diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index af0c4135..3690e26d 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -32,7 +32,7 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.constants import CONF_CROSSFADE +from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider @@ -90,7 +90,7 @@ async def setup( zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT prov = SonosPlayerProvider(mass, manifest, config) # set-up soco logging - if prov.log_level == "VERBOSE": + if prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): logging.getLogger("soco").setLevel(logging.DEBUG) logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) else: diff --git a/music_assistant/server/providers/sonos/player.py b/music_assistant/server/providers/sonos/player.py index 857575d3..4bfeb783 100644 --- a/music_assistant/server/providers/sonos/player.py +++ b/music_assistant/server/providers/sonos/player.py @@ -32,6 +32,7 @@ from music_assistant.common.helpers.datetime import utc from music_assistant.common.models.enums import PlayerFeature, PlayerState from music_assistant.common.models.errors import PlayerCommandFailed from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.constants import VERBOSE_LOG_LEVEL from .helpers import SonosUpdateError, soco_error @@ -255,7 +256,7 @@ class SonosPlayer: ] if not subscriptions: return - self.logger.debug("Creating subscriptions for %s", self.zone_name) + self.logger.log(VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.zone_name) results = await asyncio.gather(*subscriptions, return_exceptions=True) for result in results: self.log_subscription_result(result, "Creating subscription", logging.WARNING) @@ -271,7 +272,7 @@ class SonosPlayer: """Cancel all subscriptions.""" if not self._subscriptions: return - self.logger.debug("Unsubscribing from events for %s", self.zone_name) + self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.zone_name) results = await asyncio.gather( *(subscription.unsubscribe() for subscription in self._subscriptions), return_exceptions=True, @@ -284,7 +285,7 @@ class SonosPlayer: """Validate availability of the speaker based on recent activity.""" if not self.should_poll: return - self.logger.debug("Polling player for availability...") + self.logger.log(VERBOSE_LOG_LEVEL, "Polling player for availability...") try: await asyncio.to_thread(self.ping) self._speaker_activity("ping") @@ -301,7 +302,7 @@ class SonosPlayer: """Handle updated IP of a Sonos player (NOT async friendly).""" if self.available: return - self.logger.info( + self.logger.debug( "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address ) try: @@ -427,7 +428,8 @@ class SonosPlayer: if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"): new_coordinator_uid = av_transport_uri.split(":")[-1] if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid): - self.logger.debug( + self.logger.log( + 5, "Media update coordinator (%s) received for %s", new_coordinator_speaker.zone_name, self.zone_name, @@ -828,7 +830,7 @@ class SonosPlayer: return self._resub_cooldown_expires_at = None - self.logger.debug("Activity on %s from %s", self.zone_name, source) + self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.zone_name, source) self._last_activity = time.monotonic() was_available = self.available self.available = True diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 6aa5b164..a9a0f786 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -179,7 +179,7 @@ class UniversalGroupProvider(PlayerProvider): # create a multi-client stream job - all (direct) child's of this UGP group # will subscribe to this multi client queue stream - await self.mass.streams.create_multi_client_stream_job( + stream_job = await self.mass.streams.create_multi_client_stream_job( player_id, start_queue_item=queue_item, seek_position=seek_position, @@ -199,6 +199,7 @@ class UniversalGroupProvider(PlayerProvider): if member is None: continue tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item, 0, False)) + stream_job.start() async def poll_player(self, player_id: str) -> None: """Poll player for state updates.""" diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 9a2f2dc7..461f01b0 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -15,6 +15,7 @@ from aiohttp import ClientSession, TCPConnector from zeroconf import IPVersion, NonUniqueNameException, ServiceStateChange, Zeroconf from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf +from music_assistant.common.helpers.global_cache import set_global_cache_values from music_assistant.common.helpers.util import get_ip_pton from music_assistant.common.models.api import ServerInfoMessage from music_assistant.common.models.enums import EventType, ProviderType @@ -442,6 +443,7 @@ class MusicAssistant: self.create_task(provider.loaded_in_mass()) self.config.set(f"{CONF_PROVIDERS}/{conf.instance_id}/last_error", None) self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers()) + await self._update_available_providers_cache() # if this is a music provider, start sync if provider.type == ProviderType.MUSIC: self.music.start_sync(providers=[provider.instance_id]) @@ -468,6 +470,7 @@ class MusicAssistant: LOGGER.warning("Error while unload provider %s: %s", provider.name, str(err)) finally: self._providers.pop(instance_id, None) + await self._update_available_providers_cache() self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers()) def _register_api_commands(self) -> None: @@ -647,3 +650,17 @@ class MusicAssistant: if exc_val: raise exc_val return exc_type + + async def _update_available_providers_cache(self) -> None: + """Update the global cache variable of loaded/available providers.""" + await set_global_cache_values( + { + "provider_domains": {x.domain for x in self.providers}, + "provider_instance_ids": {x.instance_id for x in self.providers}, + "available_providers": { + *{x.domain for x in self.providers}, + *{x.instance_id for x in self.providers}, + }, + "unique_providers": {x.lookup_key for x in self.providers}, + } + )