Fix playback on Google cast and grouped players (#1146)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 16 Mar 2024 01:11:46 +0000 (02:11 +0100)
committerGitHub <noreply@github.com>
Sat, 16 Mar 2024 01:11:46 +0000 (02:11 +0100)
21 files changed:
music_assistant/__main__.py
music_assistant/common/helpers/global_cache.py [new file with mode: 0644]
music_assistant/common/models/enums.py
music_assistant/common/models/media_items.py
music_assistant/constants.py
music_assistant/server/controllers/config.py
music_assistant/server/controllers/metadata.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/controllers/webserver.py
music_assistant/server/helpers/audio.py
music_assistant/server/models/core_controller.py
music_assistant/server/models/provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/chromecast/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/slimproto/__init__.py
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/providers/sonos/player.py
music_assistant/server/providers/ugp/__init__.py
music_assistant/server/server.py

index 3a4b0f49443ad8192f4cb417004c0e9f74b2f1a7..7d1cf1f1ab1cf8d490f3213a51bc83a3adaef379 100644 (file)
@@ -9,15 +9,16 @@ import os
 import subprocess
 import sys
 import threading
+import traceback
 from contextlib import suppress
 from logging.handlers import RotatingFileHandler
-from typing import Final
+from typing import Any, Final
 
 from aiorun import run
 from colorlog import ColoredFormatter
 
 from music_assistant.common.helpers.json import json_loads
-from music_assistant.constants import ROOT_LOGGER_NAME
+from music_assistant.constants import ROOT_LOGGER_NAME, VERBOSE_LOG_LEVEL
 from music_assistant.server import MusicAssistant
 from music_assistant.server.helpers.logging import activate_log_queue_handler
 
@@ -28,6 +29,14 @@ MAX_LOG_FILESIZE = 1000000 * 10  # 10 MB
 ALPINE_RELEASE_FILE = "/etc/alpine-release"
 
 
+class VerboseLogger(logging.Logger):
+    """Custom python logger with included verbose log level."""
+
+    def verbose(self, msg, *args, **kwargs):
+        """Log a verbose message."""
+        self.log(VERBOSE_LOG_LEVEL, msg, *args, **kwargs)
+
+
 def get_arguments():
     """Arguments handling."""
     parser = argparse.ArgumentParser(description="MusicAssistant")
@@ -68,6 +77,7 @@ def setup_logger(data_path: str, level: str = "DEBUG"):
             datefmt=FORMAT_DATETIME,
             reset=True,
             log_colors={
+                "VERBOSE": "light_black",
                 "DEBUG": "cyan",
                 "INFO": "green",
                 "WARNING": "yellow",
@@ -89,10 +99,11 @@ def setup_logger(data_path: str, level: str = "DEBUG"):
     with suppress(OSError):
         file_handler.doRollover()
     file_handler.setFormatter(logging.Formatter(log_fmt, datefmt=FORMAT_DATETIME))
-    # file_handler.setLevel(logging.INFO)
 
     logger = logging.getLogger()
     logger.addHandler(file_handler)
+    logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE")
+    logging.setLoggerClass(VerboseLogger)
 
     # apply the configured global log level to the (root) music assistant logger
     logging.getLogger(ROOT_LOGGER_NAME).setLevel(level)
@@ -136,6 +147,30 @@ def _enable_posix_spawn() -> None:
     subprocess._USE_POSIX_SPAWN = os.path.exists(ALPINE_RELEASE_FILE)
 
 
+def _global_loop_exception_handler(_: Any, context: dict[str, Any]) -> None:
+    """Handle all exception inside the core loop."""
+    kwargs = {}
+    if exception := context.get("exception"):
+        kwargs["exc_info"] = (type(exception), exception, exception.__traceback__)
+
+    logger = logging.getLogger(__package__)
+    if source_traceback := context.get("source_traceback"):
+        stack_summary = "".join(traceback.format_list(source_traceback))
+        logger.error(
+            "Error doing job: %s: %s",
+            context["message"],
+            stack_summary,
+            **kwargs,  # type: ignore[arg-type]
+        )
+        return
+
+    logger.error(
+        "Error doing task: %s",
+        context["message"],
+        **kwargs,  # type: ignore[arg-type]
+    )
+
+
 def main() -> None:
     """Start MusicAssistant."""
     # parse arguments
@@ -172,13 +207,14 @@ def main() -> None:
         activate_log_queue_handler()
         if dev_mode or log_level == "DEBUG":
             loop.set_debug(True)
+        loop.set_exception_handler(_global_loop_exception_handler)
         await mass.start()
 
     run(
         start_mass(),
         use_uvloop=enable_uvloop,
         shutdown_callback=on_shutdown,
-        executor_workers=64,
+        executor_workers=32,
     )
 
 
diff --git a/music_assistant/common/helpers/global_cache.py b/music_assistant/common/helpers/global_cache.py
new file mode 100644 (file)
index 0000000..719b2a0
--- /dev/null
@@ -0,0 +1,29 @@
+"""Provides a simple global memory cache."""
+
+
+from __future__ import annotations
+
+import asyncio
+from typing import Any
+
+# global cache - we use this on a few places (as limited as possible)
+# where we have no other options
+_global_cache_lock = asyncio.Lock()
+_global_cache = {}
+
+
+def get_global_cache_value(key: str, default: Any = None) -> Any:
+    """Get a value from the global cache."""
+    return _global_cache.get(key, default)
+
+
+async def set_global_cache_values(values: dict[str, Any]) -> Any:
+    """Set a value in the global cache (without locking)."""
+    async with _global_cache_lock:
+        for key, value in values.items():
+            _set_global_cache_value(key, value)
+
+
+def _set_global_cache_value(key: str, value: Any) -> Any:
+    """Set a value in the global cache (without locking)."""
+    _global_cache[key] = value
index be9fd48322d1985d2649a000ca390f57c529d6d8..f9193f58fe24d0e8fdc5d63c93e783dc30be3e69 100644 (file)
@@ -202,11 +202,13 @@ class PlayerType(StrEnum):
     """Enum with possible Player Types.
 
     player: A regular player.
+    stereo_pair: Same as player but a dedicated stereo pair of 2 speakers.
     group: A (dedicated) group player or (universal) playergroup.
     sync_group: A group/preset of players that can be synced together.
     """
 
     PLAYER = "player"
+    STEREO_PAIR = "stereo_pair"
     GROUP = "group"
     SYNC_GROUP = "sync_group"
 
index 2727d7b2f1c6167c4cd7e6e49ba4855cb49698b1..577b879b78496747648da7265e7ed40b1f254cf3 100644 (file)
@@ -3,10 +3,11 @@
 from __future__ import annotations
 
 from dataclasses import dataclass, field, fields
-from typing import Any, Self
+from typing import TYPE_CHECKING, Any, Self, cast
 
 from mashumaro import DataClassDictMixin
 
+from music_assistant.common.helpers.global_cache import get_global_cache_value
 from music_assistant.common.helpers.uri import create_uri
 from music_assistant.common.helpers.util import create_sort_name, is_valid_uuid, merge_lists
 from music_assistant.common.models.enums import (
@@ -67,7 +68,7 @@ class AudioFormat(DataClassDictMixin):
         return self.output_format_str == other.output_format_str
 
 
-@dataclass(frozen=True, kw_only=True)
+@dataclass(kw_only=True)
 class ProviderMapping(DataClassDictMixin):
     """Model for a MediaItem's provider mapping details."""
 
@@ -87,6 +88,16 @@ class ProviderMapping(DataClassDictMixin):
         """Return quality score."""
         return self.audio_format.quality
 
+    def __post_init__(self):
+        """Call after init."""
+        # having items for unavailable providers can have all sorts
+        # of unpredictable results so ensure we have accurate availability status
+        if available_providers := get_global_cache_value("unique_providers"):
+            if TYPE_CHECKING:
+                available_providers = cast(set[str], available_providers)
+            if not available_providers.intersection({self.provider_domain, self.provider_instance}):
+                self.available = False
+
     def __hash__(self) -> int:
         """Return custom hash."""
         return hash((self.provider_instance, self.item_id))
index 7bb9b9a4d928b4de2e99ed79f23f099f921b0903..6b4635688e3fd9766e05ed55ea0e3b8ef3ebaf12 100644 (file)
@@ -90,3 +90,4 @@ CONFIGURABLE_CORE_CONTROLLERS = (
     "player_queues",
 )
 SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
+VERBOSE_LOG_LEVEL: Final[int] = 5
index c6c99bd74dff47d8523331149d8a98ca7835e861..b06a949339da0b88bd8a2f43485b425e3af98cfb 100644 (file)
@@ -14,6 +14,7 @@ import shortuuid
 from aiofiles.os import wrap
 from cryptography.fernet import Fernet, InvalidToken
 
+from music_assistant.common.helpers.global_cache import get_global_cache_value
 from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_dumps, json_loads
 from music_assistant.common.models import config_entries
 from music_assistant.common.models.config_entries import (
@@ -324,14 +325,13 @@ class ConfigController:
         self, provider: str | None = None, include_values: bool = False
     ) -> list[PlayerConfig]:
         """Return all known player configurations, optionally filtered by provider domain."""
-        available_providers = {x.instance_id for x in self.mass.providers}
         return [
             await self.get_player_config(raw_conf["player_id"])
             if include_values
             else PlayerConfig.parse([], raw_conf)
             for raw_conf in list(self.get(CONF_PLAYERS, {}).values())
             # filter out unavailable providers
-            if raw_conf["provider"] in available_providers
+            if raw_conf["provider"] in get_global_cache_value("available_providers", [])
             # optional provider filter
             and (provider in (None, raw_conf["provider"]))
         ]
index 42e520468869b8beb1764ea5ab4c37e4da1f87af..64ae6702a5f868553641c09061cf6cfe5672cca0 100644 (file)
@@ -16,7 +16,7 @@ import aiofiles
 from aiohttp import web
 
 from music_assistant.common.models.enums import ImageType, MediaType, ProviderFeature, ProviderType
-from music_assistant.common.models.errors import MediaNotFoundError
+from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
 from music_assistant.common.models.media_items import (
     Album,
     Artist,
@@ -357,6 +357,8 @@ class MetaDataController(CoreController):
         image_format: str = "png",
     ) -> bytes | str:
         """Get/create thumbnail image for path (image url or local path)."""
+        if provider != "url" and not self.mass.get_provider(provider):
+            raise ProviderUnavailableError
         thumbnail = await get_image_thumb(
             self.mass, path, size=size, provider=provider, image_format=image_format
         )
@@ -371,10 +373,11 @@ class MetaDataController(CoreController):
         provider = request.query.get("provider", "url")
         size = int(request.query.get("size", "0"))
         image_format = request.query.get("fmt", "png")
+        if provider != "url" and not self.mass.get_provider(provider):
+            return web.Response(status=404)
         if "%" in path:
             # assume (double) encoded url, decode it
             path = urllib.parse.unquote(path)
-
         with suppress(FileNotFoundError):
             image_data = await self.get_thumbnail(
                 path, size=size, provider=provider, image_format=image_format
index 812c27d9985eae9475724708a58e8225523a1004..74a46398f10ff8ed5e1e827f797395108dd36a42 100644 (file)
@@ -860,10 +860,10 @@ class PlayerController(CoreController):
                 # Poll player;
                 # - every 120 seconds if the player if not powered
                 # - every 30 seconds if the player is powered
-                # - every 10 seconds if the player is playing
+                # - every 5 seconds if the player is playing
                 if (
                     (player.powered and count % 30 == 0)
-                    or (player_playing and count % 10 == 0)
+                    or (player_playing and count % 5 == 0)
                     or count % 120 == 0
                 ) and (player_prov := self.get_player_provider(player_id)):
                     try:
index d5dfaf971872712755c36c9e8e8d281951d0f96d..e9db6aaef012561ab092aa44be60c54a41d9df60 100644 (file)
@@ -36,6 +36,7 @@ from music_assistant.constants import (
     CONF_PUBLISH_IP,
     SILENCE_FILE,
 )
+from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
@@ -91,7 +92,6 @@ class MultiClientQueueStreamJob:
         pcm_audio_source: AsyncGenerator[bytes, None],
         pcm_format: AudioFormat,
         expected_players: set[str],
-        auto_start: bool = True,
     ) -> None:
         """Initialize MultiClientQueueStreamJob instance."""
         self.mass = mass
@@ -99,17 +99,18 @@ class MultiClientQueueStreamJob:
         self.pcm_format = pcm_format
         self.expected_players = expected_players
         self.job_id = shortuuid.uuid()
-        self.auto_start = auto_start
         self.bytes_streamed: int = 0
         self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}")
         self._subscribed_players: dict[str, asyncio.Queue] = {}
-        self._finished = asyncio.Event()
-        self._audio_task: asyncio.Task | None = None
+        self._finished = False
+        self._running = False
+        self._allow_start = asyncio.Event()
+        self._audio_task = asyncio.create_task(self._stream_job_runner())
 
     @property
     def finished(self) -> bool:
         """Return if this StreamJob is finished."""
-        return self._finished.is_set() or self._audio_task and self._audio_task.done()
+        return self._finished or self._audio_task and self._audio_task.done()
 
     @property
     def pending(self) -> bool:
@@ -119,21 +120,13 @@ class MultiClientQueueStreamJob:
     @property
     def running(self) -> bool:
         """Return if this Job is running."""
-        return self._audio_task and not self._audio_task.done()
+        return self._running and self._audio_task and not self._audio_task.done()
 
     def start(self) -> None:
         """Start running (send audio chunks to connected players)."""
-        if self.running:
-            return
         if self.finished:
             raise RuntimeError("Task is already finished")
-        self.logger.debug(
-            "Starting multi client stream job %s with %s out of %s connected clients",
-            self.job_id,
-            len(self._subscribed_players),
-            len(self.expected_players),
-        )
-        self._audio_task = asyncio.create_task(self._stream_job_runner())
+        self._allow_start.set()
 
     def stop(self) -> None:
         """Stop running this job."""
@@ -141,7 +134,7 @@ class MultiClientQueueStreamJob:
             return
         if self._audio_task:
             self._audio_task.cancel()
-        self._finished.set()
+        self._finished = True
 
     def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str:
         """Resolve the childplayer specific stream URL to this streamjob."""
@@ -180,7 +173,7 @@ class MultiClientQueueStreamJob:
     async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]:
         """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue."""
         try:
-            self._subscribed_players[player_id] = queue = asyncio.Queue(1)
+            self._subscribed_players[player_id] = queue = asyncio.Queue(2)
 
             if self.running:
                 # client subscribes while we're already started
@@ -191,17 +184,8 @@ class MultiClientQueueStreamJob:
             else:
                 self.logger.debug("Subscribed player %s", player_id)
 
-            await asyncio.sleep(0.2)  # debounce
-            if (
-                self.auto_start
-                and not self.running
-                and len(self._subscribed_players) == len(self.expected_players)
-            ):
-                # we reached the number of expected subscribers, set event
-                # so that chunks can be pushed
-                self.start()
             # yield from queue until finished
-            while not self._finished.is_set():
+            while not self._finished:
                 yield await queue.get()
         finally:
             if sub_queue := self._subscribed_players.pop(player_id, None):
@@ -215,11 +199,29 @@ class MultiClientQueueStreamJob:
 
     async def _stream_job_runner(self) -> None:
         """Feed audio chunks to StreamJob subscribers."""
+        await self._allow_start.wait()
+        retries = 50
+        while retries:
+            retries -= 1
+            await asyncio.sleep(0.2)
+            if len(self._subscribed_players) != len(self.expected_players):
+                continue
+            await asyncio.sleep(0.2)
+            if len(self._subscribed_players) != len(self.expected_players):
+                continue
+            break
+
+        self.logger.debug(
+            "Starting multi client stream job %s with %s out of %s connected clients",
+            self.job_id,
+            len(self._subscribed_players),
+            len(self.expected_players),
+        )
         async for chunk in self.pcm_audio_source:
             async with asyncio.TaskGroup() as tg:
                 for listener_queue in list(self._subscribed_players.values()):
                     tg.create_task(listener_queue.put(chunk))
-        self._finished.set()
+        self._finished = True
 
 
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
@@ -319,6 +321,8 @@ class StreamsController(CoreController):
             version,
             "with libsoxr support" if libsoxr_support else "",
         )
+        # copy log level to audio module
+        AUDIO_LOGGER.setLevel(self.logger.level)
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -395,7 +399,6 @@ class StreamsController(CoreController):
         pcm_bit_depth: int = 24,
         pcm_sample_rate: int = 48000,
         expected_players: set[str] | None = None,
-        auto_start: bool = True,
     ) -> MultiClientQueueStreamJob:
         """
         Create a MultiClientQueueStreamJob for the given queue..
@@ -426,7 +429,6 @@ class StreamsController(CoreController):
             ),
             pcm_format=pcm_format,
             expected_players=expected_players or set(),
-            auto_start=auto_start,
         )
         return stream_job
 
@@ -845,6 +847,7 @@ class StreamsController(CoreController):
                 queue.display_name,
                 queue_track.streamdetails.seconds_streamed,
             )
+
         # end of queue flow: make sure we yield the last_fadeout_part
         if last_fadeout_part:
             yield last_fadeout_part
index dd726900e77b9277d3167e378180bbaf2cd66e0f..095ccca63c1f3d4a92b56a76789ac802544567e6 100644 (file)
@@ -31,7 +31,7 @@ from music_assistant.common.models.api import (
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueOption
 from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.errors import InvalidCommand
-from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT
+from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT, VERBOSE_LOG_LEVEL
 from music_assistant.server.helpers.api import APICommandHandler, parse_arguments
 from music_assistant.server.helpers.audio import get_preview_stream
 from music_assistant.server.helpers.util import get_ips
@@ -47,7 +47,6 @@ if TYPE_CHECKING:
 DEFAULT_SERVER_PORT = 8095
 CONF_BASE_URL = "base_url"
 CONF_EXPOSE_SERVER = "expose_server"
-DEBUG = False  # Set to True to enable very verbose logging of all incoming/outgoing messages
 MAX_PENDING_MSG = 512
 CANCELLATION_ERRORS: Final = (asyncio.CancelledError, futures.CancelledError)
 
@@ -229,14 +228,6 @@ class WebserverController(CoreController):
         return web.Response(text=log_data, content_type="text/text")
 
 
-class WebSocketLogAdapter(logging.LoggerAdapter):
-    """Add connection id to websocket log messages."""
-
-    def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
-        """Add connid to websocket log messages."""
-        return f'[{self.extra["connid"]}] {msg}', kwargs
-
-
 class WebsocketClientHandler:
     """Handle an active websocket client connection."""
 
@@ -248,8 +239,7 @@ class WebsocketClientHandler:
         self._to_write: asyncio.Queue = asyncio.Queue(maxsize=MAX_PENDING_MSG)
         self._handle_task: asyncio.Task | None = None
         self._writer_task: asyncio.Task | None = None
-        self.log_level = webserver.log_level
-        self._logger = WebSocketLogAdapter(webserver.logger, {"connid": id(self)})
+        self._logger = webserver.logger
 
     async def disconnect(self) -> None:
         """Disconnect client."""
@@ -269,7 +259,7 @@ class WebsocketClientHandler:
             self._logger.warning("Timeout preparing request from %s", request.remote)
             return wsock
 
-        self._logger.debug("Connection from %s", request.remote)
+        self._logger.log(VERBOSE_LOG_LEVEL, "Connection from %s", request.remote)
         self._handle_task = asyncio.current_task()
         self._writer_task = asyncio.create_task(self._writer())
 
@@ -295,8 +285,7 @@ class WebsocketClientHandler:
                     disconnect_warn = "Received non-Text message."
                     break
 
-                if DEBUG:
-                    self._logger.debug("Received: %s", msg.data)
+                self._logger.log(VERBOSE_LOG_LEVEL, "Received: %s", msg.data)
 
                 try:
                     command_msg = CommandMessage.from_json(msg.data)
@@ -315,7 +304,7 @@ class WebsocketClientHandler:
         finally:
             # Handle connection shutting down.
             unsub_callback()
-            self._logger.debug("Unsubscribed from events")
+            self._logger.log(VERBOSE_LOG_LEVEL, "Unsubscribed from events")
 
             try:
                 self._to_write.put_nowait(None)
@@ -327,7 +316,7 @@ class WebsocketClientHandler:
 
             finally:
                 if disconnect_warn is None:
-                    self._logger.debug("Disconnected")
+                    self._logger.log(VERBOSE_LOG_LEVEL, "Disconnected")
                 else:
                     self._logger.warning("Disconnected: %s", disconnect_warn)
 
@@ -395,8 +384,7 @@ class WebsocketClientHandler:
                     message: str = process()
                 else:
                     message = process
-                if DEBUG:
-                    self._logger.debug("Writing: %s", message)
+                self._logger.log(VERBOSE_LOG_LEVEL, "Writing: %s", message)
                 await self.wsock.send_str(message)
 
     def _send_message(self, message: MessageType) -> None:
index 529bd4f693533b15975c617b4af02a44f525a9c3..9550cace93bae47c2e35097f314da7a5e071e56e 100644 (file)
@@ -15,6 +15,10 @@ from typing import TYPE_CHECKING
 import aiofiles
 from aiohttp import ClientResponseError, ClientTimeout
 
+from music_assistant.common.helpers.global_cache import (
+    get_global_cache_value,
+    set_global_cache_values,
+)
 from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
 from music_assistant.common.models.errors import (
     AudioError,
@@ -32,6 +36,7 @@ from music_assistant.constants import (
     CONF_VOLUME_NORMALIZATION,
     CONF_VOLUME_NORMALIZATION_TARGET,
     ROOT_LOGGER_NAME,
+    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.helpers.playlists import fetch_playlist
 
@@ -102,7 +107,8 @@ async def crossfade_pcm_parts(
     async with AsyncProcess(args, True) as proc:
         crossfade_data, _ = await proc.communicate(fade_in_part)
         if crossfade_data:
-            LOGGER.debug(
+            LOGGER.log(
+                5,
                 "crossfaded 2 pcm chunks. fade_in_part: %s - "
                 "fade_out_part: %s - fade_length: %s seconds",
                 len(fade_in_part),
@@ -160,11 +166,12 @@ async def strip_silence(
 
     # return stripped audio
     bytes_stripped = len(audio_data) - len(stripped_data)
-    if LOGGER.isEnabledFor(logging.DEBUG):
+    if LOGGER.isEnabledFor(5):
         pcm_sample_size = int(sample_rate * (bit_depth / 8) * 2)
         seconds_stripped = round(bytes_stripped / pcm_sample_size, 2)
         location = "end" if reverse else "begin"
-        LOGGER.debug(
+        LOGGER.log(
+            5,
             "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
             seconds_stripped,
             location,
@@ -185,25 +192,16 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -
         item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
         LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
         # calculate EBU R128 integrated loudness with ffmpeg
-        input_file = streamdetails.direct or "-"
-        proc_args = [
-            "ffmpeg",
-            "-protocol_whitelist",
-            "file,http,https,tcp,tls,crypto,pipe,fd",
-            "-t",
-            "600",  # limit to 10 minutes to prevent OOM
-            "-i",
-            input_file,
-            "-f",
-            streamdetails.audio_format.content_type,
-            "-af",
-            "loudnorm=print_format=json",
-            "-f",
-            "null",
-            "-",
-        ]
+        ffmpeg_args = _get_ffmpeg_args(
+            input_format=streamdetails.audio_format,
+            output_format=streamdetails.audio_format,
+            filter_params=["loudnorm=print_format=json"],
+            extra_args=["-t", "600"],  # limit to 10 minutes to prevent OOM
+            input_path=streamdetails.direct or "-",
+            output_path="NULL",
+        )
         async with AsyncProcess(
-            proc_args,
+            ffmpeg_args,
             enable_stdin=streamdetails.direct is None,
             enable_stdout=False,
             enable_stderr=True,
@@ -413,7 +411,7 @@ async def get_media_stream(  # noqa: PLR0915
         filter_params.append(filter_rule)
     if fade_in:
         filter_params.append("afade=type=in:start_time=0:duration=3")
-    ffmpeg_args = await _get_ffmpeg_args(
+    ffmpeg_args = _get_ffmpeg_args(
         input_format=streamdetails.audio_format,
         output_format=pcm_format,
         filter_params=filter_params,
@@ -432,14 +430,14 @@ async def get_media_stream(  # noqa: PLR0915
 
     async def writer() -> None:
         """Task that grabs the source audio and feeds it to ffmpeg."""
-        logger.debug("writer started for %s", streamdetails.uri)
+        logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri)
         music_prov = mass.get_provider(streamdetails.provider)
         seek_pos = seek_position if streamdetails.can_seek else 0
         async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos):
             await ffmpeg_proc.write(audio_chunk)
         # write eof when last packet is received
         ffmpeg_proc.write_eof()
-        logger.debug("writer finished for %s", streamdetails.uri)
+        logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri)
 
     if streamdetails.direct is None:
         writer_task = asyncio.create_task(writer())
@@ -521,9 +519,11 @@ async def get_media_stream(  # noqa: PLR0915
         _, stderr = await ffmpeg_proc.communicate()
         if ffmpeg_proc.returncode != 0:
             # ffmpeg has a non zero returncode meaning trouble
-            logger.warning("STREAM ERROR on %s", streamdetails.uri)
+            logger.warning("stream error on %s", streamdetails.uri)
             logger.warning(stderr.decode())
+            finished = False
         elif loudness_details := _parse_loudnorm(stderr):
+            logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
             required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60
             if finished or seconds_streamed >= required_seconds:
                 LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
@@ -532,7 +532,7 @@ async def get_media_stream(  # noqa: PLR0915
                     streamdetails.item_id, streamdetails.provider, loudness_details
                 )
         else:
-            logger.debug(stderr.decode())
+            logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
 
         # report playback
         if finished or seconds_streamed > 30:
@@ -723,7 +723,7 @@ async def get_ffmpeg_stream(
             yield chunk
         return
 
-    ffmpeg_args = await _get_ffmpeg_args(
+    ffmpeg_args = _get_ffmpeg_args(
         input_format=input_format,
         output_format=output_format,
         filter_params=filter_params or [],
@@ -768,15 +768,11 @@ async def get_ffmpeg_stream(
             # ffmpeg has a non zero returncode meaning trouble
             logger.warning("FFMPEG ERROR\n%s", stderr.decode())
         else:
-            logger.debug(stderr.decode())
+            logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
 
 
 async def check_audio_support() -> tuple[bool, bool, str]:
     """Check if ffmpeg is present (with/without libsoxr support)."""
-    cache_key = "audio_support_cache"
-    if cache := globals().get(cache_key):
-        return cache
-
     # check for FFmpeg presence
     returncode, output = await check_output("ffmpeg -version")
     ffmpeg_present = returncode == 0 and "FFmpeg" in output.decode()
@@ -785,7 +781,8 @@ async def check_audio_support() -> tuple[bool, bool, str]:
     version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
     libsoxr_support = "enable-libsoxr" in output.decode()
     result = (ffmpeg_present, libsoxr_support, version)
-    globals()[cache_key] = result
+    # store in global cache for easy access by '_get_ffmpeg_args'
+    await set_global_cache_values({"ffmpeg_support": result})
     return result
 
 
@@ -932,7 +929,7 @@ def get_player_filter_params(
     return filter_params
 
 
-async def _get_ffmpeg_args(
+def _get_ffmpeg_args(
     input_format: AudioFormat,
     output_format: AudioFormat,
     filter_params: list[str],
@@ -941,7 +938,7 @@ async def _get_ffmpeg_args(
     output_path: str = "-",
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
-    ffmpeg_present, libsoxr_support, version = await check_audio_support()
+    ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
 
     if not ffmpeg_present:
         msg = (
@@ -996,17 +993,22 @@ async def _get_ffmpeg_args(
     input_args += ["-i", input_path]
 
     # collect output args
-    output_args = [
-        "-acodec",
-        output_format.content_type.name.lower(),
-        "-f",
-        output_format.content_type.value,
-        "-ac",
-        str(output_format.channels),
-        "-ar",
-        str(output_format.sample_rate),
-        output_path,
-    ]
+    if output_path.upper() == "NULL":
+        output_args = ["-f", "null", "-"]
+    elif output_format.content_type == ContentType.UNKNOWN:
+        output_args = [output_path]
+    else:
+        output_args = [
+            "-acodec",
+            output_format.content_type.name.lower(),
+            "-f",
+            output_format.content_type.value,
+            "-ac",
+            str(output_format.channels),
+            "-ar",
+            str(output_format.sample_rate),
+            output_path,
+        ]
 
     # prefer libsoxr high quality resampler (if present) for sample rate conversions
     if input_format.sample_rate != output_format.sample_rate and libsoxr_support:
index 5e1adbaead037c4a707c4282eeabaeae4ffb629e..00bf37c3d575bdda16dddc0e55b5289f300417e5 100644 (file)
@@ -68,11 +68,8 @@ class CoreController:
             log_level = self.mass.config.get_raw_core_config_value(
                 self.domain, CONF_LOG_LEVEL, "GLOBAL"
             )
-        self.log_level = log_level
         if log_level == "GLOBAL":
             self.logger.setLevel(mass_logger.level)
-        else:
-            self.logger.setLevel("DEBUG" if log_level == "VERBOSE" else log_level)
+        elif logging.getLogger().level > self.logger.level:
             # if the root logger's level is higher, we need to adjust that too
-            if logging.getLogger().level > self.logger.level:
-                logging.getLogger().setLevel(self.logger.level)
+            logging.getLogger().setLevel(self.logger.level)
index f2a3510fc4ae2f9ed31e7b5b6a33c8e648c1d300..33562065b523adc16b8243b181a68206176ef152 100644 (file)
@@ -29,14 +29,12 @@ class Provider:
         self.config = config
         mass_logger = logging.getLogger(ROOT_LOGGER_NAME)
         self.logger = mass_logger.getChild(f"providers.{self.domain}")
-        self.log_level = log_level = config.get_value(CONF_LOG_LEVEL)
+        log_level = config.get_value(CONF_LOG_LEVEL)
         if log_level == "GLOBAL":
             self.logger.setLevel(mass_logger.level)
-        else:
-            self.logger.setLevel("DEBUG" if log_level == "VERBOSE" else log_level)
+        elif logging.getLogger().level > self.logger.level:
             # if the root logger's level is higher, we need to adjust that too
-            if logging.getLogger().level > self.logger.level:
-                logging.getLogger().setLevel(self.logger.level)
+            logging.getLogger().setLevel(self.logger.level)
         self.logger.debug("Log level configured to %s", log_level)
         self.cache = mass.cache
         self.available = False
@@ -46,6 +44,11 @@ class Provider:
         """Return the features supported by this Provider."""
         return ()
 
+    @property
+    def lookup_key(self) -> str:
+        """Return instance_id if multi_instance capable or domain otherwise."""
+        return self.instance_id if self.manifest.multi_instance else self.domain
+
     async def loaded_in_mass(self) -> None:
         """Call after the provider has been loaded."""
 
index 263b839dd621c58dcafde73857f79aaba822418c..67b04858beada3f89c7a553d7b97355e95286d47 100644 (file)
@@ -40,7 +40,7 @@ from music_assistant.common.models.enums import (
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.constants import CONF_SYNC_ADJUST
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
 from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
 from music_assistant.server.helpers.process import check_output
 from music_assistant.server.models.player_provider import PlayerProvider
@@ -220,9 +220,9 @@ class AirplayStreamJob:
             player_id, CONF_PASSWORD, None
         ):
             extra_args += ["-password", device_password]
-        if self.prov.log_level == "DEBUG":
+        if self.prov.logger.isEnabledFor(logging.DEBUG):
             extra_args += ["-debug", "5"]
-        elif self.prov.log_level == "VERBOSE":
+        elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             extra_args += ["-debug", "10"]
 
         args = [
@@ -291,11 +291,10 @@ class AirplayStreamJob:
             with open(named_pipe, "w") as f:
                 f.write(command)
 
-        if self.prov.log_level == "VERBOSE":
-            self.airplay_player.logger.debug("sending command %s", command)
+        self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
         await self.mass.create_task(send_data)
 
-    async def _log_watcher(self) -> None:  # noqa: PLR0915
+    async def _log_watcher(self) -> None:
         """Monitor stderr for the running CLIRaop process."""
         airplay_player = self.airplay_player
         mass_player = self.mass.players.get(airplay_player.player_id)
@@ -337,16 +336,15 @@ class AirplayStreamJob:
                 continue
             if "lost packet out of backlog" in line:
                 lost_packets += 1
-                if lost_packets == 10:
-                    logger.warning("Packet loss detected, resuming playback...")
+                if lost_packets == 30:
+                    logger.warning("Packet loss detected, restart playback...")
                     queue = self.mass.player_queues.get_active_queue(mass_player.player_id)
                     await self.mass.player_queues.resume(queue.queue_id)
                 else:
                     logger.debug(line)
                 continue
-            # debug log everything else
-            if self.prov.log_level == "VERBOSE":
-                logger.debug(line)
+            # verbose log everything else
+            logger.log(VERBOSE_LOG_LEVEL, line)
 
         # if we reach this point, the process exited
         logger.debug(
@@ -633,7 +631,7 @@ class AirplayProvider(PlayerProvider):
         # always stop existing stream first
         for airplay_player in self._get_sync_clients(player_id):
             if airplay_player.active_stream and airplay_player.active_stream.running:
-                self.mass.create_task(airplay_player.active_stream.stop(force=True))
+                await airplay_player.active_stream.stop(force=True)
         pcm_format = AudioFormat(
             content_type=ContentType.PCM_S16LE,
             sample_rate=44100,
@@ -693,6 +691,8 @@ class AirplayProvider(PlayerProvider):
                     )
                 airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
                 tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
+        if queue_item.queue_item_id != "flow":
+            stream_job.start()
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
@@ -823,11 +823,6 @@ class AirplayProvider(PlayerProvider):
         address = get_primary_ip_address(info)
         if address is None:
             return
-        # some guards if our info is valid/complete
-        if "md" not in info.decoded_properties:
-            return
-        if "et" not in info.decoded_properties:
-            return
         self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
         self._players[player_id] = AirPlayPlayer(
             player_id, discovery_info=info, address=address, logger=self.logger.getChild(player_id)
@@ -888,10 +883,16 @@ class AirplayProvider(PlayerProvider):
                     break
 
             request = raw_request.decode("UTF-8")
-            headers_raw, body = request.split("\r\n\r\n", 1)
+            if "\r\n\r\n" in request:
+                headers_raw, body = request.split("\r\n\r\n", 1)
+            else:
+                headers_raw = request
+                body = ""
             headers_raw = headers_raw.split("\r\n")
             headers = {}
             for line in headers_raw[1:]:
+                if ":" not in line:
+                    continue
                 x, y = line.split(":", 1)
                 headers[x.strip()] = y.strip()
             active_remote = headers.get("Active-Remote")
index 8019a1e791806a2a5f827280e2f8637187a51003..719e830af35f16a8ea13b060c662d448d242f6d9 100644 (file)
@@ -9,7 +9,7 @@ import threading
 import time
 from dataclasses import dataclass
 from logging import Logger
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 from uuid import UUID
 
 import pychromecast
@@ -34,7 +34,12 @@ from music_assistant.common.models.enums import (
 )
 from music_assistant.common.models.errors import PlayerUnavailableError
 from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE, CONF_FLOW_MODE, CONF_PLAYERS, MASS_LOGO_ONLINE
+from music_assistant.constants import (
+    CONF_CROSSFADE,
+    CONF_FLOW_MODE,
+    CONF_PLAYERS,
+    VERBOSE_LOG_LEVEL,
+)
 from music_assistant.server.models.player_provider import PlayerProvider
 
 from .helpers import CastStatusListener, ChromecastInfo
@@ -68,6 +73,9 @@ PLAYER_CONFIG_ENTRIES = (
     CONF_ENTRY_CROSSFADE_DURATION,
 )
 
+DEFAULT_APP_ID = "CC1AD845"
+ALT_APP_ID = "46C1A819"
+
 
 # Monkey patch the Media controller here to store the queue items
 _patched_process_media_status_org = MediaController._process_media_status
@@ -121,7 +129,6 @@ class CastPlayer:
     status_listener: CastStatusListener | None = None
     mz_controller: MultizoneController | None = None
     active_group: str | None = None
-    current_queue_item_id: str | None = None
 
 
 class ChromecastProvider(PlayerProvider):
@@ -149,7 +156,7 @@ class ChromecastProvider(PlayerProvider):
             self.mass.aiozc.zeroconf,
         )
         # set-up pychromecast logging
-        if self.log_level == "VERBOSE":
+        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             logging.getLogger("pychromecast").setLevel(logging.DEBUG)
         else:
             logging.getLogger("pychromecast").setLevel(self.logger.level + 10)
@@ -256,32 +263,13 @@ class ChromecastProvider(PlayerProvider):
             fade_in=fade_in,
             flow_mode=use_flow_mode,
         )
-        if use_flow_mode:
-            # In flow mode, all queue tracks are sent to the player as continuous stream.
-            # This comes at the cost of metadata (cast does not support ICY metadata).
-            cc_queue_items = [
-                self._create_cc_queue_item(None, url),
-                # add a special 'command' item to the queue
-                # this allows for on-player next buttons/commands to still work
-                self._create_cc_queue_item(
-                    None, self.mass.streams.get_command_url(queue_item.queue_id, "next")
-                ),
-            ]
-        else:
-            # handle normal playback using the chromecast queue to play items one by one
-            cc_queue_items = [
-                self._create_cc_queue_item(queue_item, url),
-            ]
         queuedata = {
-            "type": "QUEUE_LOAD",
-            "repeatMode": "REPEAT_OFF",  # handled by our queue controller
-            "shuffle": False,  # handled by our queue controller
-            "queueType": "PLAYLIST",
-            "startIndex": 0,  # Item index to play after this request or keep same item if undefined
-            "items": cc_queue_items,
+            "type": "LOAD",
+            "media": self._create_cc_media_item(queue_item, url),
         }
         # make sure that the media controller app is launched
-        await self._launch_app(castplayer)
+        app_id = ALT_APP_ID if use_flow_mode else DEFAULT_APP_ID
+        await self._launch_app(castplayer, app_id)
         # send queue info to the CC
         media_controller = castplayer.cc.media_controller
         await asyncio.to_thread(media_controller.send_message, queuedata, True)
@@ -289,11 +277,15 @@ class ChromecastProvider(PlayerProvider):
     async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
         """Handle enqueuing of the next queue item on the player."""
         castplayer = self.castplayers[player_id]
-        url = await self.mass.streams.resolve_stream_url(
-            player_id,
-            queue_item=queue_item,
-            output_codec=ContentType.FLAC,
-        )
+        if isinstance(queue_item, str):
+            url = self.mass.streams.get_command_url(queue_item, "next")
+            queue_item = None
+        else:
+            url = await self.mass.streams.resolve_stream_url(
+                player_id,
+                queue_item=queue_item,
+                output_codec=ContentType.FLAC,
+            )
         next_item_id = None
         status = castplayer.cc.media_controller.status
         # lookup position of current track in cast queue
@@ -308,15 +300,19 @@ class ChromecastProvider(PlayerProvider):
                 continue
             next_item_id = item["itemId"]
             # check if the next queue item isn't already queued
-            if (
-                item.get("media", {}).get("customData", {}).get("queue_item_id")
-                == queue_item.queue_item_id
-            ):
+            if item.get("media", {}).get("customData", {}).get("uri") == url:
                 return
         queuedata = {
             "type": "QUEUE_INSERT",
             "insertBefore": next_item_id,
-            "items": [self._create_cc_queue_item(queue_item, url)],
+            "items": [
+                {
+                    "autoplay": True,
+                    "startTime": 0,
+                    "preloadTime": 0,
+                    "media": self._create_cc_media_item(queue_item, url),
+                }
+            ],
         }
         media_controller = castplayer.cc.media_controller
         queuedata["mediaSessionId"] = media_controller.status.media_session_id
@@ -350,6 +346,7 @@ class ChromecastProvider(PlayerProvider):
             return
         try:
             await asyncio.to_thread(castplayer.cc.media_controller.update_status)
+            await self.update_flow_metadata(castplayer)
         except ConnectionResetError as err:
             raise PlayerUnavailableError from err
 
@@ -401,6 +398,12 @@ class ChromecastProvider(PlayerProvider):
                 if exclude.lower() in cast_info.friendly_name.lower():
                     enabled_by_default = False
 
+            if cast_info.is_audio_group and cast_info.is_multichannel_group:
+                player_type = PlayerType.STEREO_PAIR
+            elif cast_info.is_audio_group:
+                player_type = PlayerType.GROUP
+            else:
+                player_type = PlayerType.PLAYER
             # Instantiate chromecast object
             castplayer = CastPlayer(
                 player_id,
@@ -412,7 +415,7 @@ class ChromecastProvider(PlayerProvider):
                 player=Player(
                     player_id=player_id,
                     provider=self.instance_id,
-                    type=(PlayerType.GROUP if cast_info.is_audio_group else PlayerType.PLAYER),
+                    type=player_type,
                     name=cast_info.friendly_name,
                     available=False,
                     powered=False,
@@ -428,7 +431,10 @@ class ChromecastProvider(PlayerProvider):
                         PlayerFeature.ENQUEUE_NEXT,
                         PlayerFeature.PAUSE,
                     ),
-                    max_sample_rate=96000,
+                    # originally/officially cast supports 96k sample rate
+                    # but it seems a (recent?) update broke this
+                    # for now use 48k as max sample rate to play safe
+                    max_sample_rate=48000,
                     supports_24bit=True,
                     enabled_by_default=enabled_by_default,
                 ),
@@ -473,7 +479,7 @@ class ChromecastProvider(PlayerProvider):
         )
         # handle stereo pairs
         if castplayer.cast_info.is_multichannel_group:
-            castplayer.player.type = PlayerType.PLAYER
+            castplayer.player.type = PlayerType.STEREO_PAIR
             castplayer.player.group_childs = set()
         # handle cast groups
         if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group:
@@ -559,10 +565,9 @@ class ChromecastProvider(PlayerProvider):
 
     ### Helpers / utils
 
-    async def _launch_app(self, castplayer: CastPlayer) -> None:
+    async def _launch_app(self, castplayer: CastPlayer, app_id: str = DEFAULT_APP_ID) -> None:
         """Launch the default Media Receiver App on a Chromecast."""
         event = asyncio.Event()
-        app_id = pychromecast.config.APP_MEDIA_RECEIVER
 
         if castplayer.cc.app_id == app_id:
             return  # already active
@@ -574,8 +579,12 @@ class ChromecastProvider(PlayerProvider):
             # Quit the previous app before starting splash screen or media player
             if castplayer.cc.app_id is not None:
                 castplayer.cc.quit_app()
-            castplayer.logger.debug("Launching Default Media Receiver (%s) as active app.", app_id)
-            castplayer.cc.media_controller.launch(launched_callback)
+            castplayer.logger.debug("Launching App %s.", app_id)
+            castplayer.cc.socket_client.receiver_controller.launch_app(
+                app_id,
+                force_launch=True,
+                callback_function=launched_callback,
+            )
 
         await self.mass.loop.run_in_executor(None, launch)
         await event.wait()
@@ -589,31 +598,8 @@ class ChromecastProvider(PlayerProvider):
         castplayer.status_listener = None
         self.castplayers.pop(castplayer.player_id, None)
 
-    def _create_cc_queue_item(self, queue_item: QueueItem | None, stream_url: str):
-        """Create CC queue item from MA QueueItem."""
-        if queue_item is None:
-            # flow mode or other special type
-            return {
-                "autoplay": True,
-                "preloadTime": 10,
-                "startTime": 0,
-                "activeTrackIds": [],
-                "media": {
-                    "contentId": stream_url,
-                    "customData": {
-                        "uri": stream_url,
-                        "queue_item_id": stream_url,
-                    },
-                    "contentType": "audio/flac",
-                    "streamType": STREAM_TYPE_LIVE,
-                    "metadata": {
-                        "metadataType": 0,
-                        "title": "Music Assistant",
-                        "images": [{"url": MASS_LOGO_ONLINE}],
-                    },
-                    "duration": None,
-                },
-            }
+    def _create_cc_media_item(self, queue_item: QueueItem, stream_url: str) -> dict[str, Any]:
+        """Create CC media item from MA QueueItem."""
         duration = int(queue_item.duration) if queue_item.duration else None
         image_url = self.mass.metadata.get_image_url(queue_item.image) if queue_item.image else ""
         if queue_item.media_type == MediaType.TRACK and queue_item.media_item:
@@ -630,6 +616,16 @@ class ChromecastProvider(PlayerProvider):
                 "title": queue_item.media_item.name,
                 "images": [{"url": image_url}] if image_url else None,
             }
+        elif queue_item.streamdetails and queue_item.streamdetails.stream_title:
+            stream_type = STREAM_TYPE_LIVE
+            metadata = {
+                "metadataType": 3,
+                "songName": queue_item.streamdetails.stream_title.split(" - ")[-1],
+                "artist": queue_item.streamdetails.stream_title.split(" - ")[0],
+                "albumName": queue_item.name,
+                "images": [{"url": image_url}] if image_url else None,
+                "title": queue_item.streamdetails.stream_title.split(" - ")[-1],
+            }
         else:
             stream_type = STREAM_TYPE_LIVE
             metadata = {
@@ -638,20 +634,69 @@ class ChromecastProvider(PlayerProvider):
                 "images": [{"url": image_url}] if image_url else None,
             }
         return {
-            "autoplay": True,
-            "preloadTime": 10,
-            "playbackDuration": duration,
-            "startTime": 0,
-            "activeTrackIds": [],
-            "media": {
-                "contentId": stream_url,
-                "customData": {
-                    "uri": queue_item.uri,
-                    "queue_item_id": queue_item.queue_item_id,
-                },
-                "contentType": "audio/flac",
-                "streamType": stream_type,
-                "metadata": metadata,
-                "duration": duration,
+            "contentId": stream_url,
+            "customData": {
+                "uri": queue_item.uri,
+                "queue_item_id": queue_item.queue_item_id,
+                "deviceName": "Music Assistant",
             },
+            "contentType": "audio/flac",
+            "streamType": stream_type,
+            "metadata": metadata,
+            "duration": duration,
         }
+
+    async def update_flow_metadata(self, castplayer: CastPlayer) -> None:
+        """Update the metadata of a cast player running the flow stream."""
+        if not castplayer.player.powered:
+            return
+        if not castplayer.cc.media_controller.status.player_is_playing:
+            return
+        if castplayer.player.state != PlayerState.PLAYING:
+            return
+        if castplayer.cc.app_id != ALT_APP_ID:
+            return
+        queue = self.mass.player_queues.get_active_queue(castplayer.player_id)
+        if not (current_item := queue.current_item):
+            return
+        media_controller = castplayer.cc.media_controller
+        # update metadata of current item chromecast
+        if media_controller.status.media_custom_data["queue_item_id"] != current_item.queue_item_id:
+            cc_item = self._create_cc_media_item(current_item, "")
+            queuedata = {
+                "type": "PLAY",
+                "mediaSessionId": media_controller.status.media_session_id,
+                "customData": {
+                    "metadata": cc_item["metadata"],
+                },
+            }
+            self.mass.create_task(media_controller.send_message, queuedata, True)
+
+        if len(getattr(media_controller.status, "items", [])) < 2:
+            # In flow mode, all queue tracks are sent to the player as continuous stream.
+            # add a special 'command' item to the queue
+            # this allows for on-player next buttons/commands to still work
+            cmd_next_url = self.mass.streams.get_command_url(queue.queue_id, "next")
+            msg = {
+                "type": "QUEUE_INSERT",
+                "mediaSessionId": media_controller.status.media_session_id,
+                "items": [
+                    {
+                        "media": {
+                            "contentId": cmd_next_url,
+                            "customData": {
+                                "uri": cmd_next_url,
+                                "queue_item_id": cmd_next_url,
+                                "deviceName": "Music Assistant",
+                            },
+                            "contentType": "audio/flac",
+                            "streamType": STREAM_TYPE_LIVE,
+                            "metadata": {},
+                        },
+                        "autoplay": True,
+                        "startTime": 0,
+                        "preloadTime": 0,
+                    }
+                ],
+            }
+            self.mass.create_task(media_controller.send_message, msg, inc_session_id=True)
index 25606235ecf5562486434f72eda78c510fa2781a..bfa3e378576272afddff3b81d1e2cb283e24fa48 100644 (file)
@@ -39,7 +39,13 @@ from music_assistant.common.models.enums import (
 )
 from music_assistant.common.models.errors import PlayerUnavailableError
 from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE, CONF_ENFORCE_MP3, CONF_FLOW_MODE, CONF_PLAYERS
+from music_assistant.constants import (
+    CONF_CROSSFADE,
+    CONF_ENFORCE_MP3,
+    CONF_FLOW_MODE,
+    CONF_PLAYERS,
+    VERBOSE_LOG_LEVEL,
+)
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -144,7 +150,7 @@ def catch_request_errors(
         player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
         dlna_player = self.dlnaplayers[player_id]
         dlna_player.last_command = time.time()
-        if self.log_level == "VERBOSE":
+        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             self.logger.debug(
                 "Handling command %s for player %s",
                 func.__name__,
@@ -157,7 +163,7 @@ def catch_request_errors(
             return await func(self, *args, **kwargs)
         except UpnpError as err:
             dlna_player.force_poll = True
-            if self.log_level == "VERBOSE":
+            if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
                 self.logger.exception("Error during call %s: %r", func.__name__, err)
             else:
                 self.logger.error("Error during call %s: %r", func.__name__, str(err))
@@ -276,7 +282,7 @@ class DLNAPlayerProvider(PlayerProvider):
         self.dlnaplayers = {}
         self.lock = asyncio.Lock()
         # silence the async_upnp_client logger
-        if self.log_level == "VERBOSE":
+        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             logging.getLogger("async_upnp_client").setLevel(logging.DEBUG)
         else:
             logging.getLogger("async_upnp_client").setLevel(self.logger.level + 10)
index 93ae197c564700f0d2974dfa87e7e545317897e5..d2cce7907eec2de37d8e2bd96d662137c74d3abe 100644 (file)
@@ -52,6 +52,7 @@ from music_assistant.constants import (
     CONF_PORT,
     CONF_SYNC_ADJUST,
     MASS_LOGO_ONLINE,
+    VERBOSE_LOG_LEVEL,
 )
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -78,8 +79,8 @@ STATE_MAP = {
 REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
 
 # sync constants
-MIN_DEVIATION_ADJUST = 8  # 8 milliseconds
-MIN_REQ_PLAYPOINTS = 3  # we need at least 3 measurements
+MIN_DEVIATION_ADJUST = 6  # 6 milliseconds
+MIN_REQ_PLAYPOINTS = 8  # we need at least 8 measurements
 DEVIATION_JUMP_IGNORE = 2000  # ignore a sudden unrealistic jump
 MAX_SKIP_AHEAD_MS = 500  # 0.5 seconds
 
@@ -228,7 +229,11 @@ class SlimprotoProvider(PlayerProvider):
         control_port = self.config.get_value(CONF_PORT)
         telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT)
         json_port = self.config.get_value(CONF_CLI_JSON_PORT)
-        logging.getLogger("aioslimproto").setLevel(self.logger.level)
+        # silence aioslimproto logger a bit
+        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+            logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
+        else:
+            logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
         self.slimproto = SlimServer(
             cli_port=telnet_port or None,
             cli_port_json=json_port or None,
@@ -358,7 +363,6 @@ class SlimprotoProvider(PlayerProvider):
                         self._handle_play_url(
                             slimplayer,
                             url=stream_job.resolve_stream_url(
-                                player_id,
                                 slimplayer.player_id,
                                 output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
                             ),
@@ -367,6 +371,8 @@ class SlimprotoProvider(PlayerProvider):
                             auto_play=False,
                         )
                     )
+            if queue_item.queue_item_id != "flow":
+                stream_job.start()
         else:
             # regular, single player playback
             slimplayer = self.slimproto.get_player(player_id)
index af0c41359a3013580e40c3629fb14aeb95eb2188..3690e26d2664f3e930885e399f567c0374b42100 100644 (file)
@@ -32,7 +32,7 @@ from music_assistant.common.models.enums import (
 )
 from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
 from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_CROSSFADE
+from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL
 from music_assistant.server.helpers.didl_lite import create_didl_metadata
 from music_assistant.server.models.player_provider import PlayerProvider
 
@@ -90,7 +90,7 @@ async def setup(
     zonegroupstate.EVENT_CACHE_TIMEOUT = SUBSCRIPTION_TIMEOUT
     prov = SonosPlayerProvider(mass, manifest, config)
     # set-up soco logging
-    if prov.log_level == "VERBOSE":
+    if prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
         logging.getLogger("soco").setLevel(logging.DEBUG)
         logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
     else:
index 857575d32d17c73002f9246b0bee303e6e2f04e0..4bfeb783612bcc178034de7cbf2286f4086f4434 100644 (file)
@@ -32,6 +32,7 @@ from music_assistant.common.helpers.datetime import utc
 from music_assistant.common.models.enums import PlayerFeature, PlayerState
 from music_assistant.common.models.errors import PlayerCommandFailed
 from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.constants import VERBOSE_LOG_LEVEL
 
 from .helpers import SonosUpdateError, soco_error
 
@@ -255,7 +256,7 @@ class SonosPlayer:
                 ]
                 if not subscriptions:
                     return
-                self.logger.debug("Creating subscriptions for %s", self.zone_name)
+                self.logger.log(VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.zone_name)
                 results = await asyncio.gather(*subscriptions, return_exceptions=True)
                 for result in results:
                     self.log_subscription_result(result, "Creating subscription", logging.WARNING)
@@ -271,7 +272,7 @@ class SonosPlayer:
         """Cancel all subscriptions."""
         if not self._subscriptions:
             return
-        self.logger.debug("Unsubscribing from events for %s", self.zone_name)
+        self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.zone_name)
         results = await asyncio.gather(
             *(subscription.unsubscribe() for subscription in self._subscriptions),
             return_exceptions=True,
@@ -284,7 +285,7 @@ class SonosPlayer:
         """Validate availability of the speaker based on recent activity."""
         if not self.should_poll:
             return
-        self.logger.debug("Polling player for availability...")
+        self.logger.log(VERBOSE_LOG_LEVEL, "Polling player for availability...")
         try:
             await asyncio.to_thread(self.ping)
             self._speaker_activity("ping")
@@ -301,7 +302,7 @@ class SonosPlayer:
         """Handle updated IP of a Sonos player (NOT async friendly)."""
         if self.available:
             return
-        self.logger.info(
+        self.logger.debug(
             "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
         )
         try:
@@ -427,7 +428,8 @@ class SonosPlayer:
         if av_transport_uri == current_track_uri and av_transport_uri.startswith("x-rincon:"):
             new_coordinator_uid = av_transport_uri.split(":")[-1]
             if new_coordinator_speaker := self.sonos_prov.sonosplayers.get(new_coordinator_uid):
-                self.logger.debug(
+                self.logger.log(
+                    5,
                     "Media update coordinator (%s) received for %s",
                     new_coordinator_speaker.zone_name,
                     self.zone_name,
@@ -828,7 +830,7 @@ class SonosPlayer:
                 return
             self._resub_cooldown_expires_at = None
 
-        self.logger.debug("Activity on %s from %s", self.zone_name, source)
+        self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.zone_name, source)
         self._last_activity = time.monotonic()
         was_available = self.available
         self.available = True
index 6aa5b16462fe711a77fce1780d39b82c562545c0..a9a0f786fd15ecf4a4d07c4b0e330281e8e2f0e0 100644 (file)
@@ -179,7 +179,7 @@ class UniversalGroupProvider(PlayerProvider):
 
         # create a multi-client stream job - all (direct) child's of this UGP group
         # will subscribe to this multi client queue stream
-        await self.mass.streams.create_multi_client_stream_job(
+        stream_job = await self.mass.streams.create_multi_client_stream_job(
             player_id,
             start_queue_item=queue_item,
             seek_position=seek_position,
@@ -199,6 +199,7 @@ class UniversalGroupProvider(PlayerProvider):
                     if member is None:
                         continue
                 tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item, 0, False))
+        stream_job.start()
 
     async def poll_player(self, player_id: str) -> None:
         """Poll player for state updates."""
index 9a2f2dc7ee1100b23bed68aaa8ed7fd22135eba9..461f01b04912b7f72d849bcde19aacb9cd867f5b 100644 (file)
@@ -15,6 +15,7 @@ from aiohttp import ClientSession, TCPConnector
 from zeroconf import IPVersion, NonUniqueNameException, ServiceStateChange, Zeroconf
 from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
 
+from music_assistant.common.helpers.global_cache import set_global_cache_values
 from music_assistant.common.helpers.util import get_ip_pton
 from music_assistant.common.models.api import ServerInfoMessage
 from music_assistant.common.models.enums import EventType, ProviderType
@@ -442,6 +443,7 @@ class MusicAssistant:
         self.create_task(provider.loaded_in_mass())
         self.config.set(f"{CONF_PROVIDERS}/{conf.instance_id}/last_error", None)
         self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers())
+        await self._update_available_providers_cache()
         # if this is a music provider, start sync
         if provider.type == ProviderType.MUSIC:
             self.music.start_sync(providers=[provider.instance_id])
@@ -468,6 +470,7 @@ class MusicAssistant:
                 LOGGER.warning("Error while unload provider %s: %s", provider.name, str(err))
             finally:
                 self._providers.pop(instance_id, None)
+                await self._update_available_providers_cache()
                 self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers())
 
     def _register_api_commands(self) -> None:
@@ -647,3 +650,17 @@ class MusicAssistant:
         if exc_val:
             raise exc_val
         return exc_type
+
+    async def _update_available_providers_cache(self) -> None:
+        """Update the global cache variable of loaded/available providers."""
+        await set_global_cache_values(
+            {
+                "provider_domains": {x.domain for x in self.providers},
+                "provider_instance_ids": {x.instance_id for x in self.providers},
+                "available_providers": {
+                    *{x.domain for x in self.providers},
+                    *{x.instance_id for x in self.providers},
+                },
+                "unique_providers": {x.lookup_key for x in self.providers},
+            }
+        )